← Back to team overview

mosquitto-users team mailing list archive

Re: using libmosquitto in an external event loop

 

Hello Roger,

Thanks for replying.

I would expect the bindings to be functionally equivalent to what the C loop is doing. This is not the case since:
1/ the socket is not properly closed through _mosquitto_socket_close
2/ _mosquitto_socket_close cleans up mosq->ssl, mosq->ssl_ctx and mosq->sock
3/ mosquitto_loop checks against the mosq_cs_disconnecting state and calls the mosq->on_disconnect callback.

Please find in attachment a patch against the repo tip that integrates the rc handling in mosquitto_loop_read and mosquitto_loop_write. The patch works nicely with following Lua event loop:

local fds = { timer, broker }

while true do
    local poll = nixio.poll(fds, POLL_TIMEOUT_MS)

    if not poll then -- poll == -1
    elseif poll == 0 then
    elseif poll > 0 then
        if nixio.bit.check(broker.revents, POLLIN) then
            mqtt:read(MOSQ_MAX_READ)
        end

if nixio.bit.check(broker.revents, POLLOUT) and mqtt:want_write() then
            mqtt:write(MOSQ_MAX_WRITE)
        end

        if nixio.bit.check(timer.revents, POLLIN) then
            timer.fd:numexp() -- reset the numexp counter
            mqtt:misc()       -- mqtt housekeeping
        end
    end

    while (not mqtt:socket()) and (not mqtt:reconnect()) do
        print("trying to reconnect to broker ...")
        nixio.nanosleep(1, 0)
    end

    if mqtt:want_write() then
        broker.events = nixio.bit.set(broker.events, POLLOUT)
    else
        broker.events = nixio.bit.unset(broker.events, POLLOUT)
    end
end

I also ran a quick test of the patch against mosquitto_loop_start in combination with a stop/start of the broker. I didn't notice a change in client behaviour.

Cheers
/Bart


On 12/13/2012 09:26 AM, Roger Light wrote:
Hi Bart,

Would the lua equivalent of this do what you want?

rc = mosquitto_loop_read(mosq);
if(rc == MOSQ_ERR_CONN_LOST){
     /* We've been disconnected from the server */
     mosquitto_reconnect(mosq);
     do_something_with_socket(mosquitto_socket(mosq));
}

Apologies for the brief reply, I'm just heading out.

Cheers,

Roger

On Wed, Dec 12, 2012 at 9:41 PM, Bart Van Der Meerssche
<bart.vandermeerssche@xxxxxxxxxx> wrote:
Hello list,

I'm currently writing Lua bindings for libmosquitto [1]. Since Lua does not
support preemptive multithreading, using mosquitto_loop_start is not an
option. As suggested in the libmosquitto documentation, I'm integrating
mosquitto_loop_read/write/misc into a Lua event loop instead. The basic
functionality works ok. However, when a network error occurs or the broker
shuts down, the event loop starts busy looping. strace output:

poll([{fd=5, events=POLLIN}, {fd=6, events=POLLIN}], 2, -1) = 1 ([{fd=6,
revents=POLLIN}])
read(6, "", 1)                          = 0
poll([{fd=5, events=POLLIN}, {fd=6, events=POLLIN}], 2, -1) = 1 ([{fd=6,
revents=POLLIN}])
read(6, "", 1)                          = 0

...

stracing the libmosquitto event loop:
[pid 12721] select(4, [3], [], NULL, {1, 0}) = 1 (in [3], left {0, 332713})
[pid 12721] read(3, "", 1)              = 0
[pid 12721] close(3)                    = 0

Looking at the mosquitto_loop code shows that if a read or write error
occurs, the socket is closed with the private _mosquitto_socket_close
function, which updates the internal mosq state as well. This means that the
current libmosquitto API doesn't allow a proper implementation in an
external event loop. Making _mosquitto_socket_close public would be a start.
Integrating the rc handling of:
* rc = mosquitto_loop_read(mosq, max_packets)
* rc = mosquitto_loop_write(mosq, max_packets)
inside these public functions could be an even cleaner approach.

Would such a code change be an option?

[1]
https://github.com/flukso/flm02/tree/rfm12/openwrt/package/lua-mosquitto/src


Cheers
/Bart

--
Mailing list: https://launchpad.net/~mosquitto-users
Post to     : mosquitto-users@xxxxxxxxxxxxxxxxxxx
Unsubscribe : https://launchpad.net/~mosquitto-users
More help   : https://help.launchpad.net/ListHelp


diff -r aacc57ff6b21 lib/mosquitto.c
--- a/lib/mosquitto.c	Sat Nov 03 12:01:39 2012 +0000
+++ b/lib/mosquitto.c	Thu Dec 13 17:20:50 2012 +0100
@@ -669,39 +669,13 @@
 	}else{
 		if(FD_ISSET(mosq->sock, &readfds)){
 			rc = mosquitto_loop_read(mosq, max_packets);
-			if(rc){
-				_mosquitto_socket_close(mosq);
-				pthread_mutex_lock(&mosq->state_mutex);
-				if(mosq->state == mosq_cs_disconnecting){
-					rc = MOSQ_ERR_SUCCESS;
-				}
-				pthread_mutex_unlock(&mosq->state_mutex);
-				pthread_mutex_lock(&mosq->callback_mutex);
-				if(mosq->on_disconnect){
-					mosq->in_callback = true;
-					mosq->on_disconnect(mosq, mosq->obj, rc);
-					mosq->in_callback = false;
-				}
-				pthread_mutex_unlock(&mosq->callback_mutex);
+			if(rc || mosq->sock == INVALID_SOCKET){
 				return rc;
 			}
 		}
 		if(FD_ISSET(mosq->sock, &writefds)){
 			rc = mosquitto_loop_write(mosq, max_packets);
-			if(rc){
-				_mosquitto_socket_close(mosq);
-				pthread_mutex_lock(&mosq->state_mutex);
-				if(mosq->state == mosq_cs_disconnecting){
-					rc = MOSQ_ERR_SUCCESS;
-				}
-				pthread_mutex_unlock(&mosq->state_mutex);
-				pthread_mutex_lock(&mosq->callback_mutex);
-				if(mosq->on_disconnect){
-					mosq->in_callback = true;
-					mosq->on_disconnect(mosq, mosq->obj, rc);
-					mosq->in_callback = false;
-				}
-				pthread_mutex_unlock(&mosq->callback_mutex);
+			if(rc || mosq->sock == INVALID_SOCKET){
 				return rc;
 			}
 		}
@@ -746,6 +720,27 @@
 	return MOSQ_ERR_SUCCESS;
 }
 
+static int _mosquitto_loop_rc_handle(struct mosquitto *mosq, int rc)
+{
+	if(rc){
+		_mosquitto_socket_close(mosq);
+		pthread_mutex_lock(&mosq->state_mutex);
+		if(mosq->state == mosq_cs_disconnecting){
+			rc = MOSQ_ERR_SUCCESS;
+		}
+		pthread_mutex_unlock(&mosq->state_mutex);
+		pthread_mutex_lock(&mosq->callback_mutex);
+		if(mosq->on_disconnect){
+			mosq->in_callback = true;
+			mosq->on_disconnect(mosq, mosq->obj, rc);
+			mosq->in_callback = false;
+		}
+		pthread_mutex_unlock(&mosq->callback_mutex);
+		return rc;
+	}
+	return rc;
+}
+
 int mosquitto_loop_read(struct mosquitto *mosq, int max_packets)
 {
 	int rc;
@@ -755,7 +750,7 @@
 	for(i=0; i<max_packets; i++){
 		rc = _mosquitto_packet_read(mosq);
 		if(rc || errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){
-			return rc;
+			return _mosquitto_loop_rc_handle(mosq, rc);
 		}
 	}
 	return rc;
@@ -770,7 +765,7 @@
 	for(i=0; i<max_packets; i++){
 		rc = _mosquitto_packet_write(mosq);
 		if(rc || errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){
-			return rc;
+			return _mosquitto_loop_rc_handle(mosq, rc);
 		}
 	}
 	return rc;

References