]> git.basschouten.com Git - openhab-addons.git/commitdiff
[deconz] refactoring and fix connection issues (#9614)
authorJ-N-K <J-N-K@users.noreply.github.com>
Thu, 31 Dec 2020 14:03:57 +0000 (15:03 +0100)
committerGitHub <noreply@github.com>
Thu, 31 Dec 2020 14:03:57 +0000 (15:03 +0100)
* fix connection attempts when thing disposed

Signed-off-by: Jan N. Klug <jan.n.klug@rub.de>
* refactor session handling

Signed-off-by: Jan N. Klug <jan.n.klug@rub.de>
* rename

Signed-off-by: Jan N. Klug <jan.n.klug@rub.de>
* refactoring

Signed-off-by: Jan N. Klug <jan.n.klug@rub.de>
bundles/org.openhab.binding.deconz/src/main/java/org/openhab/binding/deconz/internal/handler/DeconzBaseThingHandler.java
bundles/org.openhab.binding.deconz/src/main/java/org/openhab/binding/deconz/internal/handler/DeconzBridgeHandler.java
bundles/org.openhab.binding.deconz/src/main/java/org/openhab/binding/deconz/internal/netutils/WebSocketConnection.java
bundles/org.openhab.binding.deconz/src/main/java/org/openhab/binding/deconz/internal/netutils/WebSocketConnectionListener.java

index 200b811bb6c39541cd6f4fe43d65008d903d9f27..26285db53ef71a9eb1a3d102c506caf286538eb9 100644 (file)
  */
 package org.openhab.binding.deconz.internal.handler;
 
-import static org.openhab.binding.deconz.internal.Util.buildUrl;
-
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import org.eclipse.jdt.annotation.NonNullByDefault;
 import org.eclipse.jdt.annotation.Nullable;
 import org.openhab.binding.deconz.internal.dto.DeconzBaseMessage;
-import org.openhab.binding.deconz.internal.netutils.AsyncHttpClient;
 import org.openhab.binding.deconz.internal.netutils.WebSocketConnection;
 import org.openhab.binding.deconz.internal.netutils.WebSocketMessageListener;
 import org.openhab.binding.deconz.internal.types.ResourceType;
@@ -47,11 +46,9 @@ public abstract class DeconzBaseThingHandler extends BaseThingHandler implements
     private final Logger logger = LoggerFactory.getLogger(DeconzBaseThingHandler.class);
     protected final ResourceType resourceType;
     protected ThingConfig config = new ThingConfig();
-    protected DeconzBridgeConfig bridgeConfig = new DeconzBridgeConfig();
     protected final Gson gson;
     private @Nullable ScheduledFuture<?> initializationJob;
     protected @Nullable WebSocketConnection connection;
-    protected @Nullable AsyncHttpClient http;
 
     public DeconzBaseThingHandler(Thing thing, Gson gson, ResourceType resourceType) {
         super(thing);
@@ -111,8 +108,6 @@ public abstract class DeconzBaseThingHandler extends BaseThingHandler implements
 
             final WebSocketConnection webSocketConnection = bridgeHandler.getWebsocketConnection();
             this.connection = webSocketConnection;
-            this.http = bridgeHandler.getHttp();
-            this.bridgeConfig = bridgeHandler.getBridgeConfig();
 
             updateStatus(ThingStatus.UNKNOWN, ThingStatusDetail.NONE);
 
@@ -182,17 +177,14 @@ public abstract class DeconzBaseThingHandler extends BaseThingHandler implements
      */
     protected void sendCommand(@Nullable Object object, Command originalCommand, ChannelUID channelUID,
             String commandUrl, @Nullable Runnable acceptProcessing) {
-        AsyncHttpClient asyncHttpClient = http;
-        if (asyncHttpClient == null) {
+        DeconzBridgeHandler bridgeHandler = getBridgeHandler();
+        if (bridgeHandler == null) {
             return;
         }
-        String url = buildUrl(bridgeConfig.host, bridgeConfig.httpPort, bridgeConfig.apikey,
-                resourceType.getIdentifier(), config.id, commandUrl);
-
-        String json = object == null ? null : gson.toJson(object);
-        logger.trace("Sending {} to {} {} via {}", json, resourceType, config.id, url);
+        String endpoint = Stream.of(resourceType.getIdentifier(), config.id, commandUrl)
+                .collect(Collectors.joining("/"));
 
-        asyncHttpClient.put(url, json, bridgeConfig.timeout).thenAccept(v -> {
+        bridgeHandler.sendObject(endpoint, object).thenAccept(v -> {
             if (acceptProcessing != null) {
                 acceptProcessing.run();
             }
index 7a4b1633df74fab61aa09d2586ab0b5c69d109e0..2a16728135e9f2168a13c5fdbdd93abace087a39 100644 (file)
@@ -70,7 +70,7 @@ public class DeconzBridgeHandler extends BaseBridgeHandler implements WebSocketC
     private int websocketPort = 0;
     /** Prevent a dispose/init cycle while this flag is set. Use for property updates */
     private boolean ignoreConfigurationUpdate;
-    private boolean websocketReconnect = false;
+    private boolean thingDisposing = false;
 
     private final ExpiringCacheAsync<Optional<BridgeFullState>> fullStateCache = new ExpiringCacheAsync<>(1000);
 
@@ -92,7 +92,7 @@ public class DeconzBridgeHandler extends BaseBridgeHandler implements WebSocketC
 
     @Override
     public Collection<Class<? extends ThingHandlerService>> getServices() {
-        return Collections.singleton(ThingDiscoveryService.class);
+        return Set.of(ThingDiscoveryService.class);
     }
 
     @Override
@@ -123,11 +123,15 @@ public class DeconzBridgeHandler extends BaseBridgeHandler implements WebSocketC
      * @param r The response
      */
     private void parseAPIKeyResponse(AsyncHttpClient.Result r) {
+        if (thingDisposing) {
+            // discard response if thing handler is already disposing
+            return;
+        }
         if (r.getResponseCode() == 403) {
             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_PENDING,
                     "Allow authentication for 3rd party apps. Trying again in " + POLL_FREQUENCY_SEC + " seconds");
             stopTimer();
-            scheduledFuture = scheduler.schedule(() -> requestApiKey(), POLL_FREQUENCY_SEC, TimeUnit.SECONDS);
+            scheduledFuture = scheduler.schedule(this::requestApiKey, POLL_FREQUENCY_SEC, TimeUnit.SECONDS);
         } else if (r.getResponseCode() == 200) {
             ApiKeyMessage[] response = Objects.requireNonNull(gson.fromJson(r.getBody(), ApiKeyMessage[].class));
             if (response.length == 0) {
@@ -160,7 +164,7 @@ public class DeconzBridgeHandler extends BaseBridgeHandler implements WebSocketC
      */
     private CompletableFuture<Optional<BridgeFullState>> refreshFullStateCache() {
         logger.trace("{} starts refreshing the fullStateCache", thing.getUID());
-        if (config.apikey == null) {
+        if (config.apikey == null || thingDisposing) {
             return CompletableFuture.completedFuture(Optional.empty());
         }
         String url = buildUrl(config.getHostWithoutPort(), config.httpPort, config.apikey);
@@ -191,6 +195,10 @@ public class DeconzBridgeHandler extends BaseBridgeHandler implements WebSocketC
      */
     public void initializeBridgeState() {
         getBridgeFullState().thenAccept(fullState -> fullState.ifPresentOrElse(state -> {
+            if (thingDisposing) {
+                // discard response if thing handler is already disposing
+                return;
+            }
             if (state.config.name.isEmpty()) {
                 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.NONE,
                         "You are connected to a HUE bridge, not a deCONZ software!");
@@ -216,11 +224,12 @@ public class DeconzBridgeHandler extends BaseBridgeHandler implements WebSocketC
 
             // Use requested websocket port if no specific port is given
             websocketPort = config.port == 0 ? state.config.websocketport : config.port;
-            websocketReconnect = true;
             startWebsocket();
         }, () -> {
             // initial response was empty, re-trying in POLL_FREQUENCY_SEC seconds
-            scheduledFuture = scheduler.schedule(this::initializeBridgeState, POLL_FREQUENCY_SEC, TimeUnit.SECONDS);
+            if (!thingDisposing) {
+                scheduledFuture = scheduler.schedule(this::initializeBridgeState, POLL_FREQUENCY_SEC, TimeUnit.SECONDS);
+            }
         })).exceptionally(e -> {
             if (e != null) {
                 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.NONE, e.getMessage());
@@ -237,7 +246,7 @@ public class DeconzBridgeHandler extends BaseBridgeHandler implements WebSocketC
      * {@link #initializeBridgeState} need to be called first to obtain the websocket port.
      */
     private void startWebsocket() {
-        if (websocket.isConnected() || websocketPort == 0 || websocketReconnect == false) {
+        if (websocket.isConnected() || websocketPort == 0 || thingDisposing) {
             return;
         }
 
@@ -251,11 +260,11 @@ public class DeconzBridgeHandler extends BaseBridgeHandler implements WebSocketC
      * Perform a request to the REST API for generating an API key.
      *
      */
-    private CompletableFuture<?> requestApiKey() {
+    private void requestApiKey() {
         updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_PENDING, "Requesting API Key");
         stopTimer();
         String url = buildUrl(config.getHostWithoutPort(), config.httpPort);
-        return http.post(url, "{\"devicetype\":\"openHAB\"}", config.timeout).thenAccept(this::parseAPIKeyResponse)
+        http.post(url, "{\"devicetype\":\"openHAB\"}", config.timeout).thenAccept(this::parseAPIKeyResponse)
                 .exceptionally(e -> {
                     updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, e.getMessage());
                     logger.warn("Authorisation failed", e);
@@ -265,7 +274,8 @@ public class DeconzBridgeHandler extends BaseBridgeHandler implements WebSocketC
 
     @Override
     public void initialize() {
-        logger.debug("Start initializing!");
+        logger.debug("Start initializing bridge {}", thing.getUID());
+        thingDisposing = false;
         config = getConfigAs(DeconzBridgeConfig.class);
         if (config.apikey == null) {
             requestApiKey();
@@ -276,23 +286,11 @@ public class DeconzBridgeHandler extends BaseBridgeHandler implements WebSocketC
 
     @Override
     public void dispose() {
-        websocketReconnect = false;
+        thingDisposing = true;
         stopTimer();
         websocket.close();
     }
 
-    @Override
-    public void connectionError(@Nullable Throwable e) {
-        if (e != null) {
-            updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, e.getMessage());
-        } else {
-            updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, "Unknown reason");
-        }
-        stopTimer();
-        // Wait for POLL_FREQUENCY_SEC after a connection error before trying again
-        scheduledFuture = scheduler.schedule(this::startWebsocket, POLL_FREQUENCY_SEC, TimeUnit.SECONDS);
-    }
-
     @Override
     public void connectionEstablished() {
         stopTimer();
@@ -302,7 +300,10 @@ public class DeconzBridgeHandler extends BaseBridgeHandler implements WebSocketC
     @Override
     public void connectionLost(String reason) {
         updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, reason);
-        startWebsocket();
+
+        stopTimer();
+        // Wait for POLL_FREQUENCY_SEC after a connection was closed before trying again
+        scheduledFuture = scheduler.schedule(this::startWebsocket, POLL_FREQUENCY_SEC, TimeUnit.SECONDS);
     }
 
     /**
@@ -313,16 +314,17 @@ public class DeconzBridgeHandler extends BaseBridgeHandler implements WebSocketC
     }
 
     /**
-     * Return the http connection.
+     * Send an object to the gateway
+     *
+     * @param endPoint the endpoint (e.g. "lights/2/state")
+     * @param object the object (or null if no object)
+     * @return CompletableFuture of the result
      */
-    public AsyncHttpClient getHttp() {
-        return http;
-    }
+    public CompletableFuture<AsyncHttpClient.Result> sendObject(String endPoint, @Nullable Object object) {
+        String json = object == null ? null : gson.toJson(object);
+        String url = buildUrl(config.host, config.httpPort, config.apikey, endPoint);
+        logger.trace("Sending {} via {}", json, url);
 
-    /**
-     * Return the bridge configuration.
-     */
-    public DeconzBridgeConfig getBridgeConfig() {
-        return config;
+        return http.put(url, json, config.timeout);
     }
 }
index 4d0fb4c4e1494ffa95092ef0968222b202a9745d..7bc096e7a38961557169f4eabb30578a8385c5ff 100644 (file)
@@ -16,9 +16,10 @@ import java.net.URI;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.eclipse.jdt.annotation.NonNullByDefault;
-import org.eclipse.jetty.util.thread.QueuedThreadPool;
+import org.eclipse.jdt.annotation.Nullable;
 import org.eclipse.jetty.websocket.api.Session;
 import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
 import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
@@ -43,6 +44,7 @@ import com.google.gson.Gson;
 @WebSocket
 @NonNullByDefault
 public class WebSocketConnection {
+    private static final AtomicInteger INSTANCE_COUNTER = new AtomicInteger();
     private final Logger logger = LoggerFactory.getLogger(WebSocketConnection.class);
 
     private final WebSocketClient client;
@@ -50,16 +52,17 @@ public class WebSocketConnection {
     private final Gson gson;
 
     private final WebSocketConnectionListener connectionListener;
-    private final Map<Map.Entry<ResourceType, String>, WebSocketMessageListener> listeners = new ConcurrentHashMap<>();
+    private final Map<String, WebSocketMessageListener> listeners = new ConcurrentHashMap<>();
 
     private ConnectionState connectionState = ConnectionState.DISCONNECTED;
+    private @Nullable Session session;
 
     public WebSocketConnection(WebSocketConnectionListener listener, WebSocketClient client, Gson gson) {
         this.connectionListener = listener;
         this.client = client;
         this.client.setMaxIdleTimeout(0);
         this.gson = gson;
-        this.socketName = ((QueuedThreadPool) client.getExecutor()).getName() + "$" + this.hashCode();
+        this.socketName = "Websocket$" + System.currentTimeMillis() + "-" + INSTANCE_COUNTER.incrementAndGet();
     }
 
     public void start(String ip) {
@@ -68,6 +71,8 @@ public class WebSocketConnection {
         } else if (connectionState == ConnectionState.CONNECTING) {
             logger.debug("{} already connecting", socketName);
             return;
+        } else if (connectionState == ConnectionState.DISCONNECTING) {
+            logger.warn("{} trying to re-connect while still disconnecting", socketName);
         }
         try {
             URI destUri = URI.create("ws://" + ip);
@@ -75,7 +80,7 @@ public class WebSocketConnection {
             logger.debug("Trying to connect {} to {}", socketName, destUri);
             client.connect(this, destUri).get();
         } catch (Exception e) {
-            connectionListener.connectionError(e);
+            connectionListener.connectionLost("Error while connecting: " + e.getMessage());
         }
     }
 
@@ -90,67 +95,104 @@ public class WebSocketConnection {
     }
 
     public void registerListener(ResourceType resourceType, String sensorID, WebSocketMessageListener listener) {
-        listeners.put(Map.entry(resourceType, sensorID), listener);
+        listeners.put(getListenerId(resourceType, sensorID), listener);
     }
 
     public void unregisterListener(ResourceType resourceType, String sensorID) {
-        listeners.remove(Map.entry(resourceType, sensorID));
+        listeners.remove(getListenerId(resourceType, sensorID));
     }
 
     @SuppressWarnings("unused")
     @OnWebSocketConnect
     public void onConnect(Session session) {
         connectionState = ConnectionState.CONNECTED;
-        logger.debug("{} successfully connected to {}", socketName, session.getRemoteAddress().getAddress());
+        logger.debug("{} successfully connected to {}: {}", socketName, session.getRemoteAddress().getAddress(),
+                session.hashCode());
         connectionListener.connectionEstablished();
+        this.session = session;
     }
 
     @SuppressWarnings({ "null", "unused" })
     @OnWebSocketMessage
-    public void onMessage(String message) {
-        logger.trace("Raw data received by websocket {}: {}", socketName, message);
-
-        DeconzBaseMessage changedMessage = Objects.requireNonNull(gson.fromJson(message, DeconzBaseMessage.class));
-        if (changedMessage.r == ResourceType.UNKNOWN) {
-            logger.trace("Received message has unknown resource type. Skipping message.");
+    public void onMessage(Session session, String message) {
+        if (!session.equals(this.session)) {
+            handleWrongSession(session, message);
             return;
         }
+        logger.trace("{} received raw data: {}", socketName, message);
 
-        WebSocketMessageListener listener = listeners.get(Map.entry(changedMessage.r, changedMessage.id));
-        if (listener == null) {
-            logger.debug(
-                    "Couldn't find listener for id {} with resource type {}. Either no thing for this id has been defined or this is a bug.",
-                    changedMessage.id, changedMessage.r);
-            return;
+        try {
+            DeconzBaseMessage changedMessage = Objects.requireNonNull(gson.fromJson(message, DeconzBaseMessage.class));
+            if (changedMessage.r == ResourceType.UNKNOWN) {
+                logger.trace("Received message has unknown resource type. Skipping message.");
+                return;
+            }
+
+            WebSocketMessageListener listener = listeners.get(getListenerId(changedMessage.r, changedMessage.id));
+            if (listener == null) {
+                logger.debug(
+                        "Couldn't find listener for id {} with resource type {}. Either no thing for this id has been defined or this is a bug.",
+                        changedMessage.id, changedMessage.r);
+                return;
+            }
+
+            Class<? extends DeconzBaseMessage> expectedMessageType = changedMessage.r.getExpectedMessageType();
+            if (expectedMessageType == null) {
+                logger.warn(
+                        "BUG! Could not get expected message type for resource type {}. Please report this incident.",
+                        changedMessage.r);
+                return;
+            }
+
+            DeconzBaseMessage deconzMessage = gson.fromJson(message, expectedMessageType);
+            if (deconzMessage != null) {
+                listener.messageReceived(changedMessage.id, deconzMessage);
+
+            }
+        } catch (RuntimeException e) {
+            // we need to catch all processing exceptions, otherwise they could affect the connection
+            logger.warn("{} encountered an error while processing the message {}: {}", socketName, message,
+                    e.getMessage());
         }
+    }
 
-        Class<? extends DeconzBaseMessage> expectedMessageType = changedMessage.r.getExpectedMessageType();
-        if (expectedMessageType == null) {
-            logger.warn("BUG! Could not get expected message type for resource type {}. Please report this incident.",
-                    changedMessage.r);
+    @SuppressWarnings("unused")
+    @OnWebSocketError
+    public void onError(Session session, Throwable cause) {
+        if (!session.equals(this.session)) {
+            handleWrongSession(session, "Connection error: " + cause.getMessage());
             return;
         }
+        logger.warn("{} connection errored, closing: {}", socketName, cause.getMessage());
 
-        DeconzBaseMessage deconzMessage = gson.fromJson(message, expectedMessageType);
-        if (deconzMessage != null) {
-            listener.messageReceived(changedMessage.id, deconzMessage);
+        Session storedSession = this.session;
+        if (storedSession != null && storedSession.isOpen()) {
+            storedSession.close(-1, "Processing error");
         }
     }
 
-    @SuppressWarnings("unused")
-    @OnWebSocketError
-    public void onError(Throwable cause) {
-        connectionState = ConnectionState.DISCONNECTED;
-        connectionListener.connectionError(cause);
-    }
-
     @SuppressWarnings("unused")
     @OnWebSocketClose
-    public void onClose(int statusCode, String reason) {
+    public void onClose(Session session, int statusCode, String reason) {
+        if (!session.equals(this.session)) {
+            handleWrongSession(session, "Connection closed: " + statusCode + " / " + reason);
+            return;
+        }
+        logger.trace("{} closed connection: {} / {}", socketName, statusCode, reason);
         connectionState = ConnectionState.DISCONNECTED;
+        this.session = null;
         connectionListener.connectionLost(reason);
     }
 
+    private void handleWrongSession(Session session, String message) {
+        logger.warn("{}/{} received and discarded message for other session {}: {}.", socketName, session.hashCode(),
+                session.hashCode(), message);
+        if (session.isOpen()) {
+            // Close the session if it is still open. It should already be closed anyway
+            session.close();
+        }
+    }
+
     /**
      * check connection state (successfully connected)
      *
@@ -160,6 +202,17 @@ public class WebSocketConnection {
         return connectionState == ConnectionState.CONNECTED;
     }
 
+    /**
+     * create a unique identifier for a listener
+     *
+     * @param resourceType the listener resource-type (LIGHT, SENSOR, ...)
+     * @param id the listener id (same as deconz-id)
+     * @return a unique string for this listener
+     */
+    private String getListenerId(ResourceType resourceType, String id) {
+        return resourceType.name() + "$" + id;
+    }
+
     /**
      * used internally to represent the connection state
      */
index 01749714e1d21f12c4692c84d15503d6925ccdcc..ccf689544014cf8f8f05d50ac0fc3e46c3638d1f 100644 (file)
@@ -21,13 +21,6 @@ import org.eclipse.jdt.annotation.NonNullByDefault;
  */
 @NonNullByDefault
 public interface WebSocketConnectionListener {
-    /**
-     * An error occurred during connection or while connecting.
-     *
-     * @param e The error
-     */
-    void connectionError(Throwable e);
-
     /**
      * Connection successfully established.
      */