← Back to team overview

nrtb-core team mailing list archive

[Merge] lp:~fpstovall/nrtb/listener-fix into lp:nrtb

 

Rick Stovall has proposed merging lp:~fpstovall/nrtb/listener-fix into lp:nrtb.

Requested reviews:
  NRTB Core (nrtb-core)
Related bugs:
  Bug #834301 in New Real Time Battle: "Legacy file common/transceiver/transceiver.cpp needs to be removed"
  https://bugs.launchpad.net/nrtb/+bug/834301
  Bug #837090 in New Real Time Battle: "nrtb::tcp_server_socket_factory randomly crashes on shutdown."
  https://bugs.launchpad.net/nrtb/+bug/837090
  Bug #851880 in New Real Time Battle: "Some NRTB common classes destructors may throw."
  https://bugs.launchpad.net/nrtb/+bug/851880

For more details, see:
https://code.launchpad.net/~fpstovall/nrtb/listener-fix/+merge/75840

Fixed problems in several common classes, particularly the socket and thread classes, that were preventing the unit tests and the build from completing reliably.
-- 
https://code.launchpad.net/~fpstovall/nrtb/listener-fix/+merge/75840
Your team NRTB Core is requested to review the proposed merge of lp:~fpstovall/nrtb/listener-fix into lp:nrtb.
=== modified file 'common/serializer/serializer.h'
--- common/serializer/serializer.h	2011-07-30 15:32:43 +0000
+++ common/serializer/serializer.h	2011-09-17 01:38:23 +0000
@@ -37,7 +37,7 @@
 	// constructor which sets the starting number.
 	serializer(unsigned long long start);
 	// NOP distructor for inheritance safety
-	~serializer();
+	virtual ~serializer();
 	// functor method, returns the next value in the sequence.
 	unsigned long long operator ()();
   private:

=== modified file 'common/sockets/Makefile'
--- common/sockets/Makefile	2011-08-13 00:40:17 +0000
+++ common/sockets/Makefile	2011-09-17 01:38:23 +0000
@@ -24,7 +24,7 @@
 
 socket_test:	base_socket.o socket_test.cpp
 	@rm -f socket_test
-	g++ -c socket_test.cpp -I ../include
+	g++ -c -O3 socket_test.cpp -I ../include
 	g++ -o socket_test socket_test.o base_socket.o ../obj/hires_timer.o ../obj/common.o ../obj/base_thread.o -lpthread 
 
 

=== modified file 'common/sockets/base_socket.cpp'
--- common/sockets/base_socket.cpp	2011-08-25 04:18:41 +0000
+++ common/sockets/base_socket.cpp	2011-09-17 01:38:23 +0000
@@ -25,6 +25,9 @@
 #include <netdb.h>
 #include <boost/lexical_cast.hpp>
 
+// testing
+#include <iostream>
+
 using boost::lexical_cast;
 using std::string;
 
@@ -100,8 +103,8 @@
 {	
 	if (close_on_destruct)
 	{
-		shutdown(mysock,SHUT_RDWR);
-		::close(mysock);
+		try {shutdown(mysock,SHUT_RDWR); } catch (...) {};
+		try {::close(mysock); } catch (...) {};
 		_status = sock_undef;
 	};
 };
@@ -569,32 +572,18 @@
 
 
 tcp_server_socket_factory::tcp_server_socket_factory(
-		const string & address, const unsigned short int & backlog)
+		const string & address, const unsigned short int & backlog = 5)
 {
 	// does not attempt to set up the connection initially.
 	_address = address;
 	_backlog = backlog;
 	_last_thread_fault = 0;
-	thread_return = 0;
-	in_on_accept = false;
-	connect_sock = 0;
 };
 
 tcp_server_socket_factory::~tcp_server_socket_factory()
 {
-	// are we current handling requests?
-	if (is_running())
-	{
-		// yes, this is -rude- shutdown of the listening thread.
-		// Get over it.
-		stop();
-		try
-		{ 
-		  if (listen_sock)
-			close(listen_sock);
-		}
-		catch (...) {};
-	};
+  // make sure we've stopped doing anything.
+  try { stop_listen(); } catch (...) {};
 };
 
 void tcp_server_socket_factory::start_listen()
@@ -602,74 +591,46 @@
 	// take no action if the listen thread is already running.
 	if (!is_running())
 	{
-		// load up the control flags
-		in_on_accept = false;
-		okay_to_continue = true;
-		thread_return = 0;
 		// start it up!
-		detach();
 		start();
+	}
+	else
+	{
+	  throw already_running_exception();
 	};
 };
 
 void tcp_server_socket_factory::stop_listen()
 {
-	// take action only if the listen thread is running.
-	if (is_running())
-	{
-		bool okay_to_kill = false;
-		// set the flag that indicates the thread should not process any more.
-		{
-			scope_lock lock(thread_data);
-			bool okay_to_kill = !in_on_accept;
-		}
-		// close the listener socket.
-		close(listen_sock);
-		// is is okay to cancel the thread?
-		if (!okay_to_kill)
-		{
-			// we'll allow up to 30 seconds for on_accept() to finish.
-			time_t endtime = time(NULL);
-			endtime += 30;
-			bool done = false;
-			while (!done)
-			{
-				// wait a second...
-				sleep(1);
-				// check boundaries.
-				// -- are we out of on_accept()?
-				{
-				  scope_lock lock(thread_data);
-				  done = !in_on_accept;
-				}
-				// -- are we out of time?
-				okay_to_kill = done;
-				if (time(NULL) > endtime) { done = true; };
-			};
-		};
-		// kill the listener thread.
-		if (!okay_to_kill) { throw on_accept_bound_exception(); };
-		stop();
-	};
+  // take action only if the listen thread is running.
+  if (listening())
+  {
+	// stop the listener thread
+	if (is_running()) stop();
+	// wait here until the thread stops.
+	if (is_running()) join();
+//	try
+//	{ 
+//	  if (listen_sock) close(listen_sock);
+//	}
+//	catch (...) {};
+  };
 };
 
 bool tcp_server_socket_factory::listening()
 {
 	bool running = is_running();
-	if (!running)
+/*	if (!running)
 	{
 		// check to be sure the thread did not die due to an error.
-		if (thread_return != 0)
+		if (_last_thread_fault != 0)
 		{
-			// Clear the error state before throwing an exception.
-			_last_thread_fault = thread_return;
-			thread_return = 0;
-			// if thread_return was non-zero, it is assumed the thread died an
-			// evil and useless death. Scream in anger!
-			throw listen_terminated_exception();
+		  // if thread_return was non-zero, it is assumed the thread died an
+		  // evil and useless death. Scream in anger!
+		  throw listen_terminated_exception();
 		};
 	};
-	return running;
+*/	return running;
 };
 
 unsigned short int tcp_server_socket_factory::backlog()
@@ -677,149 +638,114 @@
 	return _backlog;
 };
 
+//socket closer to use with exit trap.
+void closeme(void * sock)
+{
+  std::cerr << "in thread cleanup sock closer" << std::endl;
+  int & socket = *(static_cast<int*>(sock));
+  ::close(socket);
+  std::cerr << "socker closer done." << std::endl;
+};
+
 void tcp_server_socket_factory::run()
 {
-	/* Put this entire thing in a try block to protect the application. 
-	 * Without this, an untrapped exception thrown here or in the user supplied
-	 * on_accept() method would abort the entire application instead of just this
-	 * thread.
-	 */
+  /* Put this entire thing in a try block to protect the application. 
+	* Without this, an untrapped exception thrown here or in the 
+	* user supplied on_accept() method would abort the entire 
+	* application instead of just this
+	* thread.
+	*/
+  int listen_sock;
+  // make sure the listener is closed when we exit.
+  pthread_cleanup_push(closeme, (void*) &listen_sock);
+  try
+  {
+	bool go = true;
+	// set up our listening socket.
+	listen_sock = socket(AF_INET,SOCK_STREAM,0);
+	sockaddr_in myaddr;
 	try
 	{
-		bool go = true;
-		// set up our listening socket.
-		listen_sock = socket(AF_INET,SOCK_STREAM,0);
-		sockaddr_in myaddr;
-		try
-		{
-			myaddr = tcp_socket::str_to_sockaddr(_address);
-		}
-		catch (...)
-		{
-			// probably a tcp_socket::bad_address_exception, 
-			// but any reason will do.
-			go = false;
-		};
-		if (bind(listen_sock,(sockaddr *) &myaddr,sizeof(myaddr)))
-		{
-			// bind did not work.
-			go = false;
-		};	
-		if (listen(listen_sock,_backlog))
-		{
-			// listen failed in some way.. I don't care which.
-			go = false;
-		};
-		// processing loop
-		while (go)
-		{
-			// are we okay to proceed?
-			{
-			  scope_lock lock(thread_data);
-			  in_on_accept = false;
-			  go = okay_to_continue;
-			}
-			if (!go)
-			{
-				close(listen_sock);
-				exit(0);
-			};
-			// accept a new connection
-			bool good_connect = true;
-			int new_conn = accept(listen_sock,NULL,NULL);
-			// validate the accept return value.
-			if (new_conn == -1)
-			{
-				// accept returned an error.
-				switch (errno) 
-				{
-					case ENETDOWN :
-					case EPROTO :
-					case ENOPROTOOPT :
-					case EHOSTDOWN :
-					case ENONET :
-					case EHOSTUNREACH :
-					case EOPNOTSUPP :
-					case ENETUNREACH :
-					case EAGAIN :
-//					case EWOULDBLOCK : 
-					case EPERM :
-					case ECONNABORTED :
-						{
-							good_connect = false;
-							break;
-						};
-					default : 
-						{	
-							// for any other error, we're going to shutdown the 
-							// this listener thread.
-							{
-							  scope_lock lock(thread_data);
-							  // If the error was caused by an attempt to close the 
-							  // socket and shutdown the thread, don't store an
-							  // error flag.
-							  if (okay_to_continue)
-							  {
-								  thread_return = errno;
-							  }
-							  else
-							  {
-								  thread_return = 0;
-							  };
-							};
-							exit(errno);
-							break;
-						};
-				};  // switch (errno)
-			}; // error thrown by accept.
-			// create connect_sock
-			connect_sock = new tcp_socket(new_conn);
-			// are we okay to proceed?
-			{
-			  scope_lock lock(thread_data);
-			  go = okay_to_continue;
-			  // if we are go, then we'll be going to on_accept next.
-			  // Therefore, this equality makes sense if you hold your
-			  // head just so. :-P
-			  in_on_accept = go;
-			}
-			if (!go)
-			{
-				delete connect_sock;
-				close(listen_sock);
-				exit(0);
-			};
-			// only call on_accept() if we have a good connection.
-			if (good_connect)
-			{
-				// make the thread easily cancelable.
-				set_cancel_anytime();
-				// call on_accept
-				on_accept();
-				// set back to cancel_deferred.
-				set_deferred_cancel();
-			};
-		}; // while go;
-		// if we get here then things are not going well...
-		{
-		  scope_lock lock(thread_data);
-		  thread_return = -2;
-		}
-		close(listen_sock);
-		exit(-2);
+		myaddr = tcp_socket::str_to_sockaddr(_address);
 	}
 	catch (...)
 	{
-		/* an untrapped exception was thrown by someone in this thread.
-		 * We'll shutdown this thread and put -1 in the thread_return field
-		 * to let the world know that we don't know what killed us.
-		 */
-		thread_data.lock();
-		thread_return = -1;
-		thread_data.unlock();
-		close(listen_sock);
-		exit(-1);
-	};
+		// probably a tcp_socket::bad_address_exception, 
+		// but any reason will do.
+		go = false;
+	};
+	if (bind(listen_sock,(sockaddr *) &myaddr,sizeof(myaddr)))
+	{
+		// bind did not work.
+		go = false;
+	};	
+	if (listen(listen_sock,_backlog))
+	{
+		// listen failed in some way.. I don't care which.
+		go = false;
+	};
+	// processing loop
+	while (go)
+	{
+	  // accept a new connection
+	  bool good_connect = true;
+	  int new_conn = accept(listen_sock,NULL,NULL);
+	  // validate the accept return value.
+	  if (new_conn == -1)
+	  {
+		// accept returned an error.
+		switch (errno) 
+		{
+//			case ENETDOWN :
+			case EPROTO :
+//			case ENOPROTOOPT :
+			case EHOSTDOWN :
+//			case ENONET :
+			case EHOSTUNREACH :
+//			case EOPNOTSUPP :
+//			case ENETUNREACH :
+			case EAGAIN :
+//			case EPERM :
+			case ECONNABORTED :
+				{
+				  good_connect = false;
+				  break;
+				};
+			default : 
+				{	
+				  // for any other error, we're going to shutdown the 
+				  // this listener thread.
+				  go = false;
+				  good_connect = false;
+				  _last_thread_fault = errno;
+				  break;
+				};
+		};  // switch (errno)
+	  }; // error thrown by accept.
+	  if (good_connect)
+	  {
+		// create connect_sock
+		connect_sock.reset(new tcp_socket(new_conn));
+		// make the thread easily cancelable.
+		set_cancel_anytime();
+		// call on_accept
+		go = on_accept();
+		// set back to cancel_deferred.
+		set_deferred_cancel();
+		// release our claim to the new socket
+		connect_sock.reset();
+	  };
+	}; // while go;
+  }
+  catch (...)
+  {
+	/* an untrapped exception was thrown by someone in this thread.
+	* We'll shutdown this thread and put -1 in _last_thread_fault
+	* to let the world know that we don't know what killed us.
+	*/
+	_last_thread_fault = -1;
+  };
+  pthread_cleanup_pop(0);
 };
 
 } // namespace nrtb

=== modified file 'common/sockets/base_socket.h'
--- common/sockets/base_socket.h	2011-08-15 01:58:41 +0000
+++ common/sockets/base_socket.h	2011-09-17 01:38:23 +0000
@@ -373,7 +373,7 @@
 };
 
 /// smart pointer for use with tcp_sockets
-typedef boost::shared_ptr<nrtb::tcp_socket> tcp_socketp;
+typedef boost::shared_ptr<nrtb::tcp_socket> tcp_socket_p;
 
 /** Abstract "listener" TCP/IP socket for servers. 
  ** 
@@ -405,12 +405,11 @@
  ** the stop_listen() method, which will return when as soon any current calls
  ** to on_accept() complete.
  ** 
- ** --WARNING--- This class is a tcp_socket factory in that a new tcp_socket is 
- ** created for every connection. It is the responsibility of the host 
- ** application to delete these when they are no longer needed, either in the 
- ** overridden on_accept() method or (more likely) later in the processing flow
- ** when the transaction is complete.
- ** 
+ ** --NOTE--- This class is a tcp_socket_p factory in that a new one 
+ ** is created for each request received. As the tcp_socket_p is a 
+ ** smart pointer, the allocated socket will be closed and deallocated 
+ ** when the last reference to the socket goes out of scope automatically.
+ **
  ** Descendent classes must override on_accept() to provide the necessary
  ** connection handling. See the documentation on on_accept for more details.
  **/
@@ -419,13 +418,7 @@
 
 	private:
 	
-		int listen_sock;
-		bool in_on_accept;
-		bool okay_to_continue;
-		int thread_return;
 		int _last_thread_fault;
-		mutex thread_data;
-
 		// Provides the listener thread.
 		void run();
 		
@@ -439,15 +432,12 @@
 		 ** only safe to use/manipulate after entry of the method on_accept(); 
 		 ** at any other time this pointer may be altered without notice. 
 		 ** 
-		 ** At entry to on_accept() this will point to a valid base_sock 
-		 ** object. It is up to the application to delete this object when
-		 ** it is done with it; server_sock will not take any actions with or 
-		 ** on the object pointed to by listen_sock once on_accept() is called.
-		 ** However, once on_accept() has returned, server_sock will create 
-		 ** another new base_sock and store it's address here on the next 
-		 ** connection.
+		 ** At entry to on_accept() this will point to a valid tcp_socket_p
+		 ** smart pointer. As with all smart pointers, the object it 
+		 ** points to will be deleted automatically when the last 
+		 ** reference to it goes out of scope. 
 		 **/
-		tcp_socket * connect_sock;
+		tcp_socket_p connect_sock;
 
 		/** Abstract method to process connections. An on_accept() call is 
 		 ** made on every connection accepted immediately after constructing a 
@@ -460,10 +450,8 @@
 		 ** connection latency, so for most applications you'll want to queue
 		 ** the connection for processing by another thread.
 		 ** 
-		 ** In either case, be sure the application deletes the tcp_socket object 
-		 ** at connect_sock when it's done with it, as you can be sure of memory
-		 ** leaks if you don't. tcp_server_socket_factory never deletes any 
-		 ** tcp_socket object it created.
+		 ** on_accept() should return true to continue processing, or false 
+		 ** to force a shutdown of the listener.
 		 ** 
 		 ** WARNING: While this class attempts to respect the integrity of any code
 		 ** placed in on_accept(), you must be aware that it may be cancelled 
@@ -476,7 +464,7 @@
 		 ** guidelines could result in program deadlocks should on_accept be 
 		 ** cancelled while holding a resource lock.
 		 **/
-		virtual void on_accept() = 0;
+		virtual bool on_accept() = 0;
 		
 	public:
 

=== modified file 'common/sockets/socket_test.cpp'
--- common/sockets/socket_test.cpp	2011-08-25 02:21:28 +0000
+++ common/sockets/socket_test.cpp	2011-09-17 01:38:23 +0000
@@ -26,8 +26,6 @@
 using namespace nrtb;
 using namespace std;
 
-typedef boost::shared_ptr<tcp_socket> sockp;
-
 class myserver: public tcp_server_socket_factory
 {
 public:
@@ -46,29 +44,30 @@
 
 protected:
 	// on_accept() is called on each connection.
-	void on_accept()
+	bool on_accept()
 	{
 		try
 		{
-			sockp c(connect_sock);
 			// just return what we've recieved.
-			string msg = c->getln();
-			c->put(msg);
-			// Close the socket.
-			c->close();
+			string msg = connect_sock->getln();
+			connect_sock->put(msg);
 			// Update our hit count. 
 			hits++;
 		}
 		catch (base_exception & e)
 		{
 		  errors++;
-		  cerr << "Caught " << e.what() << endl;
+		  cerr << "server Caught " << e.what() << endl;
 		}
 		catch (...)
 		{
 		  errors++;
 		  cerr << "Unexpected error in on_accept()" << endl;
 		};
+		if (hits > 99) 
+		  return false;
+		else
+		  return true;
 	};
 };
 
@@ -79,6 +78,7 @@
   sender.connect(address);
   sender.put(sendme);
   returnme = sender.getln();
+  sender.close();//cerr << "tc>> sock closed" << endl;
   return returnme;
 };
 
@@ -106,7 +106,7 @@
 	usleep(5e5);
 	
 	// Send test messages
-	for (int i = 0; i < 1000; i++)
+	for (int i = 0; i < 100; i++)
 	{
 	  stringstream msg;
 	  msg << "test message " << i << "\r";
@@ -119,13 +119,6 @@
 	  cout << returned.substr(0,returned.size()-1) << ": " 
 		<< ((returned == checkme) ? "Passed" : "Failed")
 		<< endl;
-	  usleep(1000);
-	};
-	test_server.stop_listen();
-	if (test_server.listening())
-	{  
-	  er_count++;
-	  cout << "Server failed to stop. " << endl;
 	};
   }
   catch (myserver::bind_failure_exception)
@@ -149,6 +142,26 @@
 	  cout << "A bad_connect_exception was thrown.\n" 
 		<< e.comment() << endl;
   }
+  catch (tcp_socket::not_open_exception & e)
+  {
+	  cout << "A tcp not open exception was caught.\n" 
+		<< e.comment() << endl;
+  }
+  catch (tcp_socket::close_exception & e)
+  {
+	  cout << "A close_exception was caught.\n" 
+		<< e.comment() << endl;
+  }
+  catch (tcp_socket::overrun_exception & e)
+  {
+	  cout << "An overrun_exception was caught.\n" 
+		<< e.comment() << endl;
+  }
+  catch (tcp_socket::buffer_full_exception & e)
+  {
+	  cout << "A buffer_full_exception was caught.\n" 
+		<< e.comment() << endl;
+  }
   catch (tcp_socket::general_exception & e)
   {
 	  cout << "A tcp_socket exception was caught.\n" 
@@ -160,11 +173,11 @@
   };
 
   // final check.
-  if (test_server.hits != 1000)
+  if (test_server.hits != 100)
   {
 	er_count++;
 	cout << "Server does not report the proper number of hits.\n"
-	  << "\tExpected 1000, got " << test_server.hits 
+	  << "\tExpected 100, got " << test_server.hits 
 	  << endl;
   };
   cout << "=========== tcp_socket test complete =============" << endl;

=== modified file 'common/threads/base_thread.cpp'
--- common/threads/base_thread.cpp	2011-08-06 20:45:00 +0000
+++ common/threads/base_thread.cpp	2011-09-17 01:38:23 +0000
@@ -100,7 +100,7 @@
 
 thread::~thread()
 {
-	stop();
+	try {stop(); } catch (...) {};
 };
 
 void thread::run()
@@ -305,10 +305,7 @@
 
 mutex::~mutex()
 {
-	if (pthread_mutex_destroy(&mymid))
-	{
-		throw can_not_destruct_exception();
-	};
+	pthread_mutex_destroy(&mymid);
 };
 
 void mutex::lock()
@@ -356,10 +353,9 @@
 		if (!try_lock() || (waiting > 0))
 		{	
 			// not good, there are others waiting on us or we are locked.
-			can_not_destruct_exception e;
-			e.store(lexical_cast<string>(waiting));
 			unlock();
-			throw e;
+			cerr << "WARNING: there were " << waiting << 
+			  " threads queued in ~cond_variable." << endl; 
 		}
 		else
 		{
@@ -368,9 +364,9 @@
 		// unlock before we leave.
 		unlock();
 	}
-	catch (mutex::general_exception & e)
+	catch (...)
 	{
-		throw e;
+		cerr << "WARNING: there was an error in ~cond_variable." << endl;;
 	};
 };
 

=== removed file 'common/transceiver/transceiver.cpp'
--- common/transceiver/transceiver.cpp	2011-08-15 03:47:28 +0000
+++ common/transceiver/transceiver.cpp	1970-01-01 00:00:00 +0000
@@ -1,140 +0,0 @@
-/***********************************************
- This file is part of the NRTB project (https://*launchpad.net/nrtb).
- 
- NRTB is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published by
- the Free Software Foundation, either version 3 of the License, or
- (at your option) any later version.
- 
- NRTB is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- GNU General Public License for more details.
- 
- You should have received a copy of the GNU General Public License
- along with NRTB.  If not, see <http://www.gnu.org/licenses/>.
- 
- **********************************************/
- 
-#include "transceiver.h"
-#include <confreader.h>
-#include <serializer.h>
-#include <string>
-#include <sstream>
-
-using namespace nrtb;
-using namespace std;
-
-serializer tscvr_sequence(0);
-
-template <class out, class in, class outp, class inp>
-transceiver<out,in,outp,inp>::transceiver(tcp_socketp socket)
-{
-  // get the configuration parameters.
-  global_conf_reader & config = global_conf_reader::get_instance();
-  send_time_limit = config.get<unsigned int>("transceiver.send_timeout",2);
-  attempt_recovery = config.get<bool>("transceiver.allow_recovery",true);
-  error_run_limit = 
-	config.get<unsigned int>("transceiver.max_consecutive_errors",10);
-  sent_messages.resize(config.get<int>("transceiver.history_size",50));
-  // set up logging
-  std::stringstream s;
-  s << "transceiver:_" << tscvr_sequence();
-  log = Poco::Logger::get(s.str());
-  // set up the socket.
-  sock(socket);
-  // annouce ourselves...
-  log->trace("Instanciated."); 
-  s.str("");
-  s << "history_size=" << sent_messages.size()
-    << ", send_timeout=" << send_time_limit
-    << ", attempt_recovery=" << attempt_recovery
-    << ", error_run_limit=" << error_run_limit
-    << ", remote_address=" << sock->get_remote_address()
-    << ", local_address=" << sock->get_local_address();
-  log->trace(s.str());
-};
-
-template <class out, class in, class outp, class inp>
-transceiver<out,in,outp,inp>::~transceiver()
-{
-	log->trace("In ~transciever");
-	// shutdown the socket.
-	sock->close();
-	// discard the sent messages list.
-	sent_messages.clear();
-	log->trace("shutdown complete.");
-};
-
-template <class out, class in, class outp, class inp>
-inp transceiver<out,in,outp,inp>::get()
-{
-  inp returnme(new in);
-  string input = sock->getln();
-  returnme->ParseFromString(input);
-  // for the first messsge any number is
-  // accepted.
-  if (last_inbound == 0)
-  {
-	last_inbound = returnme->msg_uid();
-  }
-  else
-  {
-	last_inbound++;
-	int temp = returnme->msg_uid();
-	if (temp != last_inbound)
-	{ 
-	  inbound_seq_error e();
-	  stringstream message;
-	  message << "Expected " << last_inbound
-		<< " received "  << temp;
-	  e.store(message.str());
-	  throw e;
-	};
-  };
-  return returnme;
-};
-
-template <class out, class in, class outp, class inp>
-void transceiver<out,in,outp,inp>::send(outp sendme)
-{
-  sendme->set_msg_uid(out_msg_num());
-  string output;
-  output = sendme->SerializeAsString() + "\r";
-  sock->put(output);
-  sent_messages.push_back(sendme);
-};
-
-template <class out, class in, class outp, class inp>
-void transceiver<out,in,outp,inp>::nak_invalid_context(const unsigned long int msg_number)
-{
-  general_exception e;
-  e.store("transceiver<outp,inp>::nak_invalid_context not implemented.");
-  throw e;
-};
-
-template <class out, class in, class outp, class inp>
-void transceiver<out,in,outp,inp>::nak_validation_error(const unsigned long int msg_number)
-{
-  general_exception e;
-  e.store("transceiver<outp,inp>::nak_validation_error not implemented.");
-  throw e;
-};
-
-template <class out, class in, class outp, class inp>
-void transceiver<out,in,outp,inp>::handle_inbound_nak()
-{
-  general_exception e;
-  e.store("transceiver<outp,inp>::handle_inbound_nak not implemented.");
-  throw e;  
-};
-
-template <class out, class in, class outp, class inp>
-void transceiver<out,in,outp,inp>::handle_outbound_nak()
-{
-  general_exception e;
-  e.store("transceiver<outp,inp>::handle_outbound_nak not implemented.");
-  throw e;    
-};
-
-

=== modified file 'common/transceiver/transceiver.h'
--- common/transceiver/transceiver.h	2011-08-25 04:22:52 +0000
+++ common/transceiver/transceiver.h	2011-09-17 01:38:23 +0000
@@ -70,12 +70,12 @@
 	   * socket. Once created this class assumes it uniquely owns the 
 	   * socket and will close it upon distruction.
 	   * ***********************************************************/
-	  transceiver(tcp_socketp socket);
+	  transceiver(tcp_socket_p socket);
 	  /*************************************************************
 	   * Closes the socket and releases all mmemory associated with
 	   * this class.
 	   * ***********************************************************/
-	  ~transceiver();
+	  virtual ~transceiver();
 	  /**************************************************************
 	   * gets the next message from the socket. If no messages are 
 	   * ready, blocks util one arrives. 
@@ -86,7 +86,7 @@
 	   * sent_messages buffer in case it's needed for error recovery.
 	   * ***********************************************************/
 	  void send(outp sendme);
-	  /**q************************************************************
+	  /**************************************************************
 	   * Called by the data consumer when an inbound message was 
 	   * not valid in the current application context. msg_number
 	   * is the sequence number of the offending message.
@@ -117,7 +117,7 @@
 	  /// the name used for logging
 	  std::string logname;
 	  /// The socket used for communcation.
-	  tcp_socketp sock;
+	  tcp_socket_p sock;
 	  /// serializer used for message numbers
 	  serializer out_msg_num;
 	  /// last received message number 
@@ -136,7 +136,7 @@
 serializer tscvr_sequence(0);
 
 template <class out, class in, class outp, class inp>
-transceiver<out,in,outp,inp>::transceiver(tcp_socketp socket)
+transceiver<out,in,outp,inp>::transceiver(tcp_socket_p socket)
 {
   // get the configuration parameters.
   global_conf_reader & config = global_conf_reader::get_instance();
@@ -154,7 +154,7 @@
   // set up the socket.
   sock = socket;
   // annouce ourselves...
-  log.trace("Instanciated."); 
+  log.information("Instanciated."); 
   s.str("");
   s << "history_size=" << sent_messages.size()
     << ", send_timeout=" << send_time_limit
@@ -162,19 +162,25 @@
     << ", error_run_limit=" << error_run_limit
     << ", remote_address=" << sock->get_remote_address()
     << ", local_address=" << sock->get_local_address();
-  log.trace(s.str());
+  log.information(s.str());
 };
 
 template <class out, class in, class outp, class inp>
 transceiver<out,in,outp,inp>::~transceiver()
 {
-	Poco::Logger & log = Poco::Logger::get(logname);
-	log.trace("In ~transciever");
-	// shutdown the socket.
-	sock->close();
-	// discard the sent messages list.
-	sent_messages.clear();
-	log.trace("shutdown complete.");
+  Poco::Logger & log = Poco::Logger::get(logname);
+  log.information("In ~transciever");
+  // shutdown and release  the socket.
+  try 
+  {
+	if (sock)
+	{
+	  sock.reset(); 
+	};
+  } catch (...) {};
+  // discard the sent messages list.
+  sent_messages.clear();
+  log.information("shutdown complete.");
 };
 
 template <class out, class in, class outp, class inp>

=== modified file 'common/transceiver/transceiver_test.cpp'
--- common/transceiver/transceiver_test.cpp	2011-08-25 04:22:52 +0000
+++ common/transceiver/transceiver_test.cpp	2011-09-17 01:38:23 +0000
@@ -29,18 +29,48 @@
 using namespace std;
 
 typedef nrtb_msg::sim_to_db my_msg;
-typedef boost::shared_ptr<my_msg> my_msgp;
 typedef transceiver<my_msg,my_msg> linkt;
 
-int er_count = 0;
-
-class server_work_thread: public runnable
-{
-public:
-  
-  tcp_socketp sock;
+class safe_counter
+{
+private:
+  mutex data_lock;
+  int er_count;
+
+public:
+  
+  safe_counter() { er_count = 0; };
+
+  ~safe_counter() {};
+
+  void inc()
+  {
+	scope_lock lock(data_lock);
+	er_count++;
+  };
+
+  int operator ()()
+  {
+	scope_lock lock(data_lock);
+	return er_count;
+  };
+};
+
+safe_counter er_count;
+
+class server_work_thread: public thread
+{
+public:
+  
+  tcp_socket_p sock;
   unsigned long long last_inbound;
   
+  ~server_work_thread()
+  {
+	cout << "Destructing server_work_thread" << endl;
+	sock.reset();
+  };
+  
   void run()
   {
 	set_cancel_anytime();
@@ -52,53 +82,64 @@
 		linkt::out_ptr inbound = link.get();
 		last_inbound = inbound->msg_uid();
 		cout << "\tReceived #" << last_inbound << endl;
+		link.send(inbound);
 		if (last_inbound == 99)
+		{
+		  cout << "Receiver thread closing." << endl;
 		  exit(0);
+		};
 	  }
 	  catch (linkt::general_exception & e)
 	  {
 		cerr << "Server work thread caught " << e.what()
 		  << "\n\tComment: " << e.comment() << endl;
-		er_count++;
+		er_count.inc();;
 	  }
 	  catch (tcp_socket::general_exception & e)
 	  {
 		cerr << "Server work thread caught " << e.what()
 		  << "\n\tComment: " << e.comment() << endl;
-		er_count++;
+		er_count.inc();;
 	  }
 	  catch (std::exception & e)
 	  {
 		cerr << "Server work thread caught " << e.what() 
 		  << endl;
-		er_count++;
+		er_count.inc();;
 	  };
 	};
   };
 };
 
-thread process;
-server_work_thread task;
-  
 class listener: public tcp_server_socket_factory
 {
+protected:
+  boost::shared_ptr<server_work_thread> task;
+
 public:
   listener(const string & add, const int & back)
    : tcp_server_socket_factory(add, back) {};
+  ~listener()
+  {
+	cout << "Destructing listener" << endl;
+	task.reset();
+  };
   
-  void on_accept()
+  bool on_accept()
   {
-	if (!process.is_running())
+	if (!task)
 	{
-	  task.last_inbound = 0;
-	  task.sock.reset(connect_sock);
-	  process.start(task);
+	  task.reset(new server_work_thread);
+	  task->last_inbound = 0;
+	  task->sock = connect_sock;
+	  task->start(*(task.get()));
 	  cout << "server thread running." << endl;
+	  // shut down the listener thread; our work is done here.
+	  return false;
 	}
 	else
 	{
-	  tcp_socketp s(connect_sock);
-	  s->close();
+	  connect_sock->close();
 	  cerr << "Multiple attempts to connect to server" 
 		<< endl;
 	};
@@ -111,35 +152,54 @@
 int main()
 {
   setup_global_logging("transceiver.log");
-  
-  //set up our port and address
-  boost::mt19937 rng;
-  rng.seed(time(0));
-  boost::uniform_int<> r(0,1000);
-  stringstream s;
-  s << address << port_base + r(rng);
-  address = s.str();
-  cout << "Using " << address << endl;
-
-  // kick off the listener thread.
-  listener server(address,5);
-  server.start_listen();
-  usleep(5e5);
-
-  // set up our sender
-  tcp_socketp sock(new tcp_socket);
-  sock->connect(address);
-  linkt sender(sock);
-
-  // Let's send a few things.
-  for (int i=0; i<100; i++)
-  {
-	my_msgp msg(new my_msg);
-	sender.send(msg);
-	cout << "Sent " << msg->msg_uid() << endl;
-	usleep(1e4);
+
+  try
+  {
+	//set up our port and address
+	boost::mt19937 rng;
+	rng.seed(time(0));
+	boost::uniform_int<> r(0,1000);
+	stringstream s;
+	s << address << port_base + r(rng);
+	address = s.str();
+	cout << "Using " << address << endl;
+
+	// kick off the listener thread.
+	listener server(address,5);
+	server.start_listen();
+	usleep(1e4);
+
+	// set up our sender
+	tcp_socket_p sock(new tcp_socket);
+	sock->connect(address);
+	linkt sender(sock);
+
+	// Let's send a few things.
+	for (int i=0; i<100; i++)
+	{
+	  linkt::out_ptr msg(new my_msg);
+	  sender.send(msg);
+	  cout << "Sent " << msg->msg_uid() << endl;
+	  msg = sender.get();
+	  cout << "Got back " << msg->msg_uid() << endl;
+	};
+	usleep(1e4);
+  }
+  catch (...)
+  {
+	cout << "exception caught during test." << endl;
+	er_count.inc();
   };
-  
-  usleep(5e5);
-  return er_count;
+
+  int faults = er_count(); 
+  if (faults)
+  {
+	cout << "========== ** There were " << faults 
+	  << "errors logged. =========" << endl; 
+  }
+  else
+	cout << "========= nrtb::transceiver test complete.=========" 
+	  << endl;
+
+  return faults;
 };
\ No newline at end of file


Follow ups