mqtt-users team mailing list archive
-
mqtt-users team
-
Mailing list archive
-
Message #00108
mosquitto locking up
Hi, I am testing mosquitto on a 64-bit Windows 7 environment. (Core-i5, 8G
ram, ...).
I am using the latest paho client code.
When I run the following test classes with rsmb, they work flawlessly. When
I run them with mosquitto, they lock up. I am running mosquitto as a
Windows service with no special configuration changes since installation
(I've also tried starting the broker from the command line).
Basically there are 2 classes. The Subscriber class starts up a
configurable set of threads that subscribe to a topic, and listen
continually until the application is closed. Any messages received are
output to the console. The Publisher class publishes a configurable number
of messages on the channel.
The code (2 classes, Publisher and Subscriber):
/**
*
*/
package org.mike;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;
public class Publisher {
public static void main(String[] args) {
try {
MqttClient client = new MqttClient("tcp://localhost:1883",
"publisher_client_id");
client.connect();
MqttTopic topic = client.getTopic("clients");
for (int i = 0; i < 1000; i++) {
MqttMessage msg = new MqttMessage(new String("payload:"+i).getBytes());
MqttDeliveryToken token = topic.publish(msg);
token.waitForCompletion();
}
client.disconnect();
} catch (MqttException e) {
e.printStackTrace();
}
}
}
/**
*
*/
package org.mike;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;
public class Subscriber {
static int NUM_SUBSCRIBERS = 10;
public static void main(String[] args) {
for (int i = 0; i < NUM_SUBSCRIBERS; i++) {
startSubscriberThread(i);
}
}
private static void startSubscriberThread(final int id) {
Thread t = new Thread(new Runnable(){
@Override
public void run() {
startSubscriber(Integer.toString(id));
}
});
t.start();
}
private static void startSubscriber(String id) {
boolean listening = true;
try {
MqttClient client = new MqttClient("tcp://localhost:1883", id);
client.setCallback(createCallBack(id));
client.connect();
System.out.println("subscribing:"+id);
while (listening) {
client.subscribe("clients");
}
client.disconnect();
} catch (MqttException e) {
e.printStackTrace();
}
}
private static MqttCallback createCallBack(final String id) {
return new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
System.out.println("Subscriber connection lost.");
}
@Override
public void messageArrived(MqttTopic topic, MqttMessage message)
throws Exception {
System.out.println("msg arrived; id:"+id+":"+new
String(message.getPayload()));
}
@Override
public void deliveryComplete(MqttDeliveryToken token) {
}
};
}
}
Follow ups