← Back to team overview

mqtt-users team mailing list archive

mosquitto locking up

 

Hi, I am testing mosquitto on a 64-bit Windows 7 environment. (Core-i5, 8G
ram, ...).

I am using the latest paho client code.

When I run the following test classes with rsmb, they work flawlessly. When
I run them with mosquitto, they lock up. I am running mosquitto as a
Windows service with no special configuration changes since installation
(I've also tried starting the broker from the command line).

Basically there are 2 classes. The Subscriber class starts up a
configurable set of threads that subscribe to a topic, and listen
continually until the application is closed. Any messages received are
output to the console. The Publisher class publishes a configurable number
of messages on the channel.

The code (2 classes, Publisher and Subscriber):
/**
 *
 */
package org.mike;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;

public class Publisher {

public static void main(String[] args) {
try {
MqttClient client = new MqttClient("tcp://localhost:1883",
"publisher_client_id");
client.connect();
MqttTopic topic = client.getTopic("clients");
for (int i = 0; i < 1000; i++) {
MqttMessage msg = new MqttMessage(new String("payload:"+i).getBytes());
MqttDeliveryToken token = topic.publish(msg);
token.waitForCompletion();
}
client.disconnect();
} catch (MqttException e) {
e.printStackTrace();
}
}
}

/**
 *
 */
package org.mike;

import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;

public class Subscriber {

static int NUM_SUBSCRIBERS = 10;

public static void main(String[] args) {
for (int i = 0; i < NUM_SUBSCRIBERS; i++) {
startSubscriberThread(i);
}
}

private static void startSubscriberThread(final int id) {
Thread t = new Thread(new Runnable(){
@Override
public void run() {
startSubscriber(Integer.toString(id));
}
});
t.start();
}

private static void startSubscriber(String id) {
boolean listening = true;
try {
MqttClient client = new MqttClient("tcp://localhost:1883", id);
client.setCallback(createCallBack(id));
client.connect();
 System.out.println("subscribing:"+id);
while (listening) {
client.subscribe("clients");
}
client.disconnect();
} catch (MqttException e) {
e.printStackTrace();
}
}

private static MqttCallback createCallBack(final String id) {
return new MqttCallback() {

@Override
public void connectionLost(Throwable cause) {
System.out.println("Subscriber connection lost.");
}

@Override
public void messageArrived(MqttTopic topic, MqttMessage message)
throws Exception {
System.out.println("msg arrived; id:"+id+":"+new
String(message.getPayload()));
}

@Override
public void deliveryComplete(MqttDeliveryToken token) {
}
};
}
}

Follow ups