]> git.basschouten.com Git - openhab-addons.git/blob
f55b4389ac72c12a4b01d46c5d90216bd544c063
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2020 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.io.mqttembeddedbroker.internal;
14
15 import java.io.IOException;
16 import java.net.InetSocketAddress;
17 import java.net.Socket;
18 import java.net.SocketAddress;
19 import java.util.concurrent.ScheduledExecutorService;
20 import java.util.concurrent.ScheduledFuture;
21 import java.util.concurrent.TimeUnit;
22
23 import org.eclipse.jdt.annotation.NonNullByDefault;
24 import org.eclipse.jdt.annotation.Nullable;
25
26 import io.moquette.broker.Server;
27
28 /**
29  * Unfortunately there is no listener interface for the Moquette MQTT Broker
30  * to get notified when it is started and ready to accept connections.
31  * We therefore try to connect to the socket with a Socket object until a timeout is reached.
32  *
33  * @author David Graeff - Inital contriution
34  */
35 @NonNullByDefault
36 public class MqttEmbeddedBrokerDetectStart {
37     protected @Nullable Server server;
38     protected final MqttEmbeddedBrokerStartedListener startedListener;
39     protected long startTime;
40     protected int port;
41     protected int timeout = 2000;
42     protected @Nullable ScheduledExecutorService scheduler;
43     protected @Nullable ScheduledFuture<?> schedule;
44
45     /**
46      * Implement this interface to be notified if a connection to the given tcp port can be established.
47      */
48     public static interface MqttEmbeddedBrokerStartedListener {
49         public void mqttEmbeddedBrokerStarted(boolean timeout);
50     }
51
52     /**
53      * Registers the given listener. Start with {@link #startBrokerStartedDetection(int, ScheduledExecutorService)}.
54      *
55      * @param startedListener A listener
56      */
57     public MqttEmbeddedBrokerDetectStart(MqttEmbeddedBrokerStartedListener startedListener) {
58         this.startedListener = startedListener;
59     }
60
61     /**
62      * Performs a tcp socket open/close process. Will notify the registered listener on success
63      * and retry until a timeout is reached otherwise.
64      */
65     protected void servicePing() {
66         ScheduledExecutorService scheduler = this.scheduler;
67         if (scheduler == null) {
68             return;
69         }
70
71         try {
72             SocketAddress socketAddress = new InetSocketAddress("127.0.0.1", port);
73             Socket socket = new Socket();
74             socket.connect(socketAddress, 500);
75             socket.close();
76             schedule = null;
77             startedListener.mqttEmbeddedBrokerStarted(false);
78             return;
79         } catch (IOException ignored) {
80         }
81         if (System.currentTimeMillis() - startTime < timeout) {
82             schedule = scheduler.schedule(() -> servicePing(), 100, TimeUnit.MILLISECONDS);
83         } else {
84             startedListener.mqttEmbeddedBrokerStarted(true);
85         }
86     }
87
88     /**
89      * Start the broker server reachable detection
90      *
91      * @param port The Mqtt Server port
92      * @param scheduler A scheduler
93      */
94     public void startBrokerStartedDetection(int port, ScheduledExecutorService scheduler) {
95         this.port = port;
96         this.scheduler = scheduler;
97         this.startTime = System.currentTimeMillis();
98         this.schedule = null;
99         servicePing();
100     }
101
102     /**
103      * Stops the broker server reachable detection if it is still running.
104      */
105     public void stopBrokerStartDetection() {
106         if (schedule != null) {
107             schedule.cancel(true);
108             schedule = null;
109         }
110     }
111 }