mosquitto-users team mailing list archive
-
mosquitto-users team
-
Mailing list archive
-
Message #00139
Re: using libmosquitto in an external event loop
Hello Roger,
Thanks for replying.
I would expect the bindings to be functionally equivalent to what the C
loop is doing. This is not the case since:
1/ the socket is not properly closed through _mosquitto_socket_close
2/ _mosquitto_socket_close cleans up mosq->ssl, mosq->ssl_ctx and mosq->sock
3/ mosquitto_loop checks against the mosq_cs_disconnecting state and
calls the mosq->on_disconnect callback.
Please find attached a patch against the repo tip that integrates
the rc handling into mosquitto_loop_read and mosquitto_loop_write. The
patch works nicely with the following Lua event loop:
local fds = { timer, broker }
while true do
local poll = nixio.poll(fds, POLL_TIMEOUT_MS)
if not poll then -- poll == -1
elseif poll == 0 then
elseif poll > 0 then
if nixio.bit.check(broker.revents, POLLIN) then
mqtt:read(MOSQ_MAX_READ)
end
if nixio.bit.check(broker.revents, POLLOUT) and
mqtt:want_write() then
mqtt:write(MOSQ_MAX_WRITE)
end
if nixio.bit.check(timer.revents, POLLIN) then
timer.fd:numexp() -- reset the numexp counter
mqtt:misc() -- mqtt housekeeping
end
end
while (not mqtt:socket()) and (not mqtt:reconnect()) do
print("trying to reconnect to broker ...")
nixio.nanosleep(1, 0)
end
if mqtt:want_write() then
broker.events = nixio.bit.set(broker.events, POLLOUT)
else
broker.events = nixio.bit.unset(broker.events, POLLOUT)
end
end
I also ran a quick test of the patch against mosquitto_loop_start in
combination with a stop/start of the broker. I didn't notice a change in
client behaviour.
Cheers
/Bart
On 12/13/2012 09:26 AM, Roger Light wrote:
Hi Bart,
Would the lua equivalent of this do what you want?
rc = mosquitto_loop_read(mosq);
if(rc == MOSQ_ERR_CONN_LOST){
/* We've been disconnected from the server */
mosquitto_reconnect(mosq);
do_something_with_socket(mosquitto_socket(mosq));
}
Apologies for the brief reply, I'm just heading out.
Cheers,
Roger
On Wed, Dec 12, 2012 at 9:41 PM, Bart Van Der Meerssche
<bart.vandermeerssche@xxxxxxxxxx> wrote:
Hello list,
I'm currently writing Lua bindings for libmosquitto [1]. Since Lua does not
support preemptive multithreading, using mosquitto_loop_start is not an
option. As suggested in the libmosquitto documentation, I'm integrating
mosquitto_loop_read/write/misc into a Lua event loop instead. The basic
functionality works ok. However, when a network error occurs or the broker
shuts down, the event loop starts busy-looping. The strace output is:
poll([{fd=5, events=POLLIN}, {fd=6, events=POLLIN}], 2, -1) = 1 ([{fd=6,
revents=POLLIN}])
read(6, "", 1) = 0
poll([{fd=5, events=POLLIN}, {fd=6, events=POLLIN}], 2, -1) = 1 ([{fd=6,
revents=POLLIN}])
read(6, "", 1) = 0
...
For comparison, running strace on the libmosquitto event loop shows:
[pid 12721] select(4, [3], [], NULL, {1, 0}) = 1 (in [3], left {0, 332713})
[pid 12721] read(3, "", 1) = 0
[pid 12721] close(3) = 0
Looking at the mosquitto_loop code shows that if a read or write error
occurs, the socket is closed with the private _mosquitto_socket_close
function, which updates the internal mosq state as well. This means that the
current libmosquitto API doesn't allow a proper implementation in an
external event loop. Making _mosquitto_socket_close public would be a start.
Integrating the rc handling of:
* rc = mosquitto_loop_read(mosq, max_packets)
* rc = mosquitto_loop_write(mosq, max_packets)
inside these public functions could be an even cleaner approach.
Would such a code change be an option?
[1]
https://github.com/flukso/flm02/tree/rfm12/openwrt/package/lua-mosquitto/src
Cheers
/Bart
--
Mailing list: https://launchpad.net/~mosquitto-users
Post to : mosquitto-users@xxxxxxxxxxxxxxxxxxx
Unsubscribe : https://launchpad.net/~mosquitto-users
More help : https://help.launchpad.net/ListHelp
diff -r aacc57ff6b21 lib/mosquitto.c
--- a/lib/mosquitto.c Sat Nov 03 12:01:39 2012 +0000
+++ b/lib/mosquitto.c Thu Dec 13 17:20:50 2012 +0100
@@ -669,39 +669,13 @@
}else{
if(FD_ISSET(mosq->sock, &readfds)){
rc = mosquitto_loop_read(mosq, max_packets);
- if(rc){
- _mosquitto_socket_close(mosq);
- pthread_mutex_lock(&mosq->state_mutex);
- if(mosq->state == mosq_cs_disconnecting){
- rc = MOSQ_ERR_SUCCESS;
- }
- pthread_mutex_unlock(&mosq->state_mutex);
- pthread_mutex_lock(&mosq->callback_mutex);
- if(mosq->on_disconnect){
- mosq->in_callback = true;
- mosq->on_disconnect(mosq, mosq->obj, rc);
- mosq->in_callback = false;
- }
- pthread_mutex_unlock(&mosq->callback_mutex);
+ if(rc || mosq->sock == INVALID_SOCKET){
return rc;
}
}
if(FD_ISSET(mosq->sock, &writefds)){
rc = mosquitto_loop_write(mosq, max_packets);
- if(rc){
- _mosquitto_socket_close(mosq);
- pthread_mutex_lock(&mosq->state_mutex);
- if(mosq->state == mosq_cs_disconnecting){
- rc = MOSQ_ERR_SUCCESS;
- }
- pthread_mutex_unlock(&mosq->state_mutex);
- pthread_mutex_lock(&mosq->callback_mutex);
- if(mosq->on_disconnect){
- mosq->in_callback = true;
- mosq->on_disconnect(mosq, mosq->obj, rc);
- mosq->in_callback = false;
- }
- pthread_mutex_unlock(&mosq->callback_mutex);
+ if(rc || mosq->sock == INVALID_SOCKET){
return rc;
}
}
@@ -746,6 +720,27 @@
return MOSQ_ERR_SUCCESS;
}
+static int _mosquitto_loop_rc_handle(struct mosquitto *mosq, int rc)
+{
+ if(rc){
+ _mosquitto_socket_close(mosq);
+ pthread_mutex_lock(&mosq->state_mutex);
+ if(mosq->state == mosq_cs_disconnecting){
+ rc = MOSQ_ERR_SUCCESS;
+ }
+ pthread_mutex_unlock(&mosq->state_mutex);
+ pthread_mutex_lock(&mosq->callback_mutex);
+ if(mosq->on_disconnect){
+ mosq->in_callback = true;
+ mosq->on_disconnect(mosq, mosq->obj, rc);
+ mosq->in_callback = false;
+ }
+ pthread_mutex_unlock(&mosq->callback_mutex);
+ return rc;
+ }
+ return rc;
+}
+
int mosquitto_loop_read(struct mosquitto *mosq, int max_packets)
{
int rc;
@@ -755,7 +750,7 @@
for(i=0; i<max_packets; i++){
rc = _mosquitto_packet_read(mosq);
if(rc || errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){
- return rc;
+ return _mosquitto_loop_rc_handle(mosq, rc);
}
}
return rc;
@@ -770,7 +765,7 @@
for(i=0; i<max_packets; i++){
rc = _mosquitto_packet_write(mosq);
if(rc || errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){
- return rc;
+ return _mosquitto_loop_rc_handle(mosq, rc);
}
}
return rc;
References