]> git.basschouten.com Git - openhab-addons.git/commitdiff
[openhabcloud] Reconnection Fixes (#14251)
authorDan Cunningham <dan@digitaldan.com>
Sun, 29 Jan 2023 09:21:54 +0000 (01:21 -0800)
committerGitHub <noreply@github.com>
Sun, 29 Jan 2023 09:21:54 +0000 (10:21 +0100)
* [openhabcloud] Possible connection leak
* Creates thread safe reconnection, reduces unnecessary polling on setup, removes unused variables.
* adds the reconnect settings to the internal socket.io options.
* Up the min reconnect time
* Use @ssalonen sugestion for backoff mins and randomness
* Reconnect after server initiated disconnect
* Remove unhelpful comments

Signed-off-by: Dan Cunningham <dan@digitaldan.com>
bundles/org.openhab.io.openhabcloud/src/main/java/org/openhab/io/openhabcloud/internal/CloudClient.java
bundles/org.openhab.io.openhabcloud/src/main/java/org/openhab/io/openhabcloud/internal/CloudService.java

index c3cd5136e4511e61577d4a4c614e3828eb260cc9..6e2fbc63d805511d4aee868f0512adeb56d768d4 100644 (file)
@@ -22,10 +22,13 @@ import java.nio.charset.StandardCharsets;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.eclipse.jdt.annotation.Nullable;
 import org.eclipse.jetty.client.HttpClient;
@@ -51,6 +54,7 @@ import io.socket.client.Manager;
 import io.socket.client.Socket;
 import io.socket.emitter.Emitter;
 import io.socket.engineio.client.Transport;
+import io.socket.engineio.client.transports.WebSocket;
 import io.socket.parser.Packet;
 import io.socket.parser.Parser;
 import okhttp3.OkHttpClient.Builder;
@@ -68,6 +72,15 @@ import okhttp3.logging.HttpLoggingInterceptor.Level;
  * @author Kai Kreuzer - migrated code to new Jetty client and ESH APIs
  */
 public class CloudClient {
+
+    private static final long RECONNECT_MIN = 2_000;
+
+    private static final long RECONNECT_MAX = 60_000;
+
+    private static final double RECONNECT_JITTER = 0.75;
+
+    private static final long READ_TIMEOUT = 60_0000;
+
     /*
      * Logger for this class
      */
@@ -108,11 +121,6 @@ public class CloudClient {
      */
     private boolean isConnected;
 
-    /*
-     * This variable holds version of local openHAB
-     */
-    private String openHABVersion;
-
     /*
      * This variable holds instance of Socket.IO client class which provides communication
      * with the openHAB Cloud
@@ -139,11 +147,15 @@ public class CloudClient {
 
     /*
      * Delay reconnect scheduler pool
-     * 
+     *
      */
     protected final ScheduledExecutorService scheduler = ThreadPoolManager
             .getScheduledPool(ThreadPoolManager.THREAD_POOL_NAME_COMMON);
 
+    @SuppressWarnings("null")
+    private final AtomicReference<Optional<ScheduledFuture<?>>> reconnectFuture = new AtomicReference<>(
+            Optional.empty());
+
     /**
      * Constructor of CloudClient
      *
@@ -161,9 +173,9 @@ public class CloudClient {
         this.remoteAccessEnabled = remoteAccessEnabled;
         this.exposedItems = exposedItems;
         this.jettyClient = httpClient;
-        reconnectBackoff.setMin(1000);
-        reconnectBackoff.setMax(30_000);
-        reconnectBackoff.setJitter(0.5);
+        reconnectBackoff.setMin(RECONNECT_MIN);
+        reconnectBackoff.setMax(RECONNECT_MAX);
+        reconnectBackoff.setJitter(RECONNECT_JITTER);
     }
 
     /**
@@ -173,17 +185,25 @@ public class CloudClient {
     public void connect() {
         try {
             Options options = new Options();
+            options.transports = new String[] { WebSocket.NAME };
+            options.reconnection = true;
+            options.reconnectionAttempts = Integer.MAX_VALUE;
+            options.reconnectionDelay = RECONNECT_MIN;
+            options.reconnectionDelayMax = RECONNECT_MAX;
+            options.randomizationFactor = RECONNECT_JITTER;
+            options.timeout = READ_TIMEOUT;
+            Builder okHttpBuilder = new Builder();
+            okHttpBuilder.readTimeout(READ_TIMEOUT, TimeUnit.MILLISECONDS);
             if (logger.isTraceEnabled()) {
                 // When trace level logging is enabled, we activate further logging of HTTP calls
                 // of the Socket.IO library
-                Builder okHttpBuilder = new Builder();
                 HttpLoggingInterceptor loggingInterceptor = new HttpLoggingInterceptor();
                 loggingInterceptor.setLevel(Level.BASIC);
                 okHttpBuilder.addInterceptor(loggingInterceptor);
                 okHttpBuilder.addNetworkInterceptor(loggingInterceptor);
-                options.callFactory = okHttpBuilder.build();
-                options.webSocketFactory = okHttpBuilder.build();
             }
+            options.callFactory = okHttpBuilder.build();
+            options.webSocketFactory = okHttpBuilder.build();
             socket = IO.socket(baseURL, options);
             URL parsed = new URL(baseURL);
             protocol = parsed.getProtocol();
@@ -273,13 +293,17 @@ public class CloudClient {
                 .on(Socket.EVENT_RECONNECT_FAILED,
                         args -> logger.debug("Socket.IO re-connect attempts failed. Stopping reconnection."))//
                 .on(Socket.EVENT_DISCONNECT, args -> {
-                    if (args.length > 0) {
-                        logger.warn("Socket.IO disconnected: {}", args[0]);
-                    } else {
-                        logger.warn("Socket.IO disconnected");
-                    }
+                    String message = args.length > 0 ? args[0].toString() : "";
+                    logger.warn("Socket.IO disconnected: {}", message);
                     isConnected = false;
                     onDisconnect();
+                    // https://github.com/socketio/socket.io-client/commit/afb952d854e1d8728ce07b7c3a9f0dee2a61ef4e
+                    if ("io server disconnect".equals(message)) {
+                        socket.close();
+                        long delay = reconnectBackoff.duration();
+                        logger.warn("Reconnecting after {} ms.", delay);
+                        scheduleReconnect(delay);
+                    }
                 })//
                 .on(Socket.EVENT_ERROR, args -> {
                     if (CloudClient.this.socket.connected()) {
@@ -325,12 +349,7 @@ public class CloudClient {
                             logger.warn("Error connecting to the openHAB Cloud instance. Reconnecting.");
                         }
                         socket.close();
-                        scheduler.schedule(new Runnable() {
-                            @Override
-                            public void run() {
-                                socket.connect();
-                            }
-                        }, delay, TimeUnit.MILLISECONDS);
+                        scheduleReconnect(delay);
                     }
                 })//
 
@@ -671,21 +690,23 @@ public class CloudClient {
      */
     public void shutdown() {
         logger.info("Shutting down openHAB Cloud service connection");
+        reconnectFuture.get().ifPresent(future -> future.cancel(true));
         socket.disconnect();
     }
 
-    public String getOpenHABVersion() {
-        return openHABVersion;
-    }
-
-    public void setOpenHABVersion(String openHABVersion) {
-        this.openHABVersion = openHABVersion;
-    }
-
     public void setListener(CloudClientListener listener) {
         this.listener = listener;
     }
 
+    private void scheduleReconnect(long delay) {
+        reconnectFuture.getAndSet(Optional.of(scheduler.schedule(new Runnable() {
+            @Override
+            public void run() {
+                socket.connect();
+            }
+        }, delay, TimeUnit.MILLISECONDS))).ifPresent(future -> future.cancel(true));
+    }
+
     private JSONObject getJSONHeaders(HttpFields httpFields) {
         JSONObject headersJSON = new JSONObject();
         try {
index f9ef62542e8603371768dce3a65ece95cd94efbc..5114ff8bb7802ac24cd2b00c99bb5652bdd82de1 100644 (file)
@@ -247,7 +247,6 @@ public class CloudService implements ActionService, CloudClientListener, EventSu
         String localBaseUrl = "http://localhost:" + localPort;
         cloudClient = new CloudClient(httpClient, InstanceUUID.get(), getSecret(), cloudBaseUrl, localBaseUrl,
                 remoteAccessEnabled, exposedItems);
-        cloudClient.setOpenHABVersion(OpenHAB.getVersion());
         cloudClient.connect();
         cloudClient.setListener(this);
         NotificationAction.cloudService = this;