2 * Copyright (c) 2010-2024 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.mercedesme.internal.server;
15 import java.io.ByteArrayOutputStream;
16 import java.io.IOException;
17 import java.io.InputStream;
19 import java.nio.ByteBuffer;
20 import java.time.Instant;
21 import java.util.ArrayList;
22 import java.util.List;
24 import org.eclipse.jdt.annotation.NonNullByDefault;
25 import org.eclipse.jdt.annotation.Nullable;
26 import org.eclipse.jetty.websocket.api.Session;
27 import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
28 import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
29 import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError;
30 import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
31 import org.eclipse.jetty.websocket.api.annotations.WebSocket;
32 import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
33 import org.eclipse.jetty.websocket.client.WebSocketClient;
34 import org.openhab.binding.mercedesme.internal.Constants;
35 import org.openhab.binding.mercedesme.internal.handler.AccountHandler;
36 import org.openhab.core.thing.ThingStatus;
37 import org.openhab.core.thing.ThingStatusDetail;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
41 import com.daimler.mbcarkit.proto.Client.ClientMessage;
42 import com.daimler.mbcarkit.proto.Protos.AcknowledgeAssignedVehicles;
43 import com.daimler.mbcarkit.proto.VehicleEvents;
44 import com.daimler.mbcarkit.proto.VehicleEvents.AcknowledgeVEPUpdatesByVIN;
45 import com.daimler.mbcarkit.proto.VehicleEvents.PushMessage;
46 import com.daimler.mbcarkit.proto.Vehicleapi.AcknowledgeAppTwinCommandStatusUpdatesByVIN;
47 import com.daimler.mbcarkit.proto.Vehicleapi.AppTwinCommandStatusUpdatesByVIN;
50 * {@link MBWebsocket} as socket endpoint to communicate with Mercedes
52 * @author Bernd Weymann - Initial contribution
56 public class MBWebsocket {
57 // timeout 14 Minutes - just below scheduling of 15 Minutes by AccountHandler
58 private static final int CONNECT_TIMEOUT_MS = 14 * 60 * 1000;
59 // standard runtime of Websocket
60 private static final int WS_RUNTIME_MS = 60 * 1000;
61 // addon time of 1 minute for a new send command
62 private static final int ADDON_MESSAGE_TIME_MS = 60 * 1000;
63 // check Socket time elapsed each second
64 private static final int CHECK_INTERVAL_MS = 1000;
65 // additional 5 minutes after keep alive
66 private static final int KEEP_ALIVE_ADDON = 5 * 60 * 1000;
68 private final Logger logger = LoggerFactory.getLogger(MBWebsocket.class);
69 private AccountHandler accountHandler;
70 private boolean running = false;
71 private Instant runTill = Instant.now();
72 private @Nullable Session session;
73 private List<ClientMessage> commandQueue = new ArrayList<>();
75 private boolean keepAlive = false;
77 public MBWebsocket(AccountHandler ah) {
83 * - scheduler every 15 minutes
84 * - handler sending a command
85 * - handler requesting refresh
93 runTill = Instant.now().plusMillis(WS_RUNTIME_MS);
97 WebSocketClient client = new WebSocketClient();
98 client.setMaxIdleTimeout(CONNECT_TIMEOUT_MS);
99 client.setStopTimeout(CONNECT_TIMEOUT_MS);
100 ClientUpgradeRequest request = accountHandler.getClientUpgradeRequest();
101 String websocketURL = accountHandler.getWSUri();
102 logger.trace("Websocket start {}", websocketURL);
103 if (Constants.JUNIT_TOKEN.equals(request.getHeader("Authorization"))) {
104 // avoid unit test requesting real websocket - simply return
108 client.connect(this, new URI(websocketURL), request);
109 while (keepAlive || Instant.now().isBefore(runTill)) {
111 Thread.sleep(CHECK_INTERVAL_MS);
112 } catch (InterruptedException ie) {
113 logger.trace("Websocket interrupted during sleeping - stop executing");
114 runTill = Instant.MIN;
116 // sends one message per second
118 // add additional runtime to execute and finish command
119 runTill = runTill.plusMillis(ADDON_MESSAGE_TIME_MS);
122 logger.trace("Websocket stop");
125 } catch (Throwable t) {
126 // catch Exceptions of start stop and declare communication error
127 accountHandler.updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
128 "@text/mercedesme.account.status.websocket-failure");
129 logger.warn("Websocket handling exception: {}", t.getMessage());
131 synchronized (this) {
136 public void setCommand(ClientMessage cm) {
137 commandQueue.add(cm);
140 private boolean sendMessage() {
141 if (!commandQueue.isEmpty()) {
142 ClientMessage message = commandQueue.remove(0);
143 logger.trace("Send Message {}", message.getAllFields());
145 ByteArrayOutputStream baos = new ByteArrayOutputStream();
146 message.writeTo(baos);
147 if (session != null) {
148 session.getRemote().sendBytes(ByteBuffer.wrap(baos.toByteArray()));
151 } catch (IOException e) {
152 logger.warn("Error sending message {} : {}", message.getAllFields(), e.getMessage());
154 logger.info("Send Message {} done", message.getAllFields());
159 private void sendAcknowledgeMessage(ClientMessage message) {
161 ByteArrayOutputStream baos = new ByteArrayOutputStream();
162 message.writeTo(baos);
163 if (session != null) {
164 session.getRemote().sendBytes(ByteBuffer.wrap(baos.toByteArray()));
166 } catch (IOException e) {
167 logger.warn("Error sending acknowledge {} : {}", message.getAllFields(), e.getMessage());
171 public boolean isRunning() {
175 public void interrupt() {
176 synchronized (this) {
177 runTill = Instant.MIN;
182 public void keepAlive(boolean b) {
185 logger.trace("WebSocket - keep alive start");
189 // after keep alive is finished add 5 minutes to cover e.g. door events after trip is finished
190 runTill = Instant.now().plusMillis(KEEP_ALIVE_ADDON);
191 logger.trace("Websocket - keep alive stop - run till {}", runTill.toString());
202 public void onBytes(InputStream is) {
204 PushMessage pm = VehicleEvents.PushMessage.parseFrom(is);
205 if (pm.hasVepUpdates()) {
206 boolean distributed = accountHandler.distributeVepUpdates(pm.getVepUpdates().getUpdatesMap());
208 AcknowledgeVEPUpdatesByVIN ack = AcknowledgeVEPUpdatesByVIN.newBuilder()
209 .setSequenceNumber(pm.getVepUpdates().getSequenceNumber()).build();
210 ClientMessage cm = ClientMessage.newBuilder().setAcknowledgeVepUpdatesByVin(ack).build();
211 sendAcknowledgeMessage(cm);
213 } else if (pm.hasAssignedVehicles()) {
// Hand every assigned VIN to discovery; index with the loop counter so that
// accounts with several vehicles do not report only the first entry repeatedly.
for (int i = 0; i < pm.getAssignedVehicles().getVinsCount(); i++) {
    String vin = pm.getAssignedVehicles().getVins(i);
    accountHandler.discovery(vin);
218 AcknowledgeAssignedVehicles ack = AcknowledgeAssignedVehicles.newBuilder().build();
219 ClientMessage cm = ClientMessage.newBuilder().setAcknowledgeAssignedVehicles(ack).build();
220 sendAcknowledgeMessage(cm);
221 } else if (pm.hasApptwinCommandStatusUpdatesByVin()) {
222 AppTwinCommandStatusUpdatesByVIN csubv = pm.getApptwinCommandStatusUpdatesByVin();
223 accountHandler.commandStatusUpdate(csubv.getUpdatesByVinMap());
224 AcknowledgeAppTwinCommandStatusUpdatesByVIN ack = AcknowledgeAppTwinCommandStatusUpdatesByVIN
225 .newBuilder().setSequenceNumber(csubv.getSequenceNumber()).build();
226 ClientMessage cm = ClientMessage.newBuilder().setAcknowledgeApptwinCommandStatusUpdateByVin(ack)
228 sendAcknowledgeMessage(cm);
229 } else if (pm.hasApptwinPendingCommandRequest()) {
230 logger.trace("Pending Command {}", pm.getApptwinPendingCommandRequest().getAllFields());
231 } else if (pm.hasDebugMessage()) {
232 logger.trace("MB Debug Message: {}", pm.getDebugMessage().getMessage());
234 logger.trace("MB Message: {} not handled", pm.getAllFields());
236 } catch (IOException e) {
237 // don't report thing status errors here.
238 // Sometimes messages cannot be decoded which doesn't affect the overall functionality
239 logger.trace("IOException {}", e.getMessage());
240 } catch (Error err) {
241 logger.trace("Error caught {}", err.getMessage());
246 public void onDisconnect(Session session, int statusCode, String reason) {
247 logger.debug("Disconnected from server. Status {} Reason {}", statusCode, reason);
249 // ensure execution stop if disconnect was triggered from server side
254 public void onConnect(Session session) {
255 accountHandler.updateStatus(ThingStatus.ONLINE);
256 this.session = session;
260 public void onError(Throwable t) {
261 logger.warn("onError {}", t.getMessage());
262 accountHandler.updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
263 "@text/mercedesme.account.status.websocket-failure");