2 * Copyright (c) 2010-2023 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.tesla.internal.handler;
15 import java.io.BufferedReader;
16 import java.io.ByteArrayInputStream;
17 import java.io.IOException;
18 import java.io.InputStreamReader;
20 import java.nio.ByteBuffer;
21 import java.nio.charset.CodingErrorAction;
22 import java.nio.charset.StandardCharsets;
23 import java.util.concurrent.Future;
24 import java.util.concurrent.TimeUnit;
25 import java.util.concurrent.atomic.AtomicInteger;
27 import org.eclipse.jdt.annotation.Nullable;
28 import org.eclipse.jetty.websocket.api.Session;
29 import org.eclipse.jetty.websocket.api.StatusCode;
30 import org.eclipse.jetty.websocket.api.WebSocketListener;
31 import org.eclipse.jetty.websocket.api.WebSocketPingPongListener;
32 import org.eclipse.jetty.websocket.client.WebSocketClient;
33 import org.openhab.binding.tesla.internal.protocol.Event;
34 import org.openhab.core.io.net.http.WebSocketFactory;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
38 import com.google.gson.Gson;
41 * The {@link TeslaEventEndpoint} is responsible for managing a websocket connection to a specific URI, most notably the
42 * Tesla event stream infrastructure. Consumers can register an {@link EventHandler} in order to receive data that was
43 * received by the websocket endpoint. The {@link TeslaEventEndpoint} also implements a ping/pong mechanism to keep
46 * @author Karel Goderis - Initial contribution
48 public class TeslaEventEndpoint implements WebSocketListener, WebSocketPingPongListener {
50 private static final int TIMEOUT_MILLISECONDS = 3000;
51 private static final int IDLE_TIMEOUT_MILLISECONDS = 30000;
53 private final Logger logger = LoggerFactory.getLogger(TeslaEventEndpoint.class);
55 private String endpointId;
56 protected WebSocketFactory webSocketFactory;
57 private static final AtomicInteger INSTANCE_COUNTER = new AtomicInteger();
59 private WebSocketClient client;
60 private ConnectionState connectionState = ConnectionState.CLOSED;
61 private @Nullable Session session;
62 private EventHandler eventHandler;
63 private final Gson gson = new Gson();
65 public TeslaEventEndpoint(WebSocketFactory webSocketFactory) {
67 this.endpointId = "TeslaEventEndpoint-" + INSTANCE_COUNTER.incrementAndGet();
69 client = webSocketFactory.createWebSocketClient(endpointId);
70 this.client.setConnectTimeout(TIMEOUT_MILLISECONDS);
71 this.client.setMaxIdleTimeout(IDLE_TIMEOUT_MILLISECONDS);
72 } catch (Exception e) {
73 throw new RuntimeException(e);
77 public void connect(URI endpointURI) {
78 if (connectionState == ConnectionState.CONNECTED) {
80 } else if (connectionState == ConnectionState.CONNECTING) {
81 logger.debug("{} : Already connecting to {}", endpointId, endpointURI);
83 } else if (connectionState == ConnectionState.CLOSING) {
84 logger.warn("{} : Connecting to {} while already closing the connection", endpointId, endpointURI);
87 Future<Session> futureConnect = null;
89 if (!client.isRunning()) {
90 logger.debug("{} : Starting the client to connect to {}", endpointId, endpointURI);
93 logger.debug("{} : The client to connect to {} is already running", endpointId, endpointURI);
96 logger.debug("{} : Connecting to {}", endpointId, endpointURI);
97 connectionState = ConnectionState.CONNECTING;
98 futureConnect = client.connect(this, endpointURI);
99 futureConnect.get(TIMEOUT_MILLISECONDS, TimeUnit.MILLISECONDS);
100 } catch (Exception e) {
101 logger.error("An exception occurred while connecting the Event Endpoint : '{}'", e.getMessage());
102 if (futureConnect != null) {
103 futureConnect.cancel(true);
/**
 * Callback invoked with the newly established websocket session: logs the remote address, switches the
 * connection state to CONNECTED and stores the session in the {@code session} field.
 *
 * @param session the session that was just opened
 */
109 public void onWebSocketConnect(Session session) {
// NOTE(review): the final argument of this log call (the value for the "hash" placeholder) sits on a line
// that is not part of this excerpt.
110 logger.debug("{} : Connected to {} with hash {}", endpointId, session.getRemoteAddress().getAddress(),
112 connectionState = ConnectionState.CONNECTED;
113 this.session = session;
116 public void close() {
118 connectionState = ConnectionState.CLOSING;
119 if (session != null && session.isOpen()) {
120 logger.debug("{} : Closing the session", endpointId);
121 session.close(StatusCode.NORMAL, "bye");
123 } catch (Exception e) {
124 logger.error("{} : An exception occurred while closing the session : {}", endpointId, e.getMessage());
125 connectionState = ConnectionState.CLOSED;
/**
 * Callback invoked when the websocket session has been closed: logs the close status and reason and marks
 * the endpoint as CLOSED.
 *
 * @param statusCode the close status code, logged as received
 * @param reason the close reason, logged as received
 */
130 public void onWebSocketClose(int statusCode, String reason) {
131 logger.debug("{} : Closed the session with status {} for reason {}", endpointId, statusCode, reason);
132 connectionState = ConnectionState.CLOSED;
// NOTE(review): the remainder of this method is not part of this excerpt; verify whether the stored session
// reference is released here as well.
/**
 * Callback for text frames received on the websocket.
 * <p>
 * NOTE(review): the body of this method is not part of this excerpt. The event decoding visible in this class
 * happens in onWebSocketBinary; confirm that text frames are intentionally not processed.
 *
 * @param message the received text message
 */
137 public void onWebSocketText(String message) {
142 public void onWebSocketBinary(byte[] payload, int offset, int length) {
143 BufferedReader in = new BufferedReader(
144 new InputStreamReader(new ByteArrayInputStream(payload), StandardCharsets.UTF_8.newDecoder()
145 .onMalformedInput(CodingErrorAction.REPORT).onUnmappableCharacter(CodingErrorAction.REPORT)));
148 while ((str = in.readLine()) != null) {
149 logger.trace("{} : Received raw data '{}'", endpointId, str);
150 if (this.eventHandler != null) {
152 Event event = gson.fromJson(str, Event.class);
153 this.eventHandler.handleEvent(event);
154 } catch (RuntimeException e) {
155 logger.error("{} : An exception occurred while processing raw data : {}", endpointId,
160 } catch (IOException e) {
161 logger.error("{} : An exception occurred while receiving raw data : {}", endpointId, e.getMessage());
166 public void onWebSocketError(Throwable cause) {
167 logger.error("{} : An error occurred in the session : {}", endpointId, cause.getMessage());
168 if (session != null && session.isOpen()) {
169 session.close(StatusCode.ABNORMAL, "Session Error");
/**
 * Sends a text message to the remote end of the current session.
 * <p>
 * Without a session an {@link IOException} ("Session is not initialized") is raised. When an
 * {@link IOException} is caught, a session that is still open is closed with an abnormal status.
 *
 * @param message the text to send
 * @throws IOException declared by the signature. NOTE(review): the end of the catch block is not part of this
 *             excerpt; confirm that the caught exception is rethrown to the caller.
 */
173 public void sendMessage(String message) throws IOException {
175 if (session != null) {
176 logger.debug("{} : Sending raw data '{}'", endpointId, message);
177 session.getRemote().sendString(message);
// Reached when no session has been stored yet.
179 throw new IOException("Session is not initialized");
181 } catch (IOException e) {
// A failed send is treated as fatal for the session: close it if it is still open.
182 if (session != null && session.isOpen()) {
183 session.close(StatusCode.ABNORMAL, "Send Message Error");
// NOTE(review): the declaration line of this method is not part of this excerpt.
// Sends a ping frame to the remote end when a session is present. The 8 byte payload is the current
// System.nanoTime() value; onWebSocketPong reads a long from the pong payload to compute the round trip.
191 if (session != null) {
192 ByteBuffer buffer = ByteBuffer.allocate(8).putLong(System.nanoTime()).flip();
193 session.getRemote().sendPing(buffer);
// A failed ping is only logged.
195 } catch (IOException e) {
196 logger.error("{} : An exception occurred while pinging the remote end : {}", endpointId, e.getMessage());
201 public void onWebSocketPing(ByteBuffer payload) {
202 ByteBuffer buffer = ByteBuffer.allocate(8).putLong(System.nanoTime()).flip();
204 if (session != null) {
205 session.getRemote().sendPing(buffer);
207 } catch (IOException e) {
208 logger.error("{} : An exception occurred while processing a ping message : {}", endpointId, e.getMessage());
213 public void onWebSocketPong(ByteBuffer payload) {
214 long start = payload.getLong();
215 long roundTrip = System.nanoTime() - start;
217 logger.trace("{} : Received a Pong with a roundtrip of {} milliseconds", endpointId,
218 TimeUnit.MILLISECONDS.convert(roundTrip, TimeUnit.NANOSECONDS));
221 public void addEventHandler(EventHandler eventHandler) {
222 this.eventHandler = eventHandler;
225 public boolean isConnected() {
226 return connectionState == ConnectionState.CONNECTED;
229 public static interface EventHandler {
230 public void handleEvent(Event event);
233 private enum ConnectionState {