]> git.basschouten.com Git - openhab-addons.git/blob
a912136fe1a2c2431ec8571f1ea98f2eff8a2e25
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2022 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.mqtt.discovery;
14
15 import java.util.concurrent.CompletableFuture;
16
17 import org.eclipse.jdt.annotation.NonNullByDefault;
18 import org.eclipse.jdt.annotation.Nullable;
19 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
20 import org.openhab.core.io.transport.mqtt.MqttMessageSubscriber;
21 import org.openhab.core.thing.ThingUID;
22
23 /**
24  * Represents a MQTT subscription for one specific topic. This is an immutable class.
25  *
26  * @author David Graeff - Initial contribution
27  */
28 @NonNullByDefault
29 public class TopicSubscribe implements MqttMessageSubscriber {
30     final @Nullable MqttBrokerConnection connection;
31     final ThingUID thing;
32     final String topic;
33     final MQTTTopicDiscoveryParticipant topicDiscoveredListener;
34
35     private boolean isStarted = false;
36
37     /**
38      * Creates a {@link TopicSubscribe} object.
39      *
40      * @param connection The broker connection
41      * @param topic The topic
42      * @param topicDiscoveredListener A listener
43      * @param thing A thing, used as an argument to the listener callback.
44      */
45     public TopicSubscribe(@Nullable MqttBrokerConnection connection, String topic,
46             MQTTTopicDiscoveryParticipant topicDiscoveredListener, ThingUID thing) {
47         this.connection = connection;
48         this.thing = thing;
49         this.topic = topic;
50         this.topicDiscoveredListener = topicDiscoveredListener;
51     }
52
53     @Override
54     public void processMessage(String topic, byte[] payload) {
55         final MqttBrokerConnection connection = this.connection;
56         if (connection == null) {
57             return;
58         }
59         if (payload.length > 0) {
60             topicDiscoveredListener.receivedMessage(thing, connection, topic, payload);
61         } else {
62             topicDiscoveredListener.topicVanished(thing, connection, topic);
63         }
64     }
65
66     /**
67      * Subscribe to the topic
68      *
69      * @return Completes with true if successful. Completes with false if not connected yet. Exceptionally otherwise.
70      */
71     public CompletableFuture<Boolean> start() {
72         CompletableFuture<Boolean> startFuture = connection == null ? CompletableFuture.completedFuture(true)
73                 : connection.subscribe(topic, this);
74         isStarted = true;
75         return startFuture;
76     }
77
78     /**
79      * Unsubscribes from the topic
80      *
81      * @return Completes with true if successful. Exceptionally otherwise.
82      */
83     public CompletableFuture<Boolean> stop() {
84         CompletableFuture<Boolean> stopFuture = connection == null || !isStarted
85                 ? CompletableFuture.completedFuture(true)
86                 : connection.unsubscribe(topic, this);
87         isStarted = false;
88         return stopFuture;
89     }
90
91     /**
92      * status of this topic subscription
93      *
94      * @return true if started
95      */
96     public boolean isStarted() {
97         return isStarted;
98     }
99 }