]> git.basschouten.com Git - openhab-addons.git/blob
10ac1d7c215f8615eeddb67f316f4829ad06b5ac
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2021 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.mqtt.discovery;
14
15 import java.util.concurrent.CompletableFuture;
16
17 import org.eclipse.jdt.annotation.NonNullByDefault;
18 import org.eclipse.jdt.annotation.Nullable;
19 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
20 import org.openhab.core.io.transport.mqtt.MqttMessageSubscriber;
21 import org.openhab.core.thing.ThingUID;
22
23 /**
24  * Represents a MQTT subscription for one specific topic. This is an immutable class.
25  *
26  * @author David Graeff - Initial contribution
27  */
28 @NonNullByDefault
29 public class TopicSubscribe implements MqttMessageSubscriber {
30     final @Nullable MqttBrokerConnection connection;
31     final ThingUID thing;
32     final String topic;
33     final MQTTTopicDiscoveryParticipant topicDiscoveredListener;
34
35     private boolean isStarted = false;
36
37     /**
38      * Creates a {@link TopicSubscribe} object.
39      *
40      * @param connection The broker connection
41      * @param topic The topic
42      * @param topicDiscoveredListener A listener
43      * @param thing A thing, used as an argument to the listener callback.
44      */
45     public TopicSubscribe(@Nullable MqttBrokerConnection connection, String topic,
46             MQTTTopicDiscoveryParticipant topicDiscoveredListener, ThingUID thing) {
47         this.connection = connection;
48         this.thing = thing;
49         this.topic = topic;
50         this.topicDiscoveredListener = topicDiscoveredListener;
51     }
52
53     @Override
54     public void processMessage(String topic, byte[] payload) {
55         final MqttBrokerConnection connection = this.connection;
56         if (connection == null)
57             return;
58         if (payload.length > 0) {
59             topicDiscoveredListener.receivedMessage(thing, connection, topic, payload);
60         } else {
61             topicDiscoveredListener.topicVanished(thing, connection, topic);
62         }
63     }
64
65     /**
66      * Subscribe to the topic
67      *
68      * @return Completes with true if successful. Completes with false if not connected yet. Exceptionally otherwise.
69      */
70     public CompletableFuture<Boolean> start() {
71         CompletableFuture<Boolean> startFuture = connection == null ? CompletableFuture.completedFuture(true)
72                 : connection.subscribe(topic, this);
73         isStarted = true;
74         return startFuture;
75     }
76
77     /**
78      * Unsubscribes from the topic
79      *
80      * @return Completes with true if successful. Exceptionally otherwise.
81      */
82     public CompletableFuture<Boolean> stop() {
83         CompletableFuture<Boolean> stopFuture = connection == null ? CompletableFuture.completedFuture(true)
84                 : connection.unsubscribe(topic, this);
85         isStarted = false;
86         return stopFuture;
87     }
88
89     /**
90      * status of this topic subscription
91      *
92      * @return true if started
93      */
94     public boolean isStarted() {
95         return isStarted;
96     }
97 }