nrtb-core team mailing list archive
-
nrtb-core team
-
Mailing list archive
-
Message #00205
[Branch ~fpstovall/nrtb/cpp_common] Rev 49: Implemented the transceiver class and some cleanups and bugfixes to the sockets class.
Merge authors:
Rick Stovall (fpstovall)
------------------------------------------------------------
revno: 49 [merge]
committer: fpstovall@xxxxxxxxx
branch nick: dev
timestamp: Thu 2011-08-25 00:32:29 -0400
message:
Implemented the transceiver class and some cleanups and bugfixes to the sockets class.
I believe this is now ready for alpha service.
modified:
common/Makefile
common/sockets/base_socket.cpp
common/sockets/base_socket.h
common/sockets/socket_test.cpp
common/transceiver/Makefile
common/transceiver/transceiver.cpp
common/transceiver/transceiver.h
common/transceiver/transceiver_test.cpp
--
lp:~fpstovall/nrtb/cpp_common
https://code.launchpad.net/~fpstovall/nrtb/cpp_common
Your team NRTB Core is subscribed to branch lp:~fpstovall/nrtb/cpp_common.
To unsubscribe from this branch go to https://code.launchpad.net/~fpstovall/nrtb/cpp_common/+edit-subscription
=== modified file 'common/Makefile'
--- common/Makefile 2011-08-13 14:23:21 +0000
+++ common/Makefile 2011-08-15 03:47:28 +0000
@@ -46,3 +46,4 @@
@cd logger; make ${action}
@cd confreader; make ${action}
@cd GPB; make ${action}
+ @cd transceiver; make ${action}
=== modified file 'common/sockets/base_socket.cpp'
--- common/sockets/base_socket.cpp 2011-08-13 01:04:57 +0000
+++ common/sockets/base_socket.cpp 2011-08-25 04:18:41 +0000
@@ -65,8 +65,11 @@
if (returnme > 0)
{
// yes.. store for the caller.
- inbuff[returnme] = 0;
- data.assign((const char *) &(inbuff[0]));
+ data.resize(returnme,0);
+ for (int i=0; i<returnme; i++)
+ {
+ data[i] = inbuff[i];
+ };
}
// ... or was there an error?
else if (errno)
@@ -584,11 +587,13 @@
{
// yes, this is -rude- shutdown of the listening thread.
// Get over it.
- thread_data.lock();
- okay_to_continue = false;
- thread_data.unlock();
- close(listen_sock);
stop();
+ try
+ {
+ if (listen_sock)
+ close(listen_sock);
+ }
+ catch (...) {};
};
};
=== modified file 'common/sockets/base_socket.h'
--- common/sockets/base_socket.h 2011-08-09 00:40:44 +0000
+++ common/sockets/base_socket.h 2011-08-15 01:58:41 +0000
@@ -20,6 +20,7 @@
#define base_socket_header
#include <base_thread.h>
+#include <boost/shared_ptr.hpp>
#include <sys/socket.h>
#include <netinet/in.h>
@@ -371,6 +372,9 @@
std::string get_remote_address();
};
+/// smart pointer for use with tcp_sockets
+typedef boost::shared_ptr<nrtb::tcp_socket> tcp_socketp;
+
/** Abstract "listener" TCP/IP socket for servers.
**
** Simplifies the use of TCP/IP sockets for applications providing services.
=== modified file 'common/sockets/socket_test.cpp'
--- common/sockets/socket_test.cpp 2011-08-13 00:11:42 +0000
+++ common/sockets/socket_test.cpp 2011-08-25 02:21:28 +0000
@@ -19,6 +19,7 @@
#include <iostream>
#include <sstream>
#include <string>
+#include <boost/random.hpp>
#include "base_socket.h"
#include <boost/shared_ptr.hpp>
@@ -81,11 +82,21 @@
return returnme;
};
-const string address = "127.0.0.1:17000";
+string address = "127.0.0.1:";
+int port_base = 17000;
int main()
{
int er_count = 0;
+ //set up our port and address
+ boost::mt19937 rng;
+ rng.seed(time(0));
+ boost::uniform_int<> r(0,1000);
+ stringstream s;
+ s << address << port_base + r(rng);
+ address = s.str();
+ cout << "Using " << address << endl;
+
myserver test_server(address,5);
try
@@ -105,7 +116,7 @@
{
er_count++;
};
- cout << returned << ": "
+ cout << returned.substr(0,returned.size()-1) << ": "
<< ((returned == checkme) ? "Passed" : "Failed")
<< endl;
usleep(1000);
=== modified file 'common/transceiver/Makefile'
--- common/transceiver/Makefile 2010-12-30 03:05:40 +0000
+++ common/transceiver/Makefile 2011-08-25 04:22:52 +0000
@@ -17,22 +17,15 @@
#***********************************************
lib: transceiver_test
+ @./transceiver_test
@cp -v transceiver.h ../include
- @cp -v transceiver.o ../obj
@echo build complete
-../include/confreader.h:
- @cp ../confreader; make lib
-
-transceiver.o: transceiver.h transceiver.cpp Makefile ../include/confreader.h
- @rm -f transceiver.o
- g++ -c transceiver.cpp -I ../include
-
-transceiver_test: transceiver.o transceiver_test.cpp
+transceiver_test: transceiver.h transceiver_test.cpp
@rm -f transceiver_test
- g++ -c transceiver_test.cpp
- g++ -o transceiver_test transceiver_test.o transceiver.o ../obj/common.o ../obj/log_setup.o -lPocoFoundation -lPocoUtil
+ g++ -c transceiver_test.cpp -I../include
+ g++ -o transceiver_test transceiver_test.o ../obj/common.o ../obj/log_setup.o ../obj/serializer.o ../obj/base_thread.o ../obj/base_socket.o ../obj/confreader.o -lpthread -lprotobuf ../lib/nrtb_gpb.a -lPocoFoundation
clean:
- @rm -rvf *.o transceiver_test ../include/transceiver.h ../obj/transceiver.o
+ @rm -rvf *.o transceiver_test ../include/transceiver.h *.log ../obj/transceiver.o
@echo all objects and executables have been erased.
=== modified file 'common/transceiver/transceiver.cpp'
--- common/transceiver/transceiver.cpp 2010-12-31 14:48:07 +0000
+++ common/transceiver/transceiver.cpp 2011-08-15 03:47:28 +0000
@@ -17,89 +17,124 @@
**********************************************/
#include "transceiver.h"
-#include "../include/confreader.h"
-#include <Poco/Mutex.h>
-#include <Poco/ScopedLock.h>
-#include <stringstream>
-
-class serializer_type
-{
-public:
- unsigned int operator()
- {
- Poco::ScopedLock<Poco::Mutex>(lock);
- return ++counter;
- };
-private:
- Poco::Mutex lock;
- unsigned int counter;
- serializer_type()
- {
- counter = 0;
- };
-};
-
-serializer_type serializer;
+#include <confreader.h>
+#include <serializer.h>
+#include <string>
+#include <sstream>
using namespace nrtb;
-
-template <class out, class in>
-transceiver::transceiver(Poco::Net::StreamSocket socket)
+using namespace std;
+
+serializer tscvr_sequence(0);
+
+template <class out, class in, class outp, class inp>
+transceiver<out,in,outp,inp>::transceiver(tcp_socketp socket)
{
// get the configuration parameters.
- conf_reader & config = conf_reader::get_instance();
+ global_conf_reader & config = global_conf_reader::get_instance();
send_time_limit = config.get<unsigned int>("transceiver.send_timeout",2);
attempt_recovery = config.get<bool>("transceiver.allow_recovery",true);
error_run_limit =
config.get<unsigned int>("transceiver.max_consecutive_errors",10);
sent_messages.resize(config.get<int>("transceiver.history_size",50));
- // set up uid
- uid = serializer();
// set up logging
- std::stringstream s << logname << "_" << uid ;
+ std::stringstream s;
+ s << "transceiver:_" << tscvr_sequence();
log = Poco::Logger::get(s.str());
-};
-
-template <class out, class in>
-transceiver::~transceiver()
-{
-
-};
-
-template <class out, class in>
-in transceiver::get()
-{
-
-};
-
-template <class out, class in>
-void transceiver::send(out & sendme)
-{
-
-};
-
-template <class out, class in>
-void transceiver::nak_invalid_context(const unsigned long int msg_number)
-{
-
-};
-
-template <class out, class in>
-void transceiver::nak_validation_error(const unsigned long int msg_number)
-{
-
-};
-
-template <class out, class in>
-void transceiver::handle_inbound_nak()
-{
-
-};
-
-template <class out, class in>
-void transceiver::handle_outbound_nak()
-{
-
+ // set up the socket.
+ sock(socket);
+ // annouce ourselves...
+ log->trace("Instanciated.");
+ s.str("");
+ s << "history_size=" << sent_messages.size()
+ << ", send_timeout=" << send_time_limit
+ << ", attempt_recovery=" << attempt_recovery
+ << ", error_run_limit=" << error_run_limit
+ << ", remote_address=" << sock->get_remote_address()
+ << ", local_address=" << sock->get_local_address();
+ log->trace(s.str());
+};
+
+template <class out, class in, class outp, class inp>
+transceiver<out,in,outp,inp>::~transceiver()
+{
+ log->trace("In ~transciever");
+ // shutdown the socket.
+ sock->close();
+ // discard the sent messages list.
+ sent_messages.clear();
+ log->trace("shutdown complete.");
+};
+
+template <class out, class in, class outp, class inp>
+inp transceiver<out,in,outp,inp>::get()
+{
+ inp returnme(new in);
+ string input = sock->getln();
+ returnme->ParseFromString(input);
+ // for the first messsge any number is
+ // accepted.
+ if (last_inbound == 0)
+ {
+ last_inbound = returnme->msg_uid();
+ }
+ else
+ {
+ last_inbound++;
+ int temp = returnme->msg_uid();
+ if (temp != last_inbound)
+ {
+ inbound_seq_error e();
+ stringstream message;
+ message << "Expected " << last_inbound
+ << " received " << temp;
+ e.store(message.str());
+ throw e;
+ };
+ };
+ return returnme;
+};
+
+template <class out, class in, class outp, class inp>
+void transceiver<out,in,outp,inp>::send(outp sendme)
+{
+ sendme->set_msg_uid(out_msg_num());
+ string output;
+ output = sendme->SerializeAsString() + "\r";
+ sock->put(output);
+ sent_messages.push_back(sendme);
+};
+
+template <class out, class in, class outp, class inp>
+void transceiver<out,in,outp,inp>::nak_invalid_context(const unsigned long int msg_number)
+{
+ general_exception e;
+ e.store("transceiver<outp,inp>::nak_invalid_context not implemented.");
+ throw e;
+};
+
+template <class out, class in, class outp, class inp>
+void transceiver<out,in,outp,inp>::nak_validation_error(const unsigned long int msg_number)
+{
+ general_exception e;
+ e.store("transceiver<outp,inp>::nak_validation_error not implemented.");
+ throw e;
+};
+
+template <class out, class in, class outp, class inp>
+void transceiver<out,in,outp,inp>::handle_inbound_nak()
+{
+ general_exception e;
+ e.store("transceiver<outp,inp>::handle_inbound_nak not implemented.");
+ throw e;
+};
+
+template <class out, class in, class outp, class inp>
+void transceiver<out,in,outp,inp>::handle_outbound_nak()
+{
+ general_exception e;
+ e.store("transceiver<outp,inp>::handle_outbound_nak not implemented.");
+ throw e;
};
=== modified file 'common/transceiver/transceiver.h'
--- common/transceiver/transceiver.h 2010-12-31 14:48:07 +0000
+++ common/transceiver/transceiver.h 2011-08-25 04:22:52 +0000
@@ -20,10 +20,13 @@
#define nrtb_transceiver_h
#include <string>
-#include <Poco/Net/StreamSocket.h>
-#include <Poco/Net/SocketStream.h>
+#include <sstream>
+#include <common.h>
+#include <base_socket.h>
+#include <serializer.h>
+#include <confreader.h>
#include <Poco/Logger.h>
-#include <Poco/Exception.h>
+#include <boost/shared_ptr.hpp>
#include <boost/circular_buffer.hpp>
namespace nrtb
@@ -47,21 +50,28 @@
* See https://blueprints.launchpad.net/nrtb/+spec/icp-spec for
* specification this class implements.
* ***************************************************************/
- template <class out, class in>
+ template <class out, class in,
+ class outp = boost::shared_ptr<out>,
+ class inp = boost::shared_ptr<in> >
class transceiver
{
public:
- /// outbound messages will be of this type
- typedef outbound_type out;
- /// inbound messages will be of this type.
- typedef inbound_type in;
- /**************************************************************
+ typedef inp in_ptr;
+ typedef outp out_ptr;
+
+ union msg_num_t
+ {
+ uint32_t number;
+ unsigned char bytes[4];
+ };
+
+ /*************************************************************
* Creates the transceiver and associates it with a provided
* socket. Once created this class assumes it uniquely owns the
* socket and will close it upon distruction.
* ***********************************************************/
- transceiver(Poco::Net::StreamSocket socket);
- /**************************************************************
+ transceiver(tcp_socketp socket);
+ /*************************************************************
* Closes the socket and releases all mmemory associated with
* this class.
* ***********************************************************/
@@ -70,13 +80,13 @@
* gets the next message from the socket. If no messages are
* ready, blocks util one arrives.
* ***********************************************************/
- in & get();
+ inp get();
/**************************************************************
* Sends a message over the socket and adds it to the
* sent_messages buffer in case it's needed for error recovery.
* ***********************************************************/
- void send(out & sendme);
- /**************************************************************
+ void send(outp sendme);
+ /**************************************************************
* Called by the data consumer when an inbound message was
* not valid in the current application context. msg_number
* is the sequence number of the offending message.
@@ -91,37 +101,175 @@
/**************************************************************
* Exceptions which may be thrown for external resulution.
* ***********************************************************/
- // parent of all transceiver exceptions
- POCO_DECLARE_EXCEPTION(transceiver, general_exception, Poco::Exception)
- // thrown if send fails due to timeout.
- POCO_DECLARE_EXCEPTION(transceiver, send_timeout, general_exception)
- // thrown in the case of a fault while in get()
- POCO_DECLARE_EXCEPTION(transceiver, get_fault, general_exception)
- // Thrown if the socket is closed due to an unrecoverable date error.
- POCO_DECLARE_EXCEPTION(transceiver, unrecoverable_data_error, general_exception)
- // Thrown if the socket is closed due to too many errors in a row
- POCO_DECLARE_EXCEPTION(transceiver, consecutive_error_overrun, general_exception)
- protected:
- unsigned in uid;
- const std::string logname = "transceiver:";
+ /// parent of all transceiver exceptions
+ class general_exception : public nrtb::base_exception {};
+ /// thrown if an unexpected msg_uid is received
+ class inbound_seq_error : public general_exception {};
+ /// thrown if the connection is unexpectedly lost
+ class connection_lost: public general_exception {};
+ /// thrown if too many sequential errors occur.
+ class too_many_errors: public general_exception {};
+
+ protected:
unsigned int send_time_limit;
bool attempt_recovery;
unsigned int error_run_limit;
- // pointer to this class's logger instance
- Poco::Logger * log;
- // The socket used for communcation.
- Poco::Net::StreamSocket sock;
- // the associated iostream (Do we need this?)
- Poco::Net::SocketStream stream;
- // buffer to hold previously sent messages; required for
- // error recovery.
- boost::circular_buffer<out> sent_messages;
- // fence post for recovery efforts, zero if none in play
+ /// the name used for logging
+ std::string logname;
+ /// The socket used for communication.
+ tcp_socketp sock;
+ /// serializer used for message numbers
+ serializer out_msg_num;
+ /// last received message number
+ unsigned long long last_inbound;
+ /// buffer to hold previously sent messages; required for
+ /// error recovery.
+ boost::circular_buffer<outp> sent_messages;
+ /// fence post for recovery efforts, zero if none in play
unsigned long long nak_fence_post;
- // These methods implment actual nak recovery.
+ /// These methods implement actual nak recovery.
void handle_inbound_nak();
void handle_outbound_nak();
};
+
+
+serializer tscvr_sequence(0);
+
+template <class out, class in, class outp, class inp>
+transceiver<out,in,outp,inp>::transceiver(tcp_socketp socket)
+{
+ // get the configuration parameters.
+ global_conf_reader & config = global_conf_reader::get_instance();
+ send_time_limit = config.get<unsigned int>("transceiver.send_timeout",2);
+ attempt_recovery = config.get<bool>("transceiver.allow_recovery",true);
+ error_run_limit =
+ config.get<unsigned int>("transceiver.max_consecutive_errors",10);
+ sent_messages.resize(config.get<int>("transceiver.history_size",50));
+ last_inbound = 0;
+ // set up logging
+ std::stringstream s;
+ s << "transceiver:_" << tscvr_sequence();
+ logname = s.str();
+ Poco::Logger & log = Poco::Logger::get(logname);
+ // set up the socket.
+ sock = socket;
+ // announce ourselves...
+ log.trace("Instanciated.");
+ s.str("");
+ s << "history_size=" << sent_messages.size()
+ << ", send_timeout=" << send_time_limit
+ << ", attempt_recovery=" << attempt_recovery
+ << ", error_run_limit=" << error_run_limit
+ << ", remote_address=" << sock->get_remote_address()
+ << ", local_address=" << sock->get_local_address();
+ log.trace(s.str());
+};
+
+template <class out, class in, class outp, class inp>
+transceiver<out,in,outp,inp>::~transceiver()
+{
+ Poco::Logger & log = Poco::Logger::get(logname);
+ log.trace("In ~transciever");
+ // shutdown the socket.
+ sock->close();
+ // discard the sent messages list.
+ sent_messages.clear();
+ log.trace("shutdown complete.");
+};
+
+template <class out, class in, class outp, class inp>
+inp transceiver<out,in,outp,inp>::get()
+{
+ // get the message length first.
+ std::string len_field = sock->get(4,10);
+//std::cout << "len_field=" << http_chartohex(len_field) << std::endl;
+ msg_num_t msg_len;
+ for (int i=0; i<4; i++)
+ {
+ msg_len.bytes[i] = len_field[i];
+ };
+//std::cout << ":len=" << msg_len.number << std::endl;
+ // get the rest of the message.
+ inp returnme(new in);
+ std::string input = sock->get(msg_len.number);
+//std::cout << ":received=" << http_chartohex(input) << std::endl;
+ returnme->ParseFromString(input);
+ // for the first messsge any number is
+ // accepted.
+ if (last_inbound == 0)
+ {
+ last_inbound = returnme->msg_uid();
+ }
+ else
+ {
+ last_inbound++;
+ int temp = returnme->msg_uid();
+ if (temp != last_inbound)
+ {
+ inbound_seq_error e;
+ std::stringstream message;
+ message << "Expected " << last_inbound
+ << " received " << temp;
+ e.store(message.str());
+ throw e;
+ };
+ };
+ return returnme;
+};
+
+template <class out, class in, class outp, class inp>
+void transceiver<out,in,outp,inp>::send(outp sendme)
+{
+ sendme->set_msg_uid(out_msg_num());
+ std::string output;
+ output = sendme->SerializeAsString();
+ msg_num_t msg_len;
+ msg_len.number = output.size();
+//std::cout << "num:len" << msg_len.number << ":" << output.length() << //std::endl;
+ std::string num_field = " ";
+ for (int i=0; i<4; i++)
+ {
+ num_field[i] = msg_len.bytes[i];
+//std::cout << int(num_field[i]) << "," ;
+ };
+//std::cout << " = " << msg_len.number << std::endl;
+ output = num_field + output;
+//std::cout << "out msg=" << http_chartohex(output) << std::endl;
+ sock->put(output);
+ sent_messages.push_back(sendme);
+};
+
+template <class out, class in, class outp, class inp>
+void transceiver<out,in,outp,inp>::nak_invalid_context(const unsigned long int msg_number)
+{
+ general_exception e;
+ e.store("transceiver<outp,inp>::nak_invalid_context not implemented.");
+ throw e;
+};
+
+template <class out, class in, class outp, class inp>
+void transceiver<out,in,outp,inp>::nak_validation_error(const unsigned long int msg_number)
+{
+ general_exception e;
+ e.store("transceiver<outp,inp>::nak_validation_error not implemented.");
+ throw e;
+};
+
+template <class out, class in, class outp, class inp>
+void transceiver<out,in,outp,inp>::handle_inbound_nak()
+{
+ general_exception e;
+ e.store("transceiver<outp,inp>::handle_inbound_nak not implemented.");
+ throw e;
+};
+
+template <class out, class in, class outp, class inp>
+void transceiver<out,in,outp,inp>::handle_outbound_nak()
+{
+ general_exception e;
+ e.store("transceiver<outp,inp>::handle_outbound_nak not implemented.");
+ throw e;
+};
} // namespace nrtb
=== modified file 'common/transceiver/transceiver_test.cpp'
--- common/transceiver/transceiver_test.cpp 2010-12-30 03:05:40 +0000
+++ common/transceiver/transceiver_test.cpp 2011-08-25 04:22:52 +0000
@@ -14,4 +14,132 @@
You should have received a copy of the GNU General Public License
along with NRTB. If not, see <http://www.gnu.org/licenses/>.
- **********************************************/
\ No newline at end of file
+ **********************************************/
+
+#include <string>
+#include <iostream>
+#include <base_thread.h>
+#include <base_socket.h>
+#include "transceiver.h"
+#include <log_setup.h>
+#include <sim_to_db_wrapper.pb.h>
+#include <boost/random.hpp>
+
+using namespace nrtb;
+using namespace std;
+
+typedef nrtb_msg::sim_to_db my_msg;
+typedef boost::shared_ptr<my_msg> my_msgp;
+typedef transceiver<my_msg,my_msg> linkt;
+
+int er_count = 0;
+
+class server_work_thread: public runnable
+{
+public:
+
+ tcp_socketp sock;
+ unsigned long long last_inbound;
+
+ void run()
+ {
+ set_cancel_anytime();
+ linkt link(sock);
+ while (sock->status() == tcp_socket::sock_connect)
+ {
+ try
+ {
+ linkt::out_ptr inbound = link.get();
+ last_inbound = inbound->msg_uid();
+ cout << "\tReceived #" << last_inbound << endl;
+ if (last_inbound == 99)
+ exit(0);
+ }
+ catch (linkt::general_exception & e)
+ {
+ cerr << "Server work thread caught " << e.what()
+ << "\n\tComment: " << e.comment() << endl;
+ er_count++;
+ }
+ catch (tcp_socket::general_exception & e)
+ {
+ cerr << "Server work thread caught " << e.what()
+ << "\n\tComment: " << e.comment() << endl;
+ er_count++;
+ }
+ catch (std::exception & e)
+ {
+ cerr << "Server work thread caught " << e.what()
+ << endl;
+ er_count++;
+ };
+ };
+ };
+};
+
+thread process;
+server_work_thread task;
+
+class listener: public tcp_server_socket_factory
+{
+public:
+ listener(const string & add, const int & back)
+ : tcp_server_socket_factory(add, back) {};
+
+ void on_accept()
+ {
+ if (!process.is_running())
+ {
+ task.last_inbound = 0;
+ task.sock.reset(connect_sock);
+ process.start(task);
+ cout << "server thread running." << endl;
+ }
+ else
+ {
+ tcp_socketp s(connect_sock);
+ s->close();
+ cerr << "Multiple attempts to connect to server"
+ << endl;
+ };
+ };
+};
+
+string address = "127.0.0.1:";
+int port_base = 12334;
+
+int main()
+{
+ setup_global_logging("transceiver.log");
+
+ //set up our port and address
+ boost::mt19937 rng;
+ rng.seed(time(0));
+ boost::uniform_int<> r(0,1000);
+ stringstream s;
+ s << address << port_base + r(rng);
+ address = s.str();
+ cout << "Using " << address << endl;
+
+ // kick off the listener thread.
+ listener server(address,5);
+ server.start_listen();
+ usleep(5e5);
+
+ // set up our sender
+ tcp_socketp sock(new tcp_socket);
+ sock->connect(address);
+ linkt sender(sock);
+
+ // Let's send a few things.
+ for (int i=0; i<100; i++)
+ {
+ my_msgp msg(new my_msg);
+ sender.send(msg);
+ cout << "Sent " << msg->msg_uid() << endl;
+ usleep(1e4);
+ };
+
+ usleep(5e5);
+ return er_count;
+};
\ No newline at end of file