@Override
public void processMessage(String topic, byte[] payload) {
final MqttBrokerConnection connection = this.connection;
- if (connection == null)
+ if (connection == null) {
return;
+ }
if (payload.length > 0) {
topicDiscoveredListener.receivedMessage(thing, connection, topic, payload);
} else {
* @return Completes with true if successful. Exceptionally otherwise.
*/
public CompletableFuture<Boolean> stop() {
- CompletableFuture<Boolean> stopFuture = connection == null ? CompletableFuture.completedFuture(true)
+ CompletableFuture<Boolean> stopFuture = connection == null || !isStarted
+ ? CompletableFuture.completedFuture(true)
: connection.unsubscribe(topic, this);
isStarted = false;
return stopFuture;