← Back to team overview

nrtb-core team mailing list archive

[Branch ~fpstovall/nrtb/cpp_common] Rev 49: Implmented the transceiver class and some cleanups and bugfixs to the sockets class.

 

Merge authors:
  Rick Stovall (fpstovall)
------------------------------------------------------------
revno: 49 [merge]
committer: fpstovall@xxxxxxxxx
branch nick: dev
timestamp: Thu 2011-08-25 00:32:29 -0400
message:
  Implmented the transceiver class and some cleanups and bugfixs to the sockets class.
  
  I believe this is now ready for alpha service.
modified:
  common/Makefile
  common/sockets/base_socket.cpp
  common/sockets/base_socket.h
  common/sockets/socket_test.cpp
  common/transceiver/Makefile
  common/transceiver/transceiver.cpp
  common/transceiver/transceiver.h
  common/transceiver/transceiver_test.cpp


--
lp:~fpstovall/nrtb/cpp_common
https://code.launchpad.net/~fpstovall/nrtb/cpp_common

Your team NRTB Core is subscribed to branch lp:~fpstovall/nrtb/cpp_common.
To unsubscribe from this branch go to https://code.launchpad.net/~fpstovall/nrtb/cpp_common/+edit-subscription
=== modified file 'common/Makefile'
--- common/Makefile	2011-08-13 14:23:21 +0000
+++ common/Makefile	2011-08-15 03:47:28 +0000
@@ -46,3 +46,4 @@
 	@cd logger; make ${action}
 	@cd confreader; make ${action}
 	@cd GPB; make ${action}
+	@cd transceiver; make ${action}

=== modified file 'common/sockets/base_socket.cpp'
--- common/sockets/base_socket.cpp	2011-08-13 01:04:57 +0000
+++ common/sockets/base_socket.cpp	2011-08-25 04:18:41 +0000
@@ -65,8 +65,11 @@
 	if (returnme > 0)
 	{
 		// yes.. store for the caller.
-		inbuff[returnme] = 0;
-		data.assign((const char *) &(inbuff[0]));
+		data.resize(returnme,0);
+		for (int i=0; i<returnme; i++)
+		{
+		  data[i] = inbuff[i];
+		};
 	}
 	// ... or was there an error?
 	else if (errno) 
@@ -584,11 +587,13 @@
 	{
 		// yes, this is -rude- shutdown of the listening thread.
 		// Get over it.
-		thread_data.lock();
-		okay_to_continue = false;
-		thread_data.unlock();
-		close(listen_sock);
 		stop();
+		try
+		{ 
+		  if (listen_sock)
+			close(listen_sock);
+		}
+		catch (...) {};
 	};
 };
 

=== modified file 'common/sockets/base_socket.h'
--- common/sockets/base_socket.h	2011-08-09 00:40:44 +0000
+++ common/sockets/base_socket.h	2011-08-15 01:58:41 +0000
@@ -20,6 +20,7 @@
 #define base_socket_header
 
 #include <base_thread.h>
+#include <boost/shared_ptr.hpp>
 #include <sys/socket.h>
 #include <netinet/in.h>
 
@@ -371,6 +372,9 @@
 		std::string get_remote_address();
 };
 
+/// smart pointer for use with tcp_sockets
+typedef boost::shared_ptr<nrtb::tcp_socket> tcp_socketp;
+
 /** Abstract "listener" TCP/IP socket for servers. 
  ** 
  ** Simplifies the use of TCP/IP sockets for applications providing services.

=== modified file 'common/sockets/socket_test.cpp'
--- common/sockets/socket_test.cpp	2011-08-13 00:11:42 +0000
+++ common/sockets/socket_test.cpp	2011-08-25 02:21:28 +0000
@@ -19,6 +19,7 @@
 #include <iostream>
 #include <sstream>
 #include <string>
+#include <boost/random.hpp>
 #include "base_socket.h"
 #include <boost/shared_ptr.hpp>
 
@@ -81,11 +82,21 @@
   return returnme;
 };
 
-const string address = "127.0.0.1:17000";
+string address = "127.0.0.1:";
+int port_base = 17000;
 
 int main()
 {
   int er_count = 0;
+  //set up our port and address
+  boost::mt19937 rng;
+  rng.seed(time(0));
+  boost::uniform_int<> r(0,1000);
+  stringstream s;
+  s << address << port_base + r(rng);
+  address = s.str();
+  cout << "Using " << address << endl;
+  
   myserver test_server(address,5);
 
   try
@@ -105,7 +116,7 @@
 	  {
 		er_count++;
 	  };
-	  cout << returned << ": " 
+	  cout << returned.substr(0,returned.size()-1) << ": " 
 		<< ((returned == checkme) ? "Passed" : "Failed")
 		<< endl;
 	  usleep(1000);

=== modified file 'common/transceiver/Makefile'
--- common/transceiver/Makefile	2010-12-30 03:05:40 +0000
+++ common/transceiver/Makefile	2011-08-25 04:22:52 +0000
@@ -17,22 +17,15 @@
 #***********************************************
 
 lib:	transceiver_test
+	@./transceiver_test
 	@cp -v transceiver.h ../include
-	@cp -v transceiver.o ../obj
 	@echo build complete
 
-../include/confreader.h:
-	@cp ../confreader; make lib
-
-transceiver.o:	transceiver.h transceiver.cpp Makefile ../include/confreader.h
-	@rm -f transceiver.o
-	g++ -c transceiver.cpp -I ../include
-
-transceiver_test:	transceiver.o transceiver_test.cpp
+transceiver_test:	transceiver.h transceiver_test.cpp
 	@rm -f transceiver_test
-	g++ -c transceiver_test.cpp
-	g++ -o transceiver_test transceiver_test.o transceiver.o ../obj/common.o ../obj/log_setup.o -lPocoFoundation -lPocoUtil
+	g++ -c transceiver_test.cpp -I../include
+	g++ -o transceiver_test transceiver_test.o ../obj/common.o ../obj/log_setup.o ../obj/serializer.o ../obj/base_thread.o ../obj/base_socket.o ../obj/confreader.o -lpthread -lprotobuf  ../lib/nrtb_gpb.a -lPocoFoundation
 
 clean:
-	@rm -rvf *.o transceiver_test ../include/transceiver.h ../obj/transceiver.o
+	@rm -rvf *.o transceiver_test ../include/transceiver.h *.log ../obj/transceiver.o
 	@echo all objects and executables have been erased.

=== modified file 'common/transceiver/transceiver.cpp'
--- common/transceiver/transceiver.cpp	2010-12-31 14:48:07 +0000
+++ common/transceiver/transceiver.cpp	2011-08-15 03:47:28 +0000
@@ -17,89 +17,124 @@
  **********************************************/
  
 #include "transceiver.h"
-#include "../include/confreader.h"
-#include <Poco/Mutex.h>
-#include <Poco/ScopedLock.h>
-#include <stringstream>
-
-class serializer_type
-{
-public:
-  unsigned int operator()
-  {
-	Poco::ScopedLock<Poco::Mutex>(lock);
-	return ++counter;
-  };
-private:
-  Poco::Mutex lock;
-  unsigned int counter;
-  serializer_type()
-  {
-	counter = 0;
-  };
-};
-
-serializer_type serializer;  
+#include <confreader.h>
+#include <serializer.h>
+#include <string>
+#include <sstream>
 
 using namespace nrtb;
-
-template <class out, class in>
-transceiver::transceiver(Poco::Net::StreamSocket socket)
+using namespace std;
+
+serializer tscvr_sequence(0);
+
+template <class out, class in, class outp, class inp>
+transceiver<out,in,outp,inp>::transceiver(tcp_socketp socket)
 {
   // get the configuration parameters.
-  conf_reader & config = conf_reader::get_instance();
+  global_conf_reader & config = global_conf_reader::get_instance();
   send_time_limit = config.get<unsigned int>("transceiver.send_timeout",2);
   attempt_recovery = config.get<bool>("transceiver.allow_recovery",true);
   error_run_limit = 
 	config.get<unsigned int>("transceiver.max_consecutive_errors",10);
   sent_messages.resize(config.get<int>("transceiver.history_size",50));
-  // set up uid
-  uid = serializer();
   // set up logging
-  std::stringstream s << logname << "_" << uid ;
+  std::stringstream s;
+  s << "transceiver:_" << tscvr_sequence();
   log = Poco::Logger::get(s.str());
-};
-
-template <class out, class in>
-transceiver::~transceiver()
-{
-  
-};
-
-template <class out, class in>
-in transceiver::get()
-{
-  
-};
-
-template <class out, class in>
-void transceiver::send(out & sendme)
-{
-  
-};
-
-template <class out, class in>
-void transceiver::nak_invalid_context(const unsigned long int msg_number)
-{
-  
-};
-
-template <class out, class in>
-void transceiver::nak_validation_error(const unsigned long int msg_number)
-{
-  
-};
-
-template <class out, class in>
-void transceiver::handle_inbound_nak()
-{
-  
-};
-
-template <class out, class in>
-void transceiver::handle_outbound_nak()
-{
-  
+  // set up the socket.
+  sock(socket);
+  // annouce ourselves...
+  log->trace("Instanciated."); 
+  s.str("");
+  s << "history_size=" << sent_messages.size()
+    << ", send_timeout=" << send_time_limit
+    << ", attempt_recovery=" << attempt_recovery
+    << ", error_run_limit=" << error_run_limit
+    << ", remote_address=" << sock->get_remote_address()
+    << ", local_address=" << sock->get_local_address();
+  log->trace(s.str());
+};
+
+template <class out, class in, class outp, class inp>
+transceiver<out,in,outp,inp>::~transceiver()
+{
+	log->trace("In ~transciever");
+	// shutdown the socket.
+	sock->close();
+	// discard the sent messages list.
+	sent_messages.clear();
+	log->trace("shutdown complete.");
+};
+
+template <class out, class in, class outp, class inp>
+inp transceiver<out,in,outp,inp>::get()
+{
+  inp returnme(new in);
+  string input = sock->getln();
+  returnme->ParseFromString(input);
+  // for the first messsge any number is
+  // accepted.
+  if (last_inbound == 0)
+  {
+	last_inbound = returnme->msg_uid();
+  }
+  else
+  {
+	last_inbound++;
+	int temp = returnme->msg_uid();
+	if (temp != last_inbound)
+	{ 
+	  inbound_seq_error e();
+	  stringstream message;
+	  message << "Expected " << last_inbound
+		<< " received "  << temp;
+	  e.store(message.str());
+	  throw e;
+	};
+  };
+  return returnme;
+};
+
+template <class out, class in, class outp, class inp>
+void transceiver<out,in,outp,inp>::send(outp sendme)
+{
+  sendme->set_msg_uid(out_msg_num());
+  string output;
+  output = sendme->SerializeAsString() + "\r";
+  sock->put(output);
+  sent_messages.push_back(sendme);
+};
+
+template <class out, class in, class outp, class inp>
+void transceiver<out,in,outp,inp>::nak_invalid_context(const unsigned long int msg_number)
+{
+  general_exception e;
+  e.store("transceiver<outp,inp>::nak_invalid_context not implemented.");
+  throw e;
+};
+
+template <class out, class in, class outp, class inp>
+void transceiver<out,in,outp,inp>::nak_validation_error(const unsigned long int msg_number)
+{
+  general_exception e;
+  e.store("transceiver<outp,inp>::nak_validation_error not implemented.");
+  throw e;
+};
+
+template <class out, class in, class outp, class inp>
+void transceiver<out,in,outp,inp>::handle_inbound_nak()
+{
+  general_exception e;
+  e.store("transceiver<outp,inp>::handle_inbound_nak not implemented.");
+  throw e;  
+};
+
+template <class out, class in, class outp, class inp>
+void transceiver<out,in,outp,inp>::handle_outbound_nak()
+{
+  general_exception e;
+  e.store("transceiver<outp,inp>::handle_outbound_nak not implemented.");
+  throw e;    
 };
 
 

=== modified file 'common/transceiver/transceiver.h'
--- common/transceiver/transceiver.h	2010-12-31 14:48:07 +0000
+++ common/transceiver/transceiver.h	2011-08-25 04:22:52 +0000
@@ -20,10 +20,13 @@
 #define nrtb_transceiver_h
 
 #include <string>
-#include <Poco/Net/StreamSocket.h>
-#include <Poco/Net/SocketStream.h>
+#include <sstream>
+#include <common.h>
+#include <base_socket.h>
+#include <serializer.h>
+#include <confreader.h>
 #include <Poco/Logger.h>
-#include <Poco/Exception.h>
+#include <boost/shared_ptr.hpp>
 #include <boost/circular_buffer.hpp>
 
 namespace nrtb
@@ -47,21 +50,28 @@
    * See https://blueprints.launchpad.net/nrtb/+spec/icp-spec for
    * specification this class implements.
    * ***************************************************************/
-  template <class out, class in>
+  template <class out, class in,
+	class outp = boost::shared_ptr<out>,
+	class inp = boost::shared_ptr<in> >
   class transceiver
   {
 	public:
-	  /// outbound messages will be of this type
-	  typedef outbound_type out;
-	  /// inbound messages will be of this type.
-	  typedef inbound_type in;
-	  /**************************************************************
+	  typedef inp in_ptr;
+	  typedef outp out_ptr;
+	  
+	  union msg_num_t
+	  {
+		uint32_t number;
+		unsigned char bytes[4];
+	  };
+	  
+	  /*************************************************************
 	   * Creates the transceiver and associates it with a provided 
 	   * socket. Once created this class assumes it uniquely owns the 
 	   * socket and will close it upon distruction.
 	   * ***********************************************************/
-	  transceiver(Poco::Net::StreamSocket socket);
-	  /**************************************************************
+	  transceiver(tcp_socketp socket);
+	  /*************************************************************
 	   * Closes the socket and releases all mmemory associated with
 	   * this class.
 	   * ***********************************************************/
@@ -70,13 +80,13 @@
 	   * gets the next message from the socket. If no messages are 
 	   * ready, blocks util one arrives. 
 	   * ***********************************************************/
-	  in & get();
+	  inp get();
 	  /**************************************************************
 	   * Sends a message over the socket and adds it to the 
 	   * sent_messages buffer in case it's needed for error recovery.
 	   * ***********************************************************/
-	  void send(out & sendme);
-	  /**************************************************************
+	  void send(outp sendme);
+	  /**q************************************************************
 	   * Called by the data consumer when an inbound message was 
 	   * not valid in the current application context. msg_number
 	   * is the sequence number of the offending message.
@@ -91,37 +101,175 @@
 	  /**************************************************************
 	   * Exceptions which may be thrown for external resulution.
 	   * ***********************************************************/
-	  // parent of all transceiver exceptions
-	  POCO_DECLARE_EXCEPTION(transceiver, general_exception, Poco::Exception)
-	  // thrown if send fails due to timeout.
-	  POCO_DECLARE_EXCEPTION(transceiver, send_timeout, general_exception)
-	  // thrown in the case of a fault while in get()
-	  POCO_DECLARE_EXCEPTION(transceiver, get_fault, general_exception)
-	  // Thrown if the socket is closed due to an unrecoverable date error.
-	  POCO_DECLARE_EXCEPTION(transceiver, unrecoverable_data_error, general_exception)
-	  // Thrown if the socket is closed due to too many errors in a row
-	  POCO_DECLARE_EXCEPTION(transceiver, consecutive_error_overrun, general_exception)
-	protected:
-	  unsigned in uid;
-	  const std::string logname = "transceiver:";
+	  /// parent of all transceiver exceptions
+	  class general_exception : public nrtb::base_exception {};
+	  /// thrown if the a unexpected msg_uid is received
+	  class inbound_seq_error : public general_exception {};
+	  /// thrown the connection is unexpectedly lost
+	  class connection_lost: public general_exception {};
+	  /// thrown if too many sequencial errors occur.
+	  class too_many_errors: public general_exception {};
+	  
+  protected:
 	  unsigned int send_time_limit;
 	  bool attempt_recovery;
 	  unsigned int error_run_limit;
-	  // pointer to this class's logger instance
-	  Poco::Logger * log;
-	  // The socket used for communcation.
-	  Poco::Net::StreamSocket sock;
-	  // the associated iostream (Do we need this?)
-	  Poco::Net::SocketStream stream;
-	  // buffer to hold previously sent messages; required for 
-	  // error recovery.
-	  boost::circular_buffer<out> sent_messages;
-	  // fence post for recovery efforts, zero if none in play
+	  /// the name used for logging
+	  std::string logname;
+	  /// The socket used for communcation.
+	  tcp_socketp sock;
+	  /// serializer used for message numbers
+	  serializer out_msg_num;
+	  /// last received message number 
+	  unsigned long long  last_inbound;
+	  /// buffer to hold previously sent messages; required for 
+	  /// error recovery.
+	  boost::circular_buffer<outp> sent_messages;
+	  /// fence post for recovery efforts, zero if none in play
 	  unsigned long long nak_fence_post;
-	  // These methods implment actual nak recovery.
+	  /// These methods implment actual nak recovery.
 	  void handle_inbound_nak();
 	  void handle_outbound_nak();
   };
+  
+
+serializer tscvr_sequence(0);
+
+template <class out, class in, class outp, class inp>
+transceiver<out,in,outp,inp>::transceiver(tcp_socketp socket)
+{
+  // get the configuration parameters.
+  global_conf_reader & config = global_conf_reader::get_instance();
+  send_time_limit = config.get<unsigned int>("transceiver.send_timeout",2);
+  attempt_recovery = config.get<bool>("transceiver.allow_recovery",true);
+  error_run_limit = 
+	config.get<unsigned int>("transceiver.max_consecutive_errors",10);
+  sent_messages.resize(config.get<int>("transceiver.history_size",50));
+  last_inbound = 0;
+  // set up logging
+  std::stringstream s;
+  s << "transceiver:_" << tscvr_sequence();
+  logname = s.str();
+  Poco::Logger & log = Poco::Logger::get(logname);
+  // set up the socket.
+  sock = socket;
+  // annouce ourselves...
+  log.trace("Instanciated."); 
+  s.str("");
+  s << "history_size=" << sent_messages.size()
+    << ", send_timeout=" << send_time_limit
+    << ", attempt_recovery=" << attempt_recovery
+    << ", error_run_limit=" << error_run_limit
+    << ", remote_address=" << sock->get_remote_address()
+    << ", local_address=" << sock->get_local_address();
+  log.trace(s.str());
+};
+
+template <class out, class in, class outp, class inp>
+transceiver<out,in,outp,inp>::~transceiver()
+{
+	Poco::Logger & log = Poco::Logger::get(logname);
+	log.trace("In ~transciever");
+	// shutdown the socket.
+	sock->close();
+	// discard the sent messages list.
+	sent_messages.clear();
+	log.trace("shutdown complete.");
+};
+
+template <class out, class in, class outp, class inp>
+inp transceiver<out,in,outp,inp>::get()
+{
+  // get the message length first.
+  std::string len_field = sock->get(4,10);
+//std::cout << "len_field=" << http_chartohex(len_field) << std::endl;  
+  msg_num_t msg_len;
+  for (int i=0; i<4; i++)
+  {
+	msg_len.bytes[i] = len_field[i];
+  };
+//std::cout << ":len=" << msg_len.number << std::endl;
+  // get the rest of the message.
+  inp returnme(new in);
+  std::string input = sock->get(msg_len.number);
+//std::cout << ":received=" << http_chartohex(input) << std::endl;
+  returnme->ParseFromString(input);
+  // for the first messsge any number is
+  // accepted.
+  if (last_inbound == 0)
+  {
+	last_inbound = returnme->msg_uid();
+  }
+  else
+  {
+	last_inbound++;
+	int temp = returnme->msg_uid();
+	if (temp != last_inbound)
+	{ 
+	  inbound_seq_error e;
+	  std::stringstream message;
+	  message << "Expected " << last_inbound
+		<< " received "  << temp;
+	  e.store(message.str());
+	  throw e;
+	};
+  };
+  return returnme;
+};
+
+template <class out, class in, class outp, class inp>
+void transceiver<out,in,outp,inp>::send(outp sendme)
+{
+  sendme->set_msg_uid(out_msg_num());
+  std::string output;
+  output = sendme->SerializeAsString();
+  msg_num_t msg_len;
+  msg_len.number = output.size();
+//std::cout << "num:len" << msg_len.number << ":" << output.length() << //std::endl;
+  std::string num_field = "    ";
+  for (int i=0; i<4; i++)
+  {
+	num_field[i] = msg_len.bytes[i];
+//std::cout << int(num_field[i]) << "," ;
+  };
+//std::cout << " = " << msg_len.number << std::endl;  
+  output = num_field + output;
+//std::cout << "out msg=" << http_chartohex(output) << std::endl;  
+  sock->put(output);
+  sent_messages.push_back(sendme);
+};
+
+template <class out, class in, class outp, class inp>
+void transceiver<out,in,outp,inp>::nak_invalid_context(const unsigned long int msg_number)
+{
+  general_exception e;
+  e.store("transceiver<outp,inp>::nak_invalid_context not implemented.");
+  throw e;
+};
+
+template <class out, class in, class outp, class inp>
+void transceiver<out,in,outp,inp>::nak_validation_error(const unsigned long int msg_number)
+{
+  general_exception e;
+  e.store("transceiver<outp,inp>::nak_validation_error not implemented.");
+  throw e;
+};
+
+template <class out, class in, class outp, class inp>
+void transceiver<out,in,outp,inp>::handle_inbound_nak()
+{
+  general_exception e;
+  e.store("transceiver<outp,inp>::handle_inbound_nak not implemented.");
+  throw e;  
+};
+
+template <class out, class in, class outp, class inp>
+void transceiver<out,in,outp,inp>::handle_outbound_nak()
+{
+  general_exception e;
+  e.store("transceiver<outp,inp>::handle_outbound_nak not implemented.");
+  throw e;    
+};
 
 } // namespace nrtb
  

=== modified file 'common/transceiver/transceiver_test.cpp'
--- common/transceiver/transceiver_test.cpp	2010-12-30 03:05:40 +0000
+++ common/transceiver/transceiver_test.cpp	2011-08-25 04:22:52 +0000
@@ -14,4 +14,132 @@
  You should have received a copy of the GNU General Public License
  along with NRTB.  If not, see <http://www.gnu.org/licenses/>.
  
- **********************************************/
\ No newline at end of file
+ **********************************************/
+ 
+#include <string>
+#include <iostream>
+#include <base_thread.h>
+#include <base_socket.h>
+#include "transceiver.h"
+#include <log_setup.h>
+#include <sim_to_db_wrapper.pb.h>
+#include <boost/random.hpp>
+
+using namespace nrtb;
+using namespace std;
+
+typedef nrtb_msg::sim_to_db my_msg;
+typedef boost::shared_ptr<my_msg> my_msgp;
+typedef transceiver<my_msg,my_msg> linkt;
+
+int er_count = 0;
+
+class server_work_thread: public runnable
+{
+public:
+  
+  tcp_socketp sock;
+  unsigned long long last_inbound;
+  
+  void run()
+  {
+	set_cancel_anytime();
+	linkt link(sock);
+	while (sock->status() == tcp_socket::sock_connect)
+	{
+	  try 
+	  {
+		linkt::out_ptr inbound = link.get();
+		last_inbound = inbound->msg_uid();
+		cout << "\tReceived #" << last_inbound << endl;
+		if (last_inbound == 99)
+		  exit(0);
+	  }
+	  catch (linkt::general_exception & e)
+	  {
+		cerr << "Server work thread caught " << e.what()
+		  << "\n\tComment: " << e.comment() << endl;
+		er_count++;
+	  }
+	  catch (tcp_socket::general_exception & e)
+	  {
+		cerr << "Server work thread caught " << e.what()
+		  << "\n\tComment: " << e.comment() << endl;
+		er_count++;
+	  }
+	  catch (std::exception & e)
+	  {
+		cerr << "Server work thread caught " << e.what() 
+		  << endl;
+		er_count++;
+	  };
+	};
+  };
+};
+
+thread process;
+server_work_thread task;
+  
+class listener: public tcp_server_socket_factory
+{
+public:
+  listener(const string & add, const int & back)
+   : tcp_server_socket_factory(add, back) {};
+  
+  void on_accept()
+  {
+	if (!process.is_running())
+	{
+	  task.last_inbound = 0;
+	  task.sock.reset(connect_sock);
+	  process.start(task);
+	  cout << "server thread running." << endl;
+	}
+	else
+	{
+	  tcp_socketp s(connect_sock);
+	  s->close();
+	  cerr << "Multiple attempts to connect to server" 
+		<< endl;
+	};
+  };
+};
+
+string address = "127.0.0.1:";
+int port_base = 12334;
+
+int main()
+{
+  setup_global_logging("transceiver.log");
+  
+  //set up our port and address
+  boost::mt19937 rng;
+  rng.seed(time(0));
+  boost::uniform_int<> r(0,1000);
+  stringstream s;
+  s << address << port_base + r(rng);
+  address = s.str();
+  cout << "Using " << address << endl;
+
+  // kick off the listener thread.
+  listener server(address,5);
+  server.start_listen();
+  usleep(5e5);
+
+  // set up our sender
+  tcp_socketp sock(new tcp_socket);
+  sock->connect(address);
+  linkt sender(sock);
+
+  // Let's send a few things.
+  for (int i=0; i<100; i++)
+  {
+	my_msgp msg(new my_msg);
+	sender.send(msg);
+	cout << "Sent " << msg->msg_uid() << endl;
+	usleep(1e4);
+  };
+  
+  usleep(5e5);
+  return er_count;
+};
\ No newline at end of file