2 * Copyright (c) 2010-2020 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.io.mqttembeddedbroker.internal;
15 import java.io.IOException;
16 import java.net.InetSocketAddress;
17 import java.net.Socket;
18 import java.net.SocketAddress;
19 import java.util.concurrent.ScheduledExecutorService;
20 import java.util.concurrent.ScheduledFuture;
21 import java.util.concurrent.TimeUnit;
23 import org.eclipse.jdt.annotation.NonNullByDefault;
24 import org.eclipse.jdt.annotation.Nullable;
26 import io.moquette.broker.Server;
29 * Unfortunately there is no listener interface for the Moquette MQTT Broker
30 * to get notified when it is started and ready to accept connections.
31 * We therefore try to connect to the socket with a Socket object until a timeout is reached.
33 * @author David Graeff - Inital contriution
36 public class MqttEmbeddedBrokerDetectStart {
37 protected @Nullable Server server;
38 protected final MqttEmbeddedBrokerStartedListener startedListener;
39 protected long startTime;
41 protected int timeout = 2000;
42 protected @Nullable ScheduledExecutorService scheduler;
43 protected @Nullable ScheduledFuture<?> schedule;
46 * Implement this interface to be notified if a connection to the given tcp port can be established.
48 public static interface MqttEmbeddedBrokerStartedListener {
49 public void mqttEmbeddedBrokerStarted(boolean timeout);
53 * Registers the given listener. Start with {@link #startBrokerStartedDetection(int, ScheduledExecutorService)}.
55 * @param startedListener A listener
57 public MqttEmbeddedBrokerDetectStart(MqttEmbeddedBrokerStartedListener startedListener) {
58 this.startedListener = startedListener;
62 * Performs a tcp socket open/close process. Will notify the registered listener on success
63 * and retry until a timeout is reached otherwise.
65 protected void servicePing() {
66 ScheduledExecutorService scheduler = this.scheduler;
67 if (scheduler == null) {
72 SocketAddress socketAddress = new InetSocketAddress("127.0.0.1", port);
73 Socket socket = new Socket();
74 socket.connect(socketAddress, 500);
77 startedListener.mqttEmbeddedBrokerStarted(false);
79 } catch (IOException ignored) {
81 if (System.currentTimeMillis() - startTime < timeout) {
82 schedule = scheduler.schedule(() -> servicePing(), 100, TimeUnit.MILLISECONDS);
84 startedListener.mqttEmbeddedBrokerStarted(true);
89 * Start the broker server reachable detection
91 * @param port The Mqtt Server port
92 * @param scheduler A scheduler
94 public void startBrokerStartedDetection(int port, ScheduledExecutorService scheduler) {
96 this.scheduler = scheduler;
97 this.startTime = System.currentTimeMillis();
103 * Stops the broker server reachable detection if it is still running.
105 public void stopBrokerStartDetection() {
106 if (schedule != null) {
107 schedule.cancel(true);