nrtb-core team mailing list archive
-
nrtb-core team
-
Mailing list archive
-
Message #00269
[Merge] lp:~fpstovall/nrtb/listener-fix into lp:nrtb
Rick Stovall has proposed merging lp:~fpstovall/nrtb/listener-fix into lp:nrtb.
Requested reviews:
NRTB Core (nrtb-core)
Related bugs:
Bug #834301 in New Real Time Battle: "Legacy file common/transceiver/transceiver.cpp needs to be removed"
https://bugs.launchpad.net/nrtb/+bug/834301
Bug #837090 in New Real Time Battle: "nrtb::tcp_server_socket_factory randomly crashes on shutdown."
https://bugs.launchpad.net/nrtb/+bug/837090
Bug #851880 in New Real Time Battle: "Some NRTB common classes destructors may throw."
https://bugs.launchpad.net/nrtb/+bug/851880
For more details, see:
https://code.launchpad.net/~fpstovall/nrtb/listener-fix/+merge/75840
Fixed problems in several common classes, particularly the socket and thread classes, that were preventing unit tests and builds from completing reliably.
--
https://code.launchpad.net/~fpstovall/nrtb/listener-fix/+merge/75840
Your team NRTB Core is requested to review the proposed merge of lp:~fpstovall/nrtb/listener-fix into lp:nrtb.
=== modified file 'common/serializer/serializer.h'
--- common/serializer/serializer.h 2011-07-30 15:32:43 +0000
+++ common/serializer/serializer.h 2011-09-17 01:38:23 +0000
@@ -37,7 +37,7 @@
// constructor which sets the starting number.
serializer(unsigned long long start);
// NOP distructor for inheritance safety
- ~serializer();
+ virtual ~serializer();
// functor method, returns the next value in the sequence.
unsigned long long operator ()();
private:
=== modified file 'common/sockets/Makefile'
--- common/sockets/Makefile 2011-08-13 00:40:17 +0000
+++ common/sockets/Makefile 2011-09-17 01:38:23 +0000
@@ -24,7 +24,7 @@
socket_test: base_socket.o socket_test.cpp
@rm -f socket_test
- g++ -c socket_test.cpp -I ../include
+ g++ -c -O3 socket_test.cpp -I ../include
g++ -o socket_test socket_test.o base_socket.o ../obj/hires_timer.o ../obj/common.o ../obj/base_thread.o -lpthread
=== modified file 'common/sockets/base_socket.cpp'
--- common/sockets/base_socket.cpp 2011-08-25 04:18:41 +0000
+++ common/sockets/base_socket.cpp 2011-09-17 01:38:23 +0000
@@ -25,6 +25,9 @@
#include <netdb.h>
#include <boost/lexical_cast.hpp>
+// testing
+#include <iostream>
+
using boost::lexical_cast;
using std::string;
@@ -100,8 +103,8 @@
{
if (close_on_destruct)
{
- shutdown(mysock,SHUT_RDWR);
- ::close(mysock);
+ try {shutdown(mysock,SHUT_RDWR); } catch (...) {};
+ try {::close(mysock); } catch (...) {};
_status = sock_undef;
};
};
@@ -569,32 +572,18 @@
tcp_server_socket_factory::tcp_server_socket_factory(
- const string & address, const unsigned short int & backlog)
+ const string & address, const unsigned short int & backlog = 5)
{
// does not attempt to set up the connection initially.
_address = address;
_backlog = backlog;
_last_thread_fault = 0;
- thread_return = 0;
- in_on_accept = false;
- connect_sock = 0;
};
tcp_server_socket_factory::~tcp_server_socket_factory()
{
- // are we current handling requests?
- if (is_running())
- {
- // yes, this is -rude- shutdown of the listening thread.
- // Get over it.
- stop();
- try
- {
- if (listen_sock)
- close(listen_sock);
- }
- catch (...) {};
- };
+ // make sure we've stopped doing anything.
+ try { stop_listen(); } catch (...) {};
};
void tcp_server_socket_factory::start_listen()
@@ -602,74 +591,46 @@
// take no action if the listen thread is already running.
if (!is_running())
{
- // load up the control flags
- in_on_accept = false;
- okay_to_continue = true;
- thread_return = 0;
// start it up!
- detach();
start();
+ }
+ else
+ {
+ throw already_running_exception();
};
};
void tcp_server_socket_factory::stop_listen()
{
- // take action only if the listen thread is running.
- if (is_running())
- {
- bool okay_to_kill = false;
- // set the flag that indicates the thread should not process any more.
- {
- scope_lock lock(thread_data);
- bool okay_to_kill = !in_on_accept;
- }
- // close the listener socket.
- close(listen_sock);
- // is is okay to cancel the thread?
- if (!okay_to_kill)
- {
- // we'll allow up to 30 seconds for on_accept() to finish.
- time_t endtime = time(NULL);
- endtime += 30;
- bool done = false;
- while (!done)
- {
- // wait a second...
- sleep(1);
- // check boundaries.
- // -- are we out of on_accept()?
- {
- scope_lock lock(thread_data);
- done = !in_on_accept;
- }
- // -- are we out of time?
- okay_to_kill = done;
- if (time(NULL) > endtime) { done = true; };
- };
- };
- // kill the listener thread.
- if (!okay_to_kill) { throw on_accept_bound_exception(); };
- stop();
- };
+ // take action only if the listen thread is running.
+ if (listening())
+ {
+ // stop the listener thread
+ if (is_running()) stop();
+ // wait here until the thread stops.
+ if (is_running()) join();
+// try
+// {
+// if (listen_sock) close(listen_sock);
+// }
+// catch (...) {};
+ };
};
bool tcp_server_socket_factory::listening()
{
bool running = is_running();
- if (!running)
+/* if (!running)
{
// check to be sure the thread did not die due to an error.
- if (thread_return != 0)
+ if (_last_thread_fault != 0)
{
- // Clear the error state before throwing an exception.
- _last_thread_fault = thread_return;
- thread_return = 0;
- // if thread_return was non-zero, it is assumed the thread died an
- // evil and useless death. Scream in anger!
- throw listen_terminated_exception();
+ // if thread_return was non-zero, it is assumed the thread died an
+ // evil and useless death. Scream in anger!
+ throw listen_terminated_exception();
};
};
- return running;
+*/ return running;
};
unsigned short int tcp_server_socket_factory::backlog()
@@ -677,149 +638,114 @@
return _backlog;
};
+//socket closer to use with exit trap.
+void closeme(void * sock)
+{
+ std::cerr << "in thread cleanup sock closer" << std::endl;
+ int & socket = *(static_cast<int*>(sock));
+ ::close(socket);
+ std::cerr << "socker closer done." << std::endl;
+};
+
void tcp_server_socket_factory::run()
{
- /* Put this entire thing in a try block to protect the application.
- * Without this, an untrapped exception thrown here or in the user supplied
- * on_accept() method would abort the entire application instead of just this
- * thread.
- */
+ /* Put this entire thing in a try block to protect the application.
+ * Without this, an untrapped exception thrown here or in the
+ * user supplied on_accept() method would abort the entire
+ * application instead of just this
+ * thread.
+ */
+ int listen_sock;
+ // make sure the listener is closed when we exit.
+ pthread_cleanup_push(closeme, (void*) &listen_sock);
+ try
+ {
+ bool go = true;
+ // set up our listening socket.
+ listen_sock = socket(AF_INET,SOCK_STREAM,0);
+ sockaddr_in myaddr;
try
{
- bool go = true;
- // set up our listening socket.
- listen_sock = socket(AF_INET,SOCK_STREAM,0);
- sockaddr_in myaddr;
- try
- {
- myaddr = tcp_socket::str_to_sockaddr(_address);
- }
- catch (...)
- {
- // probably a tcp_socket::bad_address_exception,
- // but any reason will do.
- go = false;
- };
- if (bind(listen_sock,(sockaddr *) &myaddr,sizeof(myaddr)))
- {
- // bind did not work.
- go = false;
- };
- if (listen(listen_sock,_backlog))
- {
- // listen failed in some way.. I don't care which.
- go = false;
- };
- // processing loop
- while (go)
- {
- // are we okay to proceed?
- {
- scope_lock lock(thread_data);
- in_on_accept = false;
- go = okay_to_continue;
- }
- if (!go)
- {
- close(listen_sock);
- exit(0);
- };
- // accept a new connection
- bool good_connect = true;
- int new_conn = accept(listen_sock,NULL,NULL);
- // validate the accept return value.
- if (new_conn == -1)
- {
- // accept returned an error.
- switch (errno)
- {
- case ENETDOWN :
- case EPROTO :
- case ENOPROTOOPT :
- case EHOSTDOWN :
- case ENONET :
- case EHOSTUNREACH :
- case EOPNOTSUPP :
- case ENETUNREACH :
- case EAGAIN :
-// case EWOULDBLOCK :
- case EPERM :
- case ECONNABORTED :
- {
- good_connect = false;
- break;
- };
- default :
- {
- // for any other error, we're going to shutdown the
- // this listener thread.
- {
- scope_lock lock(thread_data);
- // If the error was caused by an attempt to close the
- // socket and shutdown the thread, don't store an
- // error flag.
- if (okay_to_continue)
- {
- thread_return = errno;
- }
- else
- {
- thread_return = 0;
- };
- };
- exit(errno);
- break;
- };
- }; // switch (errno)
- }; // error thrown by accept.
- // create connect_sock
- connect_sock = new tcp_socket(new_conn);
- // are we okay to proceed?
- {
- scope_lock lock(thread_data);
- go = okay_to_continue;
- // if we are go, then we'll be going to on_accept next.
- // Therefore, this equality makes sense if you hold your
- // head just so. :-P
- in_on_accept = go;
- }
- if (!go)
- {
- delete connect_sock;
- close(listen_sock);
- exit(0);
- };
- // only call on_accept() if we have a good connection.
- if (good_connect)
- {
- // make the thread easily cancelable.
- set_cancel_anytime();
- // call on_accept
- on_accept();
- // set back to cancel_deferred.
- set_deferred_cancel();
- };
- }; // while go;
- // if we get here then things are not going well...
- {
- scope_lock lock(thread_data);
- thread_return = -2;
- }
- close(listen_sock);
- exit(-2);
+ myaddr = tcp_socket::str_to_sockaddr(_address);
}
catch (...)
{
- /* an untrapped exception was thrown by someone in this thread.
- * We'll shutdown this thread and put -1 in the thread_return field
- * to let the world know that we don't know what killed us.
- */
- thread_data.lock();
- thread_return = -1;
- thread_data.unlock();
- close(listen_sock);
- exit(-1);
- };
+ // probably a tcp_socket::bad_address_exception,
+ // but any reason will do.
+ go = false;
+ };
+ if (bind(listen_sock,(sockaddr *) &myaddr,sizeof(myaddr)))
+ {
+ // bind did not work.
+ go = false;
+ };
+ if (listen(listen_sock,_backlog))
+ {
+ // listen failed in some way.. I don't care which.
+ go = false;
+ };
+ // processing loop
+ while (go)
+ {
+ // accept a new connection
+ bool good_connect = true;
+ int new_conn = accept(listen_sock,NULL,NULL);
+ // validate the accept return value.
+ if (new_conn == -1)
+ {
+ // accept returned an error.
+ switch (errno)
+ {
+// case ENETDOWN :
+ case EPROTO :
+// case ENOPROTOOPT :
+ case EHOSTDOWN :
+// case ENONET :
+ case EHOSTUNREACH :
+// case EOPNOTSUPP :
+// case ENETUNREACH :
+ case EAGAIN :
+// case EPERM :
+ case ECONNABORTED :
+ {
+ good_connect = false;
+ break;
+ };
+ default :
+ {
+ // for any other error, we're going to shutdown the
+ // this listener thread.
+ go = false;
+ good_connect = false;
+ _last_thread_fault = errno;
+ break;
+ };
+ }; // switch (errno)
+ }; // error thrown by accept.
+ if (good_connect)
+ {
+ // create connect_sock
+ connect_sock.reset(new tcp_socket(new_conn));
+ // make the thread easily cancelable.
+ set_cancel_anytime();
+ // call on_accept
+ go = on_accept();
+ // set back to cancel_deferred.
+ set_deferred_cancel();
+ // release our claim to the new socket
+ connect_sock.reset();
+ };
+ }; // while go;
+ }
+ catch (...)
+ {
+ /* an untrapped exception was thrown by someone in this thread.
+ * We'll shutdown this thread and put -1 in _last_thread_fault
+ * to let the world know that we don't know what killed us.
+ */
+ _last_thread_fault = -1;
+ };
+ pthread_cleanup_pop(0);
};
} // namespace nrtb
=== modified file 'common/sockets/base_socket.h'
--- common/sockets/base_socket.h 2011-08-15 01:58:41 +0000
+++ common/sockets/base_socket.h 2011-09-17 01:38:23 +0000
@@ -373,7 +373,7 @@
};
/// smart pointer for use with tcp_sockets
-typedef boost::shared_ptr<nrtb::tcp_socket> tcp_socketp;
+typedef boost::shared_ptr<nrtb::tcp_socket> tcp_socket_p;
/** Abstract "listener" TCP/IP socket for servers.
**
@@ -405,12 +405,11 @@
** the stop_listen() method, which will return when as soon any current calls
** to on_accept() complete.
**
- ** --WARNING--- This class is a tcp_socket factory in that a new tcp_socket is
- ** created for every connection. It is the responsibility of the host
- ** application to delete these when they are no longer needed, either in the
- ** overridden on_accept() method or (more likely) later in the processing flow
- ** when the transaction is complete.
- **
+ ** --NOTE--- This class is a tcp_socket_p factory in that a new one
+ ** is created for each request received. As the tcp_socket_p is a
+ ** smart pointer, the allocated socket will be closed and deallocated
+ ** when the last reference to the socket goes out of scope automatically.
+ **
** Descendent classes must override on_accept() to provide the necessary
** connection handling. See the documentation on on_accept for more details.
**/
@@ -419,13 +418,7 @@
private:
- int listen_sock;
- bool in_on_accept;
- bool okay_to_continue;
- int thread_return;
int _last_thread_fault;
- mutex thread_data;
-
// Provides the listener thread.
void run();
@@ -439,15 +432,12 @@
** only safe to use/manipulate after entry of the method on_accept();
** at any other time this pointer may be altered without notice.
**
- ** At entry to on_accept() this will point to a valid base_sock
- ** object. It is up to the application to delete this object when
- ** it is done with it; server_sock will not take any actions with or
- ** on the object pointed to by listen_sock once on_accept() is called.
- ** However, once on_accept() has returned, server_sock will create
- ** another new base_sock and store it's address here on the next
- ** connection.
+ ** At entry to on_accept() this will point to a valid tcp_socket_p
+ ** smart pointer. As with all smart pointers, the object it
+ ** points to will be deleted automatically when the last
+ ** reference to it goes out of scope.
**/
- tcp_socket * connect_sock;
+ tcp_socket_p connect_sock;
/** Abstract method to process connections. An on_accept() call is
** made on every connection accepted immediately after constructing a
@@ -460,10 +450,8 @@
** connection latency, so for most applications you'll want to queue
** the connection for processing by another thread.
**
- ** In either case, be sure the application deletes the tcp_socket object
- ** at connect_sock when it's done with it, as you can be sure of memory
- ** leaks if you don't. tcp_server_socket_factory never deletes any
- ** tcp_socket object it created.
+ ** on_accept() should return true to continue processing, or false
+ ** to force a shutdown of the listener.
**
** WARNING: While this class attempts to respect the integrity of any code
** placed in on_accept(), you must be aware that it may be cancelled
@@ -476,7 +464,7 @@
** guidelines could result in program deadlocks should on_accept be
** cancelled while holding a resource lock.
**/
- virtual void on_accept() = 0;
+ virtual bool on_accept() = 0;
public:
=== modified file 'common/sockets/socket_test.cpp'
--- common/sockets/socket_test.cpp 2011-08-25 02:21:28 +0000
+++ common/sockets/socket_test.cpp 2011-09-17 01:38:23 +0000
@@ -26,8 +26,6 @@
using namespace nrtb;
using namespace std;
-typedef boost::shared_ptr<tcp_socket> sockp;
-
class myserver: public tcp_server_socket_factory
{
public:
@@ -46,29 +44,30 @@
protected:
// on_accept() is called on each connection.
- void on_accept()
+ bool on_accept()
{
try
{
- sockp c(connect_sock);
// just return what we've recieved.
- string msg = c->getln();
- c->put(msg);
- // Close the socket.
- c->close();
+ string msg = connect_sock->getln();
+ connect_sock->put(msg);
// Update our hit count.
hits++;
}
catch (base_exception & e)
{
errors++;
- cerr << "Caught " << e.what() << endl;
+ cerr << "server Caught " << e.what() << endl;
}
catch (...)
{
errors++;
cerr << "Unexpected error in on_accept()" << endl;
};
+ if (hits > 99)
+ return false;
+ else
+ return true;
};
};
@@ -79,6 +78,7 @@
sender.connect(address);
sender.put(sendme);
returnme = sender.getln();
+ sender.close();//cerr << "tc>> sock closed" << endl;
return returnme;
};
@@ -106,7 +106,7 @@
usleep(5e5);
// Send test messages
- for (int i = 0; i < 1000; i++)
+ for (int i = 0; i < 100; i++)
{
stringstream msg;
msg << "test message " << i << "\r";
@@ -119,13 +119,6 @@
cout << returned.substr(0,returned.size()-1) << ": "
<< ((returned == checkme) ? "Passed" : "Failed")
<< endl;
- usleep(1000);
- };
- test_server.stop_listen();
- if (test_server.listening())
- {
- er_count++;
- cout << "Server failed to stop. " << endl;
};
}
catch (myserver::bind_failure_exception)
@@ -149,6 +142,26 @@
cout << "A bad_connect_exception was thrown.\n"
<< e.comment() << endl;
}
+ catch (tcp_socket::not_open_exception & e)
+ {
+ cout << "A tcp not open exception was caught.\n"
+ << e.comment() << endl;
+ }
+ catch (tcp_socket::close_exception & e)
+ {
+ cout << "A close_exception was caught.\n"
+ << e.comment() << endl;
+ }
+ catch (tcp_socket::overrun_exception & e)
+ {
+ cout << "An overrun_exception was caught.\n"
+ << e.comment() << endl;
+ }
+ catch (tcp_socket::buffer_full_exception & e)
+ {
+ cout << "A buffer_full_exception was caught.\n"
+ << e.comment() << endl;
+ }
catch (tcp_socket::general_exception & e)
{
cout << "A tcp_socket exception was caught.\n"
@@ -160,11 +173,11 @@
};
// final check.
- if (test_server.hits != 1000)
+ if (test_server.hits != 100)
{
er_count++;
cout << "Server does not report the proper number of hits.\n"
- << "\tExpected 1000, got " << test_server.hits
+ << "\tExpected 100, got " << test_server.hits
<< endl;
};
cout << "=========== tcp_socket test complete =============" << endl;
=== modified file 'common/threads/base_thread.cpp'
--- common/threads/base_thread.cpp 2011-08-06 20:45:00 +0000
+++ common/threads/base_thread.cpp 2011-09-17 01:38:23 +0000
@@ -100,7 +100,7 @@
thread::~thread()
{
- stop();
+ try {stop(); } catch (...) {};
};
void thread::run()
@@ -305,10 +305,7 @@
mutex::~mutex()
{
- if (pthread_mutex_destroy(&mymid))
- {
- throw can_not_destruct_exception();
- };
+ pthread_mutex_destroy(&mymid);
};
void mutex::lock()
@@ -356,10 +353,9 @@
if (!try_lock() || (waiting > 0))
{
// not good, there are others waiting on us or we are locked.
- can_not_destruct_exception e;
- e.store(lexical_cast<string>(waiting));
unlock();
- throw e;
+ cerr << "WARNING: there were " << waiting <<
+ " threads queued in ~cond_variable." << endl;
}
else
{
@@ -368,9 +364,9 @@
// unlock before we leave.
unlock();
}
- catch (mutex::general_exception & e)
+ catch (...)
{
- throw e;
+ cerr << "WARNING: there was an error in ~cond_variable." << endl;;
};
};
=== removed file 'common/transceiver/transceiver.cpp'
--- common/transceiver/transceiver.cpp 2011-08-15 03:47:28 +0000
+++ common/transceiver/transceiver.cpp 1970-01-01 00:00:00 +0000
@@ -1,140 +0,0 @@
-/***********************************************
- This file is part of the NRTB project (https://*launchpad.net/nrtb).
-
- NRTB is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published by
- the Free Software Foundation, either version 3 of the License, or
- (at your option) any later version.
-
- NRTB is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with NRTB. If not, see <http://www.gnu.org/licenses/>.
-
- **********************************************/
-
-#include "transceiver.h"
-#include <confreader.h>
-#include <serializer.h>
-#include <string>
-#include <sstream>
-
-using namespace nrtb;
-using namespace std;
-
-serializer tscvr_sequence(0);
-
-template <class out, class in, class outp, class inp>
-transceiver<out,in,outp,inp>::transceiver(tcp_socketp socket)
-{
- // get the configuration parameters.
- global_conf_reader & config = global_conf_reader::get_instance();
- send_time_limit = config.get<unsigned int>("transceiver.send_timeout",2);
- attempt_recovery = config.get<bool>("transceiver.allow_recovery",true);
- error_run_limit =
- config.get<unsigned int>("transceiver.max_consecutive_errors",10);
- sent_messages.resize(config.get<int>("transceiver.history_size",50));
- // set up logging
- std::stringstream s;
- s << "transceiver:_" << tscvr_sequence();
- log = Poco::Logger::get(s.str());
- // set up the socket.
- sock(socket);
- // annouce ourselves...
- log->trace("Instanciated.");
- s.str("");
- s << "history_size=" << sent_messages.size()
- << ", send_timeout=" << send_time_limit
- << ", attempt_recovery=" << attempt_recovery
- << ", error_run_limit=" << error_run_limit
- << ", remote_address=" << sock->get_remote_address()
- << ", local_address=" << sock->get_local_address();
- log->trace(s.str());
-};
-
-template <class out, class in, class outp, class inp>
-transceiver<out,in,outp,inp>::~transceiver()
-{
- log->trace("In ~transciever");
- // shutdown the socket.
- sock->close();
- // discard the sent messages list.
- sent_messages.clear();
- log->trace("shutdown complete.");
-};
-
-template <class out, class in, class outp, class inp>
-inp transceiver<out,in,outp,inp>::get()
-{
- inp returnme(new in);
- string input = sock->getln();
- returnme->ParseFromString(input);
- // for the first messsge any number is
- // accepted.
- if (last_inbound == 0)
- {
- last_inbound = returnme->msg_uid();
- }
- else
- {
- last_inbound++;
- int temp = returnme->msg_uid();
- if (temp != last_inbound)
- {
- inbound_seq_error e();
- stringstream message;
- message << "Expected " << last_inbound
- << " received " << temp;
- e.store(message.str());
- throw e;
- };
- };
- return returnme;
-};
-
-template <class out, class in, class outp, class inp>
-void transceiver<out,in,outp,inp>::send(outp sendme)
-{
- sendme->set_msg_uid(out_msg_num());
- string output;
- output = sendme->SerializeAsString() + "\r";
- sock->put(output);
- sent_messages.push_back(sendme);
-};
-
-template <class out, class in, class outp, class inp>
-void transceiver<out,in,outp,inp>::nak_invalid_context(const unsigned long int msg_number)
-{
- general_exception e;
- e.store("transceiver<outp,inp>::nak_invalid_context not implemented.");
- throw e;
-};
-
-template <class out, class in, class outp, class inp>
-void transceiver<out,in,outp,inp>::nak_validation_error(const unsigned long int msg_number)
-{
- general_exception e;
- e.store("transceiver<outp,inp>::nak_validation_error not implemented.");
- throw e;
-};
-
-template <class out, class in, class outp, class inp>
-void transceiver<out,in,outp,inp>::handle_inbound_nak()
-{
- general_exception e;
- e.store("transceiver<outp,inp>::handle_inbound_nak not implemented.");
- throw e;
-};
-
-template <class out, class in, class outp, class inp>
-void transceiver<out,in,outp,inp>::handle_outbound_nak()
-{
- general_exception e;
- e.store("transceiver<outp,inp>::handle_outbound_nak not implemented.");
- throw e;
-};
-
-
=== modified file 'common/transceiver/transceiver.h'
--- common/transceiver/transceiver.h 2011-08-25 04:22:52 +0000
+++ common/transceiver/transceiver.h 2011-09-17 01:38:23 +0000
@@ -70,12 +70,12 @@
* socket. Once created this class assumes it uniquely owns the
* socket and will close it upon distruction.
* ***********************************************************/
- transceiver(tcp_socketp socket);
+ transceiver(tcp_socket_p socket);
/*************************************************************
* Closes the socket and releases all mmemory associated with
* this class.
* ***********************************************************/
- ~transceiver();
+ virtual ~transceiver();
/**************************************************************
* gets the next message from the socket. If no messages are
* ready, blocks util one arrives.
@@ -86,7 +86,7 @@
* sent_messages buffer in case it's needed for error recovery.
* ***********************************************************/
void send(outp sendme);
- /**q************************************************************
+ /**************************************************************
* Called by the data consumer when an inbound message was
* not valid in the current application context. msg_number
* is the sequence number of the offending message.
@@ -117,7 +117,7 @@
/// the name used for logging
std::string logname;
/// The socket used for communcation.
- tcp_socketp sock;
+ tcp_socket_p sock;
/// serializer used for message numbers
serializer out_msg_num;
/// last received message number
@@ -136,7 +136,7 @@
serializer tscvr_sequence(0);
template <class out, class in, class outp, class inp>
-transceiver<out,in,outp,inp>::transceiver(tcp_socketp socket)
+transceiver<out,in,outp,inp>::transceiver(tcp_socket_p socket)
{
// get the configuration parameters.
global_conf_reader & config = global_conf_reader::get_instance();
@@ -154,7 +154,7 @@
// set up the socket.
sock = socket;
// annouce ourselves...
- log.trace("Instanciated.");
+ log.information("Instanciated.");
s.str("");
s << "history_size=" << sent_messages.size()
<< ", send_timeout=" << send_time_limit
@@ -162,19 +162,25 @@
<< ", error_run_limit=" << error_run_limit
<< ", remote_address=" << sock->get_remote_address()
<< ", local_address=" << sock->get_local_address();
- log.trace(s.str());
+ log.information(s.str());
};
template <class out, class in, class outp, class inp>
transceiver<out,in,outp,inp>::~transceiver()
{
- Poco::Logger & log = Poco::Logger::get(logname);
- log.trace("In ~transciever");
- // shutdown the socket.
- sock->close();
- // discard the sent messages list.
- sent_messages.clear();
- log.trace("shutdown complete.");
+ Poco::Logger & log = Poco::Logger::get(logname);
+ log.information("In ~transciever");
+ // shutdown and release the socket.
+ try
+ {
+ if (sock)
+ {
+ sock.reset();
+ };
+ } catch (...) {};
+ // discard the sent messages list.
+ sent_messages.clear();
+ log.information("shutdown complete.");
};
template <class out, class in, class outp, class inp>
=== modified file 'common/transceiver/transceiver_test.cpp'
--- common/transceiver/transceiver_test.cpp 2011-08-25 04:22:52 +0000
+++ common/transceiver/transceiver_test.cpp 2011-09-17 01:38:23 +0000
@@ -29,18 +29,48 @@
using namespace std;
typedef nrtb_msg::sim_to_db my_msg;
-typedef boost::shared_ptr<my_msg> my_msgp;
typedef transceiver<my_msg,my_msg> linkt;
-int er_count = 0;
-
-class server_work_thread: public runnable
-{
-public:
-
- tcp_socketp sock;
+class safe_counter
+{
+private:
+ mutex data_lock;
+ int er_count;
+
+public:
+
+ safe_counter() { er_count = 0; };
+
+ ~safe_counter() {};
+
+ void inc()
+ {
+ scope_lock lock(data_lock);
+ er_count++;
+ };
+
+ int operator ()()
+ {
+ scope_lock lock(data_lock);
+ return er_count;
+ };
+};
+
+safe_counter er_count;
+
+class server_work_thread: public thread
+{
+public:
+
+ tcp_socket_p sock;
unsigned long long last_inbound;
+ ~server_work_thread()
+ {
+ cout << "Destructing server_work_thread" << endl;
+ sock.reset();
+ };
+
void run()
{
set_cancel_anytime();
@@ -52,53 +82,64 @@
linkt::out_ptr inbound = link.get();
last_inbound = inbound->msg_uid();
cout << "\tReceived #" << last_inbound << endl;
+ link.send(inbound);
if (last_inbound == 99)
+ {
+ cout << "Receiver thread closing." << endl;
exit(0);
+ };
}
catch (linkt::general_exception & e)
{
cerr << "Server work thread caught " << e.what()
<< "\n\tComment: " << e.comment() << endl;
- er_count++;
+ er_count.inc();;
}
catch (tcp_socket::general_exception & e)
{
cerr << "Server work thread caught " << e.what()
<< "\n\tComment: " << e.comment() << endl;
- er_count++;
+ er_count.inc();;
}
catch (std::exception & e)
{
cerr << "Server work thread caught " << e.what()
<< endl;
- er_count++;
+ er_count.inc();;
};
};
};
};
-thread process;
-server_work_thread task;
-
class listener: public tcp_server_socket_factory
{
+protected:
+ boost::shared_ptr<server_work_thread> task;
+
public:
listener(const string & add, const int & back)
: tcp_server_socket_factory(add, back) {};
+ ~listener()
+ {
+ cout << "Destructing listener" << endl;
+ task.reset();
+ };
- void on_accept()
+ bool on_accept()
{
- if (!process.is_running())
+ if (!task)
{
- task.last_inbound = 0;
- task.sock.reset(connect_sock);
- process.start(task);
+ task.reset(new server_work_thread);
+ task->last_inbound = 0;
+ task->sock = connect_sock;
+ task->start(*(task.get()));
cout << "server thread running." << endl;
+ // shutdown the listener thread.. our work is done here.
+ return false;
}
else
{
- tcp_socketp s(connect_sock);
- s->close();
+ connect_sock->close();
cerr << "Multiple attempts to connect to server"
<< endl;
};
@@ -111,35 +152,54 @@
int main()
{
setup_global_logging("transceiver.log");
-
- //set up our port and address
- boost::mt19937 rng;
- rng.seed(time(0));
- boost::uniform_int<> r(0,1000);
- stringstream s;
- s << address << port_base + r(rng);
- address = s.str();
- cout << "Using " << address << endl;
-
- // kick off the listener thread.
- listener server(address,5);
- server.start_listen();
- usleep(5e5);
-
- // set up our sender
- tcp_socketp sock(new tcp_socket);
- sock->connect(address);
- linkt sender(sock);
-
- // Let's send a few things.
- for (int i=0; i<100; i++)
- {
- my_msgp msg(new my_msg);
- sender.send(msg);
- cout << "Sent " << msg->msg_uid() << endl;
- usleep(1e4);
+
+ try
+ {
+ //set up our port and address
+ boost::mt19937 rng;
+ rng.seed(time(0));
+ boost::uniform_int<> r(0,1000);
+ stringstream s;
+ s << address << port_base + r(rng);
+ address = s.str();
+ cout << "Using " << address << endl;
+
+ // kick off the listener thread.
+ listener server(address,5);
+ server.start_listen();
+ usleep(1e4);
+
+ // set up our sender
+ tcp_socket_p sock(new tcp_socket);
+ sock->connect(address);
+ linkt sender(sock);
+
+ // Let's send a few things.
+ for (int i=0; i<100; i++)
+ {
+ linkt::out_ptr msg(new my_msg);
+ sender.send(msg);
+ cout << "Sent " << msg->msg_uid() << endl;
+ msg = sender.get();
+ cout << "Got back " << msg->msg_uid() << endl;
+ };
+ usleep(1e4);
+ }
+ catch (...)
+ {
+ cout << "exception caught during test." << endl;
+ er_count.inc();
};
-
- usleep(5e5);
- return er_count;
+
+ int faults = er_count();
+ if (faults)
+ {
+ cout << "========== ** There were " << faults
+ << "errors logged. =========" << endl;
+ }
+ else
+ cout << "========= nrtb::transceiver test complete.========="
+ << endl;
+
+ return faults;
};
\ No newline at end of file
Follow ups