]> git.basschouten.com Git - openhab-addons.git/blob
9b31fa2d31e58dd68563d9fb9f9d2b81a0c0896a
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.deconz.internal.netutils;
14
15 import java.net.URI;
16 import java.util.Map;
17 import java.util.Objects;
18 import java.util.concurrent.ConcurrentHashMap;
19 import java.util.concurrent.ScheduledExecutorService;
20 import java.util.concurrent.ScheduledFuture;
21 import java.util.concurrent.TimeUnit;
22 import java.util.concurrent.atomic.AtomicInteger;
23
24 import org.eclipse.jdt.annotation.NonNullByDefault;
25 import org.eclipse.jdt.annotation.Nullable;
26 import org.eclipse.jetty.websocket.api.Session;
27 import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
28 import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
29 import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError;
30 import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
31 import org.eclipse.jetty.websocket.api.annotations.WebSocket;
32 import org.eclipse.jetty.websocket.client.WebSocketClient;
33 import org.openhab.binding.deconz.internal.dto.DeconzBaseMessage;
34 import org.openhab.binding.deconz.internal.types.ResourceType;
35 import org.openhab.core.common.ThreadPoolManager;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38
39 import com.google.gson.Gson;
40
41 /**
42  * Establishes and keeps a websocket connection to the deCONZ software.
43  *
44  * The connection is closed by deCONZ now and then and needs to be re-established.
45  *
46  * @author David Graeff - Initial contribution
47  */
48 @WebSocket
49 @NonNullByDefault
50 public class WebSocketConnection {
51     private static final AtomicInteger INSTANCE_COUNTER = new AtomicInteger();
52     private final Logger logger = LoggerFactory.getLogger(WebSocketConnection.class);
53     private final ScheduledExecutorService scheduler = ThreadPoolManager.getScheduledPool("thingHandler");
54
55     private final WebSocketClient client;
56     private final String socketName;
57     private final Gson gson;
58     private int watchdogInterval;
59
60     private final WebSocketConnectionListener connectionListener;
61     private final Map<String, WebSocketMessageListener> listeners = new ConcurrentHashMap<>();
62
63     private ConnectionState connectionState = ConnectionState.DISCONNECTED;
64     private @Nullable ScheduledFuture<?> watchdogJob;
65
66     private @Nullable Session session;
67
68     public WebSocketConnection(WebSocketConnectionListener listener, WebSocketClient client, Gson gson,
69             int watchdogInterval) {
70         this.connectionListener = listener;
71         this.client = client;
72         this.client.setMaxIdleTimeout(0);
73         this.gson = gson;
74         this.socketName = "Websocket$" + System.currentTimeMillis() + "-" + INSTANCE_COUNTER.incrementAndGet();
75         this.watchdogInterval = watchdogInterval;
76     }
77
78     public void setWatchdogInterval(int watchdogInterval) {
79         this.watchdogInterval = watchdogInterval;
80     }
81
82     public void start(String ip) {
83         if (connectionState == ConnectionState.CONNECTED) {
84             return;
85         } else if (connectionState == ConnectionState.CONNECTING) {
86             logger.debug("{} already connecting", socketName);
87             return;
88         } else if (connectionState == ConnectionState.DISCONNECTING) {
89             logger.warn("{} trying to re-connect while still disconnecting", socketName);
90             return;
91         }
92         try {
93             connectionState = ConnectionState.CONNECTING;
94             URI destUri = URI.create("ws://" + ip);
95             client.start();
96             logger.debug("Trying to connect {} to {}", socketName, destUri);
97             client.connect(this, destUri).get();
98         } catch (Exception e) {
99             String reason = "Error while connecting: " + e.getMessage();
100             if (e.getMessage() == null) {
101                 logger.warn("{}: {}", socketName, reason, e);
102             } else {
103                 logger.warn("{}: {}", socketName, reason);
104             }
105             connectionListener.webSocketConnectionLost(reason);
106         }
107     }
108
109     private void startOrResetWatchdogTimer() {
110         stopWatchdogTimer(); // stop already running timer
111         watchdogJob = scheduler.schedule(
112                 () -> connectionListener.webSocketConnectionLost(
113                         "Watchdog timed out after " + watchdogInterval + "s. Websocket seems to be dead."),
114                 watchdogInterval, TimeUnit.SECONDS);
115     }
116
117     private void stopWatchdogTimer() {
118         ScheduledFuture<?> watchdogTimer = this.watchdogJob;
119         if (watchdogTimer != null) {
120             watchdogTimer.cancel(false);
121             this.watchdogJob = null;
122         }
123     }
124
125     /**
126      * dispose the websocket (close connection and destroy client)
127      *
128      */
129     public void dispose() {
130         stopWatchdogTimer();
131         try {
132             connectionState = ConnectionState.DISCONNECTING;
133             client.stop();
134         } catch (Exception e) {
135             logger.debug("{} encountered an error while closing connection", socketName, e);
136         }
137         client.destroy();
138         connectionState = ConnectionState.DISCONNECTED;
139     }
140
141     public void registerListener(ResourceType resourceType, String sensorID, WebSocketMessageListener listener) {
142         listeners.put(getListenerId(resourceType, sensorID), listener);
143     }
144
145     public void unregisterListener(ResourceType resourceType, String sensorID) {
146         listeners.remove(getListenerId(resourceType, sensorID));
147     }
148
149     @SuppressWarnings("unused")
150     @OnWebSocketConnect
151     public void onConnect(Session session) {
152         connectionState = ConnectionState.CONNECTED;
153         logger.debug("{} successfully connected to {}: {}", socketName, session.getRemoteAddress().getAddress(),
154                 session.hashCode());
155         connectionListener.webSocketConnectionEstablished();
156         startOrResetWatchdogTimer();
157         this.session = session;
158     }
159
160     @SuppressWarnings("unused")
161     @OnWebSocketMessage
162     public void onMessage(Session session, String message) {
163         if (!session.equals(this.session)) {
164             handleWrongSession(session, message);
165             return;
166         }
167         startOrResetWatchdogTimer();
168         logger.trace("{} received raw data: {}", socketName, message);
169
170         try {
171             DeconzBaseMessage changedMessage = Objects.requireNonNull(gson.fromJson(message, DeconzBaseMessage.class));
172             if (changedMessage.r == ResourceType.UNKNOWN) {
173                 logger.trace("Received message has unknown resource type. Skipping message.");
174                 return;
175             }
176
177             ResourceType resourceType = changedMessage.r;
178             String resourceId = changedMessage.id;
179
180             if (resourceType == ResourceType.SCENES) {
181                 // scene recalls
182                 resourceType = ResourceType.GROUPS;
183                 resourceId = changedMessage.gid;
184             }
185
186             WebSocketMessageListener listener = listeners.get(getListenerId(resourceType, resourceId));
187             if (listener == null) {
188                 logger.trace(
189                         "Couldn't find listener for id {} with resource type {}. Either no thing for this id has been defined or this is a bug.",
190                         changedMessage.id, changedMessage.r);
191                 return;
192             }
193
194             // we still need the original resource type here
195             Class<? extends DeconzBaseMessage> expectedMessageType = changedMessage.r.getExpectedMessageType();
196             if (expectedMessageType == null) {
197                 logger.warn(
198                         "BUG! Could not get expected message type for resource type {}. Please report this incident.",
199                         changedMessage.r);
200                 return;
201             }
202
203             DeconzBaseMessage deconzMessage = Objects.requireNonNull(gson.fromJson(message, expectedMessageType));
204             listener.messageReceived(deconzMessage);
205         } catch (RuntimeException e) {
206             // we need to catch all processing exceptions, otherwise they could affect the connection
207             logger.warn("{} encountered an error while processing the message {}: {}", socketName, message,
208                     e.getMessage());
209         }
210     }
211
212     @SuppressWarnings("unused")
213     @OnWebSocketError
214     public void onError(@Nullable Session session, Throwable cause) {
215         if (session != null && !session.equals(this.session)) {
216             handleWrongSession(session, "Connection error: " + cause.getMessage());
217             return;
218         }
219         logger.warn("{} connection errored, closing: {}", socketName, cause.getMessage());
220
221         stopWatchdogTimer();
222         Session storedSession = this.session;
223         if (storedSession != null && storedSession.isOpen()) {
224             storedSession.close(-1, "Processing error");
225         }
226     }
227
228     @SuppressWarnings("unused")
229     @OnWebSocketClose
230     public void onClose(Session session, int statusCode, String reason) {
231         if (!session.equals(this.session)) {
232             handleWrongSession(session, "Connection closed: " + statusCode + " / " + reason);
233             return;
234         }
235         logger.trace("{} closed connection: {} / {}", socketName, statusCode, reason);
236         connectionState = ConnectionState.DISCONNECTED;
237         stopWatchdogTimer();
238         this.session = null;
239         connectionListener.webSocketConnectionLost(reason);
240     }
241
242     private void handleWrongSession(Session session, String message) {
243         logger.warn("{}{} received and discarded message for other or session {}: {}.", socketName, session.hashCode(),
244                 session.hashCode(), message);
245         if (session.isOpen()) {
246             // Close the session if it is still open. It should already be closed anyway
247             session.close();
248         }
249     }
250
251     /**
252      * check connection state (successfully connected)
253      *
254      * @return true if connected, false if connecting, disconnecting or disconnected
255      */
256     public boolean isConnected() {
257         return connectionState == ConnectionState.CONNECTED;
258     }
259
260     /**
261      * create a unique identifier for a listener
262      *
263      * @param resourceType the listener resource-type (LIGHT, SENSOR, ...)
264      * @param id the listener id (same as deconz-id)
265      * @return a unique string for this listener
266      */
267     private String getListenerId(ResourceType resourceType, String id) {
268         return resourceType.name() + "$" + id;
269     }
270
271     /**
272      * used internally to represent the connection state
273      */
274     private enum ConnectionState {
275         CONNECTING,
276         CONNECTED,
277         DISCONNECTING,
278         DISCONNECTED
279     }
280 }