]> git.basschouten.com Git - openhab-addons.git/blob
a93b019e420a0ae9bafa8b54669e06c041a834ef
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2024 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.mycroft.internal.api;
14
15 import java.io.IOException;
16 import java.net.URI;
17 import java.util.HashSet;
18 import java.util.Map;
19 import java.util.Objects;
20 import java.util.Optional;
21 import java.util.Set;
22 import java.util.concurrent.ConcurrentHashMap;
23 import java.util.concurrent.Future;
24 import java.util.concurrent.TimeUnit;
25 import java.util.concurrent.atomic.AtomicInteger;
26 import java.util.stream.Stream;
27
28 import org.eclipse.jdt.annotation.NonNullByDefault;
29 import org.eclipse.jdt.annotation.Nullable;
30 import org.eclipse.jetty.websocket.api.Session;
31 import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
32 import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
33 import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError;
34 import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
35 import org.eclipse.jetty.websocket.api.annotations.WebSocket;
36 import org.eclipse.jetty.websocket.client.WebSocketClient;
37 import org.openhab.binding.mycroft.internal.api.dto.BaseMessage;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40
41 import com.google.gson.Gson;
42 import com.google.gson.GsonBuilder;
43
44 /**
45  * Establishes and keeps a websocket connection to the Mycroft bus
46  *
47  * @author Gwendal Roulleau - Initial contribution. Inspired by the deconz binding.
48  */
49 @WebSocket
50 @NonNullByDefault
51 public class MycroftConnection {
52     private static final AtomicInteger INSTANCE_COUNTER = new AtomicInteger();
53     private final Logger logger = LoggerFactory.getLogger(MycroftConnection.class);
54
55     private final WebSocketClient client;
56     private final String socketName;
57     private final Gson gson;
58
59     private final MycroftConnectionListener connectionListener;
60     private final Map<MessageType, Set<MycroftMessageListener<? extends BaseMessage>>> listeners = new ConcurrentHashMap<>();
61
62     private ConnectionState connectionState = ConnectionState.DISCONNECTED;
63     private @Nullable Session session;
64
65     private static final int TIMEOUT_MILLISECONDS = 3000;
66
67     public MycroftConnection(MycroftConnectionListener listener, WebSocketClient client) {
68         this.connectionListener = listener;
69         this.client = client;
70         this.client.setConnectTimeout(TIMEOUT_MILLISECONDS);
71         this.client.setMaxIdleTimeout(0);
72         this.socketName = "Websocket-Mycroft$" + System.currentTimeMillis() + "-" + INSTANCE_COUNTER.incrementAndGet();
73
74         GsonBuilder gsonBuilder = new GsonBuilder();
75         gsonBuilder.registerTypeAdapter(MessageType.class, new MessageTypeConverter());
76         gson = gsonBuilder.create();
77     }
78
79     public MycroftConnection(MycroftConnectionListener listener) {
80         this(listener, new WebSocketClient());
81     }
82
83     public void start(String ip, int port) {
84         if (connectionState == ConnectionState.CONNECTED) {
85             return;
86         } else if (connectionState == ConnectionState.CONNECTING) {
87             logger.debug("{} already connecting", socketName);
88             return;
89         } else if (connectionState == ConnectionState.DISCONNECTING) {
90             logger.warn("{} trying to re-connect while still disconnecting", socketName);
91         }
92         Future<Session> futureConnect = null;
93         try {
94             URI destUri = URI.create("ws://" + ip + ":" + port + "/core");
95             client.start();
96             logger.debug("Trying to connect {} to {}", socketName, destUri);
97             futureConnect = client.connect(this, destUri);
98             futureConnect.get(TIMEOUT_MILLISECONDS, TimeUnit.MILLISECONDS);
99         } catch (Exception e) {
100             if (futureConnect != null) {
101                 futureConnect.cancel(true);
102             }
103             connectionListener
104                     .connectionLost("Error while connecting: " + (e.getMessage() != null ? e.getMessage() : "unknown"));
105         }
106     }
107
108     public void close() {
109         try {
110             connectionState = ConnectionState.DISCONNECTING;
111             client.stop();
112         } catch (Exception e) {
113             logger.debug("{} encountered an error while closing connection", socketName, e);
114         }
115         client.destroy();
116     }
117
118     /**
119      * The listener registered in this method will be called when a corresponding message will be detected
120      * on the Mycroft bus.
121      *
122      * @param messageType The message type to listen to.
123      * @param listener The listener will receive a callback when the requested message type will be detected on the bus.
124      */
125     public void registerListener(MessageType messageType, MycroftMessageListener<? extends BaseMessage> listener) {
126         Set<MycroftMessageListener<? extends BaseMessage>> messageTypeListeners = listeners.get(messageType);
127         if (messageTypeListeners == null) {
128             messageTypeListeners = new HashSet<MycroftMessageListener<? extends BaseMessage>>();
129             listeners.put(messageType, messageTypeListeners);
130         }
131         messageTypeListeners.add(listener);
132     }
133
134     public void unregisterListener(MessageType messageType, MycroftMessageListener<?> listener) {
135         Optional.ofNullable(listeners.get(messageType))
136                 .ifPresent((messageTypeListeners) -> messageTypeListeners.remove(listener));
137     }
138
139     public void sendMessage(BaseMessage message) throws IOException {
140         sendMessage(gson.toJson(message));
141     }
142
143     public void sendMessage(String message) throws IOException {
144         final Session storedSession = this.session;
145         try {
146             if (storedSession != null) {
147                 storedSession.getRemote().sendString(message);
148             } else {
149                 throw new IOException("Session is not initialized");
150             }
151         } catch (IOException e) {
152             if (storedSession != null && storedSession.isOpen()) {
153                 storedSession.close(-1, "Sending message error");
154             }
155             throw e;
156         }
157     }
158
159     @OnWebSocketConnect
160     public void onConnect(Session session) {
161         connectionState = ConnectionState.CONNECTED;
162         logger.debug("{} successfully connected to {}: {}", socketName, session.getRemoteAddress().getAddress(),
163                 session.hashCode());
164         connectionListener.connectionEstablished();
165         this.session = session;
166     }
167
168     @OnWebSocketMessage
169     public void onMessage(Session session, String message) {
170         if (!session.equals(this.session)) {
171             handleWrongSession(session, message);
172             return;
173         }
174         logger.trace("{} received raw data: {}", socketName, message);
175
176         try {
177             // get the base message information :
178             BaseMessage mycroftMessage = gson.fromJson(message, BaseMessage.class);
179             Objects.requireNonNull(mycroftMessage);
180             // now that we have the message type, we can use a second and more precise parsing:
181             if (mycroftMessage.type != MessageType.any) {
182                 mycroftMessage = gson.fromJson(message, mycroftMessage.type.getMessageTypeClass());
183                 Objects.requireNonNull(mycroftMessage);
184             }
185             // adding the raw message:
186             mycroftMessage.message = message;
187
188             final BaseMessage finalMessage = mycroftMessage;
189             Stream.concat(listeners.getOrDefault(MessageType.any, new HashSet<>()).stream(),
190                     listeners.getOrDefault(mycroftMessage.type, new HashSet<>()).stream()).forEach(listener -> {
191                         listener.baseMessageReceived(finalMessage);
192                     });
193         } catch (RuntimeException e) {
194             // we need to catch all processing exceptions, otherwise they could affect the connection
195             logger.debug("{} encountered an error while processing the message {}: {}", socketName, message,
196                     e.getMessage());
197         }
198     }
199
200     @OnWebSocketError
201     public void onError(@Nullable Session session, Throwable cause) {
202         if (session == null || !session.equals(this.session)) {
203             handleWrongSession(session, "Connection error: " + cause.getMessage());
204             return;
205         }
206         logger.debug("{} connection error, closing: {}", socketName, cause.getMessage());
207
208         Session storedSession = this.session;
209         if (storedSession != null && storedSession.isOpen()) {
210             storedSession.close(-1, "Processing error");
211         }
212     }
213
214     @OnWebSocketClose
215     public void onClose(Session session, int statusCode, String reason) {
216         if (!session.equals(this.session)) {
217             handleWrongSession(session, "Connection closed: " + statusCode + " / " + reason);
218             return;
219         }
220         logger.trace("{} closed connection: {} / {}", socketName, statusCode, reason);
221         connectionState = ConnectionState.DISCONNECTED;
222         this.session = null;
223         connectionListener.connectionLost(reason);
224     }
225
226     private void handleWrongSession(@Nullable Session session, String message) {
227         if (session == null) {
228             logger.debug("received and discarded message for null session : {}", message);
229         } else {
230             logger.debug("{} received and discarded message for other session {}: {}.", socketName, session.hashCode(),
231                     message);
232         }
233     }
234
235     /**
236      * check connection state (successfully connected)
237      *
238      * @return true if connected, false if connecting, disconnecting or disconnected
239      */
240     public boolean isConnected() {
241         return connectionState == ConnectionState.CONNECTED;
242     }
243
244     /**
245      * used internally to represent the connection state
246      */
247     private enum ConnectionState {
248         CONNECTING,
249         CONNECTED,
250         DISCONNECTING,
251         DISCONNECTED
252     }
253 }