2 * Copyright (c) 2010-2023 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.mqtt.generic.tools;
15 import java.nio.charset.StandardCharsets;
16 import java.util.concurrent.CompletableFuture;
17 import java.util.concurrent.ExecutionException;
18 import java.util.concurrent.ScheduledExecutorService;
19 import java.util.concurrent.TimeUnit;
20 import java.util.concurrent.TimeoutException;
22 import org.eclipse.jdt.annotation.NonNullByDefault;
23 import org.eclipse.jdt.annotation.Nullable;
24 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
25 import org.openhab.core.io.transport.mqtt.MqttMessageSubscriber;
/**
 * Waits for a value to appear on an MQTT topic. Each instance can be used only once.
 *
 * @author David Graeff - Initial contribution
 */
33 public class WaitForTopicValue {
34 private final CompletableFuture<String> composeFuture;
/**
 * Creates an instance and immediately subscribes to the given topic.
 * <p>
 * The first message received on the topic is decoded as UTF-8 and becomes the awaited value. The
 * subscription is removed again as soon as the wait ends, regardless of whether it ended with a value
 * or because the resulting future was completed from the outside (for example by a timeout).
 *
 * @param connection A broker connection.
 * @param topic The topic to wait on.
 */
public WaitForTopicValue(MqttBrokerConnection connection, String topic) {
    final CompletableFuture<String> future = new CompletableFuture<>();
    final MqttMessageSubscriber mqttMessageSubscriber = (t, payload) -> {
        future.complete(new String(payload, StandardCharsets.UTF_8));
    };
    // Whatever way the inner future ends, the subscriber is no longer needed.
    future.whenComplete((r, e) -> {
        connection.unsubscribe(topic, mqttMessageSubscriber);
    });

    composeFuture = connection.subscribe(topic, mqttMessageSubscriber).thenCompose(b -> future);
    // Completing composeFuture directly does NOT propagate back to the inner future, so without this hook
    // the subscriber would stay registered after an external completion. cancel() is a no-op if the inner
    // future already holds a value, hence the unsubscribe hook above still runs exactly once.
    composeFuture.whenComplete((r, e) -> future.cancel(false));
}
58 composeFuture.completeExceptionally(new Exception("Stopped"));
62 * Wait for the value to appear on the MQTT broker.
64 * @param timeoutInMS Maximum time in milliseconds to wait for the value.
65 * @return Return the value or null if timed out.
67 public @Nullable String waitForTopicValue(int timeoutInMS) {
69 return composeFuture.get(timeoutInMS, TimeUnit.MILLISECONDS);
70 } catch (InterruptedException | ExecutionException | TimeoutException e) {
75 private void timeout() {
76 if (!composeFuture.isDone()) {
77 composeFuture.completeExceptionally(new TimeoutException());
82 * Return a future that completes successfully with a topic value or fails exceptionally with a timeout exception.
84 * @param scheduler A scheduler for the timeout
85 * @param timeoutInMS The timeout in milliseconds
88 public CompletableFuture<String> waitForTopicValueAsync(ScheduledExecutorService scheduler, int timeoutInMS) {
89 scheduler.schedule(this::timeout, timeoutInMS, TimeUnit.MILLISECONDS);