]> git.basschouten.com Git - openhab-addons.git/blob
67da06e2fb0439f4cb48ab783c52c1266da494be
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.mqtt.generic.tools;
14
15 import java.nio.charset.StandardCharsets;
16 import java.util.concurrent.CompletableFuture;
17 import java.util.concurrent.ExecutionException;
18 import java.util.concurrent.ScheduledExecutorService;
19 import java.util.concurrent.TimeUnit;
20 import java.util.concurrent.TimeoutException;
21
22 import org.eclipse.jdt.annotation.NonNullByDefault;
23 import org.eclipse.jdt.annotation.Nullable;
24 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
25 import org.openhab.core.io.transport.mqtt.MqttMessageSubscriber;
26
27 /**
28  * Waits for a topic value to appear on a MQTT topic. One-time usable only per instance.
29  *
30  * @author David Graeff - Initial contribution
31  */
32 @NonNullByDefault
33 public class WaitForTopicValue {
34     private final CompletableFuture<String> composeFuture;
35
36     /**
37      * Creates an instance.
38      *
39      * @param connection A broker connection.
40      * @param topic The topic
41      */
42     public WaitForTopicValue(MqttBrokerConnection connection, String topic) {
43         final CompletableFuture<String> future = new CompletableFuture<>();
44         final MqttMessageSubscriber mqttMessageSubscriber = (t, payload) -> {
45             future.complete(new String(payload, StandardCharsets.UTF_8));
46         };
47         future.whenComplete((r, e) -> {
48             connection.unsubscribe(topic, mqttMessageSubscriber);
49         });
50
51         composeFuture = connection.subscribe(topic, mqttMessageSubscriber).thenCompose(b -> future);
52     }
53
54     /**
55      * Free any resources
56      */
57     public void stop() {
58         composeFuture.completeExceptionally(new Exception("Stopped"));
59     }
60
61     /**
62      * Wait for the value to appear on the MQTT broker.
63      *
64      * @param timeoutInMS Maximum time in milliseconds to wait for the value.
65      * @return Return the value or null if timed out.
66      */
67     public @Nullable String waitForTopicValue(int timeoutInMS) {
68         try {
69             return composeFuture.get(timeoutInMS, TimeUnit.MILLISECONDS);
70         } catch (InterruptedException | ExecutionException | TimeoutException e) {
71             return null;
72         }
73     }
74
75     private void timeout() {
76         if (!composeFuture.isDone()) {
77             composeFuture.completeExceptionally(new TimeoutException());
78         }
79     }
80
81     /**
82      * Return a future that completes successfully with a topic value or fails exceptionally with a timeout exception.
83      *
84      * @param scheduler A scheduler for the timeout
85      * @param timeoutInMS The timeout in milliseconds
86      * @return The future
87      */
88     public CompletableFuture<String> waitForTopicValueAsync(ScheduledExecutorService scheduler, int timeoutInMS) {
89         scheduler.schedule(this::timeout, timeoutInMS, TimeUnit.MILLISECONDS);
90         return composeFuture;
91     }
92 }