]> git.basschouten.com Git - openhab-addons.git/commitdiff
[neohub] Improved Web-Socket Communications (#16312)
authorAndrew Fiddian-Green <software@whitebear.ch>
Fri, 15 Mar 2024 12:45:57 +0000 (12:45 +0000)
committerGitHub <noreply@github.com>
Fri, 15 Mar 2024 12:45:57 +0000 (13:45 +0100)
* [neohub] Improved WebSocket Communications
* [neohub] session recycled only by handler; not by socket class

---------

Signed-off-by: Andrew Fiddian-Green <software@whitebear.ch>
bundles/org.openhab.binding.neohub/src/main/java/org/openhab/binding/neohub/internal/NeoHubSocket.java
bundles/org.openhab.binding.neohub/src/main/java/org/openhab/binding/neohub/internal/NeoHubSocketBase.java
bundles/org.openhab.binding.neohub/src/main/java/org/openhab/binding/neohub/internal/NeoHubTlsTrustManagerProvider.java [new file with mode: 0644]
bundles/org.openhab.binding.neohub/src/main/java/org/openhab/binding/neohub/internal/NeoHubWebSocket.java

index a501d687b5ca6b7325a07c3b67a05b03e03fcd03..d0140337885324d49b991137cf99a569330c1d9a 100644 (file)
@@ -47,8 +47,7 @@ public class NeoHubSocket extends NeoHubSocketBase {
         IOException caughtException = null;
         StringBuilder builder = new StringBuilder();
 
-        throttle();
-        try (Socket socket = new Socket()) {
+        try (Socket socket = new Socket(); Throttler throttler = new Throttler();) {
             int port = config.portNumber > 0 ? config.portNumber : NeoHubBindingConstants.PORT_TCP;
             socket.connect(new InetSocketAddress(config.hostName, port), config.socketTimeout * 1000);
             socket.setSoTimeout(config.socketTimeout * 1000);
@@ -76,6 +75,8 @@ public class NeoHubSocket extends NeoHubSocketBase {
         } catch (IOException e) {
             // catch IOExceptions here, and save them to be re-thrown later
             caughtException = e;
+        } catch (InterruptedException e) {
+            caughtException = new IOException(e);
         }
 
         String responseJson = builder.toString().strip();
index 6fd883e428a16ecf9aafa882c777684cb1f3453b..69bea74bb562cdefa28341fb79df48c30c676175 100644 (file)
@@ -17,6 +17,8 @@ import java.io.IOException;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.Optional;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.eclipse.jdt.annotation.NonNullByDefault;
 
@@ -33,6 +35,7 @@ public abstract class NeoHubSocketBase implements Closeable {
     protected final String hubId;
 
     private static final int REQUEST_INTERVAL_MILLISECS = 1000;
+    private final Lock lock = new ReentrantLock(true);
     private Optional<Instant> lastRequestTime = Optional.empty();
 
     public NeoHubSocketBase(NeoHubConfiguration config, String hubId) {
@@ -51,22 +54,31 @@ public abstract class NeoHubSocketBase implements Closeable {
     public abstract String sendMessage(final String requestJson) throws IOException, NeoHubException;
 
     /**
-     * Method for throttling requests to prevent overloading the hub.
+     * Class for throttling requests to prevent overloading the hub.
      * <p>
      * The NeoHub can get confused if, while it is uploading data to the cloud, it also receives too many local
      * requests, so this method throttles the requests to one per REQUEST_INTERVAL_MILLISECS maximum.
      *
-     * @throws NeoHubException if the wait is interrupted
+     * @throws InterruptedException if the wait is interrupted
      */
-    protected synchronized void throttle() throws NeoHubException {
-        try {
-            Instant now = Instant.now();
-            long delay = lastRequestTime
-                    .map(t -> Math.max(0, Duration.between(now, t).toMillis() + REQUEST_INTERVAL_MILLISECS)).orElse(0L);
-            lastRequestTime = Optional.of(now.plusMillis(delay));
+    protected class Throttler implements AutoCloseable {
+
+        public Throttler() throws InterruptedException {
+            lock.lock();
+            long delay;
+            synchronized (NeoHubSocketBase.this) {
+                Instant now = Instant.now();
+                delay = lastRequestTime
+                        .map(t -> Math.max(0, Duration.between(now, t).toMillis() + REQUEST_INTERVAL_MILLISECS))
+                        .orElse(0L);
+                lastRequestTime = Optional.of(now.plusMillis(delay));
+            }
             Thread.sleep(delay);
-        } catch (InterruptedException e) {
-            throw new NeoHubException("Throttle sleep interrupted", e);
+        }
+
+        @Override
+        public void close() {
+            lock.unlock();
         }
     }
 }
diff --git a/bundles/org.openhab.binding.neohub/src/main/java/org/openhab/binding/neohub/internal/NeoHubTlsTrustManagerProvider.java b/bundles/org.openhab.binding.neohub/src/main/java/org/openhab/binding/neohub/internal/NeoHubTlsTrustManagerProvider.java
new file mode 100644 (file)
index 0000000..4dae839
--- /dev/null
@@ -0,0 +1,45 @@
+/**
+ * Copyright (c) 2010-2024 Contributors to the openHAB project
+ *
+ * See the NOTICE file(s) distributed with this work for additional
+ * information.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0
+ *
+ * SPDX-License-Identifier: EPL-2.0
+ */
+package org.openhab.binding.neohub.internal;
+
+import javax.net.ssl.X509ExtendedTrustManager;
+
+import org.eclipse.jdt.annotation.NonNullByDefault;
+import org.openhab.core.io.net.http.TlsTrustManagerProvider;
+import org.openhab.core.io.net.http.TrustAllTrustManager;
+
+/**
+ * A {@link TlsTrustManagerProvider} implementation to validate the NeoHub web socket self signed certificate.
+ *
+ * @author Andrew Fiddian-Green - Initial contribution
+ */
+@NonNullByDefault
+public class NeoHubTlsTrustManagerProvider implements TlsTrustManagerProvider {
+
+    private final String fullHostName;
+
+    public NeoHubTlsTrustManagerProvider(NeoHubConfiguration config) {
+        fullHostName = String.format("%s:%d", config.hostName,
+                config.portNumber > 0 ? config.portNumber : NeoHubBindingConstants.PORT_WSS);
+    }
+
+    @Override
+    public String getHostName() {
+        return fullHostName;
+    }
+
+    @Override
+    public X509ExtendedTrustManager getTrustManager() {
+        return TrustAllTrustManager.getInstance();
+    }
+}
index 721e981e5f39590dbb31d0e60fa2211c92bac19b..dabe0b074270afe08a51b56501c5dad767387c34 100644 (file)
@@ -16,11 +16,12 @@ import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.time.Instant;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
 
 import org.eclipse.jdt.annotation.NonNullByDefault;
 import org.eclipse.jdt.annotation.Nullable;
-import org.eclipse.jetty.util.ssl.SslContextFactory;
 import org.eclipse.jetty.websocket.api.Session;
 import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
 import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
@@ -28,9 +29,12 @@ import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError;
 import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
 import org.eclipse.jetty.websocket.api.annotations.WebSocket;
 import org.eclipse.jetty.websocket.client.WebSocketClient;
+import org.openhab.core.io.net.http.TlsTrustManagerProvider;
 import org.openhab.core.io.net.http.WebSocketFactory;
 import org.openhab.core.thing.ThingUID;
 import org.openhab.core.thing.util.ThingWebClientUtil;
+import org.osgi.framework.FrameworkUtil;
+import org.osgi.framework.ServiceRegistration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -55,10 +59,12 @@ public class NeoHubWebSocket extends NeoHubSocketBase {
     private final Logger logger = LoggerFactory.getLogger(NeoHubWebSocket.class);
     private final Gson gson = new Gson();
     private final WebSocketClient webSocketClient;
+    private final ServiceRegistration<?> trustManagerRegistration;
 
     private @Nullable Session session = null;
-    private String responseOuter = "";
-    private boolean responsePending;
+    private @Nullable IOException websocketException = null;
+    private List<String> responses = new CopyOnWriteArrayList<>();
+    private boolean closing;
 
     /**
      * DTO to receive and parse the response JSON.
@@ -78,16 +84,21 @@ public class NeoHubWebSocket extends NeoHubSocketBase {
             throws IOException {
         super(config, bridgeUID.getAsString());
 
-        SslContextFactory.Client sslContextFactory = new SslContextFactory.Client();
-        sslContextFactory.setTrustAll(true);
         String name = ThingWebClientUtil.buildWebClientConsumerName(bridgeUID, null);
-        webSocketClient = webSocketFactory.createWebSocketClient(name, sslContextFactory);
+        webSocketClient = webSocketFactory.createWebSocketClient(name);
         webSocketClient.setConnectTimeout(config.socketTimeout * 1000);
         try {
             webSocketClient.start();
         } catch (Exception e) {
             throw new IOException("Error starting Web Socket client", e);
         }
+        NeoHubTlsTrustManagerProvider trustManagerProvider = new NeoHubTlsTrustManagerProvider(config);
+        try {
+            trustManagerRegistration = FrameworkUtil.getBundle(getClass()).getBundleContext()
+                    .registerService(TlsTrustManagerProvider.class.getName(), trustManagerProvider, null);
+        } catch (IllegalStateException e) {
+            throw new IOException("Error registering trust manager", e);
+        }
     }
 
     /**
@@ -95,14 +106,13 @@ public class NeoHubWebSocket extends NeoHubSocketBase {
      *
      * @throws IOException if unable to open the web socket
      */
-    private void startSession() throws IOException {
+    private synchronized void startSession() throws IOException {
         Session session = this.session;
         if (session == null || !session.isOpen()) {
-            closeSession();
             try {
                 int port = config.portNumber > 0 ? config.portNumber : NeoHubBindingConstants.PORT_WSS;
                 URI uri = new URI(String.format("wss://%s:%d", config.hostName, port));
-                webSocketClient.connect(this, uri).get();
+                this.session = webSocketClient.connect(this, uri).get();
             } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
                 throw new IOException("Error starting session", e);
@@ -112,17 +122,6 @@ public class NeoHubWebSocket extends NeoHubSocketBase {
         }
     }
 
-    /**
-     * Close the web socket session.
-     */
-    private void closeSession() {
-        Session session = this.session;
-        this.session = null;
-        if (session != null) {
-            session.close();
-        }
-    }
-
     /**
      * Helper to escape the quote marks in a JSON string.
      *
@@ -154,85 +153,101 @@ public class NeoHubWebSocket extends NeoHubSocketBase {
     }
 
     @Override
-    public synchronized String sendMessage(final String requestJson) throws IOException, NeoHubException {
-        // start the session
-        startSession();
-
-        // session start failed
-        Session session = this.session;
-        if (session == null) {
-            throw new IOException("Session is null");
+    public String sendMessage(final String requestJson) throws IOException, NeoHubException {
+        if (!closing && websocketException != null) {
+            throw websocketException;
         }
 
-        // wrap the inner request in an outer request string
-        String requestOuter = String.format(REQUEST_OUTER,
-                jsonEscape(String.format(REQUEST_INNER, config.apiToken, jsonReplaceQuotes(requestJson))));
+        try (Throttler throttler = new Throttler()) {
+            // start the session
+            startSession();
 
-        // initialise the response
-        responseOuter = "";
-        responsePending = true;
+            // session start failed
+            Session session = this.session;
+            if (session == null) {
+                throw new IOException("Session is null");
+            }
 
-        IOException caughtException = null;
-        throttle();
-        try {
-            // send the request
-            logger.debug("hub '{}' sending characters:{}", hubId, requestOuter.length());
-            session.getRemote().sendString(requestOuter);
-            logger.trace("hub '{}' sent:{}", hubId, requestOuter);
+            // wrap the inner request in an outer request string
+            String requestOuter = String.format(REQUEST_OUTER,
+                    jsonEscape(String.format(REQUEST_INNER, config.apiToken, jsonReplaceQuotes(requestJson))));
 
-            // sleep and loop until we get a response, the socket is closed, or it times out
-            Instant timeout = Instant.now().plusSeconds(config.socketTimeout);
-            while (responsePending) {
-                try {
-                    Thread.sleep(SLEEP_MILLISECONDS);
-                    if (Instant.now().isAfter(timeout)) {
-                        throw new IOException("Read timed out");
+            IOException caughtException = null;
+            try {
+                // send the request
+                logger.debug("hub '{}' sending characters:{}", hubId, requestOuter.length());
+                session.getRemote().sendString(requestOuter);
+                logger.trace("hub '{}' sent:{}", hubId, requestOuter);
+
+                // sleep and loop until we get a response, the socket is closed, or it times out
+                Instant timeout = Instant.now().plusSeconds(config.socketTimeout);
+                while (!closing && websocketException == null && responses.isEmpty()) {
+                    try {
+                        Thread.sleep(SLEEP_MILLISECONDS);
+                        if (Instant.now().isAfter(timeout)) {
+                            throw new IOException("Read timed out");
+                        }
+                    } catch (InterruptedException e) {
+                        throw new IOException("Read interrupted", e);
                     }
-                } catch (InterruptedException e) {
-                    throw new IOException("Read interrupted", e);
                 }
+            } catch (IOException e) {
+                caughtException = e;
             }
-        } catch (IOException e) {
-            caughtException = e;
-        }
 
-        caughtException = caughtException != null ? caughtException
-                : this.session == null ? new IOException("WebSocket session closed") : null;
+            String responseOuter = !responses.isEmpty() ? responses.remove(0) : "";
+            logger.debug("hub '{}' received characters:{}", hubId, responseOuter.length());
+            logger.trace("hub '{}' received:{}", hubId, responseOuter);
 
-        logger.debug("hub '{}' received characters:{}", hubId, responseOuter.length());
-        logger.trace("hub '{}' received:{}", hubId, responseOuter);
-
-        // if an IOException was caught above, re-throw it again
-        if (caughtException != null) {
-            throw caughtException;
-        }
-
-        try {
-            Response responseDto = gson.fromJson(responseOuter, Response.class);
-            if (responseDto == null) {
-                throw new JsonSyntaxException("Response DTO is invalid");
-            }
-            if (!NeoHubBindingConstants.HM_SET_COMMAND_RESPONSE.equals(responseDto.message_type)) {
-                throw new JsonSyntaxException("DTO 'message_type' field is invalid");
+            // don't throw an exception if already closing
+            if (closing) {
+                return "{}";
             }
-            String responseJson = responseDto.response;
-            if (responseJson == null) {
-                throw new JsonSyntaxException("DTO 'response' field is null");
+
+            // if an IOException was caught above, re-throw it again
+            caughtException = websocketException != null ? websocketException : caughtException;
+            if (caughtException != null) {
+                throw caughtException;
             }
-            responseJson = jsonUnEscape(responseJson).strip();
-            if (!JsonParser.parseString(responseJson).isJsonObject()) {
-                throw new JsonSyntaxException("DTO 'response' field is not a JSON object");
+
+            try {
+                Response responseDto = gson.fromJson(responseOuter, Response.class);
+                if (responseDto == null) {
+                    throw new JsonSyntaxException("Response DTO is invalid");
+                }
+                if (!NeoHubBindingConstants.HM_SET_COMMAND_RESPONSE.equals(responseDto.message_type)) {
+                    throw new JsonSyntaxException("DTO 'message_type' field is invalid");
+                }
+                String responseJson = responseDto.response;
+                if (responseJson == null) {
+                    throw new JsonSyntaxException("DTO 'response' field is null");
+                }
+                responseJson = jsonUnEscape(responseJson).strip();
+                if (!JsonParser.parseString(responseJson).isJsonObject()) {
+                    throw new JsonSyntaxException("DTO 'response' field is not a JSON object");
+                }
+                return responseJson;
+            } catch (JsonSyntaxException e) {
+                logger.debug("hub '{}' {}; response:{}", hubId, e.getMessage(), responseOuter);
+                throw new NeoHubException("Invalid response");
             }
-            return responseJson;
-        } catch (JsonSyntaxException e) {
-            logger.debug("hub '{}' {}; response:{}", hubId, e.getMessage(), responseOuter);
-            throw new NeoHubException("Invalid response");
+        } catch (InterruptedException e) {
+            throw new NeoHubException("Throttler was interrupted unexpectedly");
         }
     }
 
     @Override
     public void close() {
-        closeSession();
+        closing = true;
+        Session session = this.session;
+        if (session != null) {
+            session.close();
+            this.session = null;
+        }
+        try {
+            trustManagerRegistration.unregister();
+        } catch (Exception e) {
+        }
         try {
             webSocketClient.stop();
         } catch (Exception e) {
@@ -242,25 +257,29 @@ public class NeoHubWebSocket extends NeoHubSocketBase {
     @OnWebSocketConnect
     public void onConnect(Session session) {
         logger.debug("hub '{}' onConnect() ok", hubId);
-        this.session = session;
     }
 
     @OnWebSocketClose
     public void onClose(int statusCode, String reason) {
-        logger.debug("hub '{}' onClose() statusCode:{}, reason:{}", hubId, statusCode, reason);
-        responsePending = false;
-        this.session = null;
+        String closeMessage = String.format("onClose() statusCode:%d, reason:%s", statusCode, reason);
+        logger.debug("hub '{}' {}", hubId, closeMessage);
+        websocketException = new IOException(closeMessage);
     }
 
     @OnWebSocketError
-    public void onError(Throwable cause) {
-        logger.debug("hub '{}' onError() cause:{}", hubId, cause.getMessage());
-        closeSession();
+    public void onError(@Nullable Throwable cause) {
+        logger.debug("hub '{}' onError() cause:{}", hubId, cause != null ? cause.getMessage() : "null");
+        websocketException = cause instanceof IOException ioCause ? ioCause : new IOException(cause);
     }
 
     @OnWebSocketMessage
-    public void onMessage(String msg) {
-        responseOuter = msg.strip();
-        responsePending = false;
+    public synchronized void onMessage(String msg) {
+        int responseCount = responses.size();
+        if (responseCount > 0) {
+            String errorMessage = String.format("onMessage() too many responses:%d", responseCount);
+            logger.debug("hub '{}' {}", hubId, errorMessage);
+            websocketException = new IOException(errorMessage);
+        }
+        responses.add(msg.strip());
     }
 }