doReturn(voidFutureComplete).when(connection).unsubscribeAll();
doReturn(CompletableFuture.completedFuture(true)).when(connection).subscribe(any(), any());
doReturn(CompletableFuture.completedFuture(true)).when(connection).unsubscribe(any(), any());
- doReturn(CompletableFuture.completedFuture(true)).when(connection).publish(any(), any());
doReturn(CompletableFuture.completedFuture(true)).when(connection).publish(any(), any(), anyInt(),
anyBoolean());
doReturn(voidFutureComplete).when(connection).unsubscribeAll();
doReturn(CompletableFuture.completedFuture(true)).when(connection).subscribe(any(), any());
doReturn(CompletableFuture.completedFuture(true)).when(connection).unsubscribe(any(), any());
- doReturn(CompletableFuture.completedFuture(true)).when(connection).publish(any(), any());
doReturn(CompletableFuture.completedFuture(true)).when(connection).publish(any(), any(), anyInt(),
anyBoolean());
@Override
@RuleAction(label = "@text/actionLabel", description = "@text/actionDesc")
public void publishMQTT(
- @ActionInput(name = "topic", label = "@text/actionInputTopicLabel", description = "@text/actionInputTopicDesc") @Nullable String topic,
- @ActionInput(name = "value", label = "@text/actionInputValueLabel", description = "@text/actionInputValueDesc") @Nullable String value,
- @ActionInput(name = "retain", label = "@text/actionInputRetainlabel", description = "@text/actionInputRetainDesc") @Nullable Boolean retain) {
+ @ActionInput(name = "topic", label = "@text/actionInputTopicLabel", description = "@text/actionInputTopicDesc") @Nullable final String topic,
+ @ActionInput(name = "value", label = "@text/actionInputValueLabel", description = "@text/actionInputValueDesc") @Nullable final String value,
+ @ActionInput(name = "retain", label = "@text/actionInputRetainlabel", description = "@text/actionInputRetainDesc") @Nullable final Boolean retain) {
AbstractBrokerHandler brokerHandler = handler;
if (brokerHandler == null) {
logger.warn("MQTT Action service ThingHandler is null!");
logger.debug("skipping MQTT publishing of value '{}' as topic is null.", value);
return;
}
- if (retain == null) {
- retain = connection.isRetain();
- }
- connection.publish(topic, value.getBytes(), connection.getQos(), retain).thenRun(() -> {
- logger.debug("MQTT publish to {} performed", topic);
- }).exceptionally(e -> {
- logger.warn("MQTT publish to {} failed!", topic);
- return null;
- });
+
+ connection.publish(topic, value.getBytes(), connection.getQos(), retain != null && retain.booleanValue())
+ .thenRun(() -> {
+ logger.debug("MQTT publish to {} performed", topic);
+ }).exceptionally(e -> {
+ logger.warn("MQTT publish to {} failed!", topic);
+ return null;
+ });
}
public static void publishMQTT(@Nullable ThingActions actions, @Nullable String topic, @Nullable String value) {
/**
* Publish a message to all connected brokers
*
- * @param topic The topic to publish on
- * @param payload The message to publish
+ * @param topic The topic
+ * @param payload The message payload
+ * @param qos The quality of service for this message
+ * @param retain Set to true to retain the message on the broker
*/
- void publish(String topic, byte[] payload);
+ void publish(String topic, byte[] payload, int qos, boolean retain);
}
connection.setTimeoutExecutor(scheduler, TIMEOUT_DEFAULT);
}
- connection.setRetain(config.retainMessages);
-
return connection;
}
properties.put(PROPERTY_PASSWORD, password);
}
properties.put(PROPERTY_QOS, String.valueOf(connection.getQos()));
- properties.put(PROPERTY_RETAIN, String.valueOf(connection.isRetain()));
final MqttWillAndTestament lastWill = connection.getLastWill();
if (lastWill != null) {
properties.put(PROPERTY_LAST_WILL, lastWill.toString());
*/
package org.openhab.binding.mqtt.internal;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.WeakHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
}
@Override
- public void publish(String topic, byte[] payload) {
+ public void publish(String topic, byte[] payload, int qos, boolean retain) {
handlers.forEach(handler -> {
handler.getConnectionAsync().thenAccept(connection -> {
- connection.publish(topic, payload);
+ connection.publish(topic, payload, qos, retain);
});
});
}
import org.mockito.junit.jupiter.MockitoExtension;
import org.openhab.core.OpenHAB;
import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
+import org.openhab.core.io.transport.mqtt.MqttBrokerConnection.MqttVersion;
import org.openhab.core.io.transport.mqtt.MqttBrokerConnection.Protocol;
import org.openhab.core.io.transport.mqtt.MqttConnectionObserver;
import org.openhab.core.io.transport.mqtt.MqttConnectionState;
verify(service).addBrokerConnection(anyString(), eq(c));
// Connect with a second connection but wrong credentials
- MqttBrokerConnection wrongCredentials = new MqttBrokerConnection(Protocol.TCP, c.getHost(), c.getPort(), false,
- "wrongCred");
+ MqttBrokerConnection wrongCredentials = new MqttBrokerConnection(Protocol.TCP, MqttVersion.V3, c.getHost(),
+ c.getPort(), false, "wrongCred");
wrongCredentials.setCredentials("someUser", "somePassword");
if (wrongCredentials.start().get()) {
wrongCredentials.stop().get();
// Connect with a second connection but correct credentials
- MqttBrokerConnection correctCredentials = new MqttBrokerConnection(Protocol.TCP, c.getHost(), c.getPort(),
- false, "correctCred");
+ MqttBrokerConnection correctCredentials = new MqttBrokerConnection(Protocol.TCP, MqttVersion.V3, c.getHost(),
+ c.getPort(), false, "correctCred");
correctCredentials.setCredentials(c.getUser(), c.getPassword());
if (!correctCredentials.start().get()) {