]> git.basschouten.com Git - openhab-addons.git/blob
29d5ffff5d071785fa59a0d9ac3f5a9b6b5c7e35
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2024 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.mercedesme.internal.server;
14
15 import java.io.ByteArrayOutputStream;
16 import java.io.IOException;
17 import java.io.InputStream;
18 import java.net.URI;
19 import java.nio.ByteBuffer;
20 import java.time.Instant;
21 import java.util.ArrayList;
22 import java.util.List;
23
24 import org.eclipse.jdt.annotation.NonNullByDefault;
25 import org.eclipse.jdt.annotation.Nullable;
26 import org.eclipse.jetty.websocket.api.Session;
27 import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
28 import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
29 import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError;
30 import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
31 import org.eclipse.jetty.websocket.api.annotations.WebSocket;
32 import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
33 import org.eclipse.jetty.websocket.client.WebSocketClient;
34 import org.openhab.binding.mercedesme.internal.Constants;
35 import org.openhab.binding.mercedesme.internal.handler.AccountHandler;
36 import org.openhab.core.thing.ThingStatus;
37 import org.openhab.core.thing.ThingStatusDetail;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40
41 import com.daimler.mbcarkit.proto.Client.ClientMessage;
42 import com.daimler.mbcarkit.proto.Protos.AcknowledgeAssignedVehicles;
43 import com.daimler.mbcarkit.proto.VehicleEvents;
44 import com.daimler.mbcarkit.proto.VehicleEvents.AcknowledgeVEPUpdatesByVIN;
45 import com.daimler.mbcarkit.proto.VehicleEvents.PushMessage;
46 import com.daimler.mbcarkit.proto.Vehicleapi.AcknowledgeAppTwinCommandStatusUpdatesByVIN;
47 import com.daimler.mbcarkit.proto.Vehicleapi.AppTwinCommandStatusUpdatesByVIN;
48
49 /**
50  * {@link MBWebsocket} as socket endpoint to communicate with Mercedes
51  *
52  * @author Bernd Weymann - Initial contribution
53  */
54 @WebSocket
55 @NonNullByDefault
56 public class MBWebsocket {
57     // timeout 14 Minutes - just below scheduling of 15 Minutes by AccountHandler
58     private static final int CONNECT_TIMEOUT_MS = 14 * 60 * 1000;
59     // standard runtime of Websocket
60     private static final int WS_RUNTIME_MS = 60 * 1000;
61     // addon time of 1 minute for a new send command
62     private static final int ADDON_MESSAGE_TIME_MS = 60 * 1000;
63     // check Socket time elapsed each second
64     private static final int CHECK_INTERVAL_MS = 1000;
65     // additional 5 minutes after keep alive
66     private static final int KEEP_ALIVE_ADDON = 5 * 60 * 1000;
67
68     private final Logger logger = LoggerFactory.getLogger(MBWebsocket.class);
69     private AccountHandler accountHandler;
70     private boolean running = false;
71     private Instant runTill = Instant.now();
72     private @Nullable Session session;
73     private List<ClientMessage> commandQueue = new ArrayList<>();
74
75     private boolean keepAlive = false;
76
77     public MBWebsocket(AccountHandler ah) {
78         accountHandler = ah;
79     }
80
81     /**
82      * Is called by
83      * - scheduler every 15 minutes
84      * - handler sending a command
85      * - handler requesting refresh
86      */
87     public void run() {
88         synchronized (this) {
89             if (running) {
90                 return;
91             } else {
92                 running = true;
93                 runTill = Instant.now().plusMillis(WS_RUNTIME_MS);
94             }
95         }
96         try {
97             WebSocketClient client = new WebSocketClient();
98             client.setMaxIdleTimeout(CONNECT_TIMEOUT_MS);
99             client.setStopTimeout(CONNECT_TIMEOUT_MS);
100             ClientUpgradeRequest request = accountHandler.getClientUpgradeRequest();
101             String websocketURL = accountHandler.getWSUri();
102             logger.trace("Websocket start {}", websocketURL);
103             if (Constants.JUNIT_TOKEN.equals(request.getHeader("Authorization"))) {
104                 // avoid unit test requesting real websocket - simply return
105                 return;
106             }
107             client.start();
108             client.connect(this, new URI(websocketURL), request);
109             while (keepAlive || Instant.now().isBefore(runTill)) {
110                 try {
111                     Thread.sleep(CHECK_INTERVAL_MS);
112                 } catch (InterruptedException ie) {
113                     logger.trace("Websocket interrupted during sleeping - stop executing");
114                     runTill = Instant.MIN;
115                 }
116                 // sends one message per second
117                 if (sendMessage()) {
118                     // add additional runtime to execute and finish command
119                     runTill = runTill.plusMillis(ADDON_MESSAGE_TIME_MS);
120                 }
121             }
122             logger.trace("Websocket stop");
123             client.stop();
124             client.destroy();
125         } catch (Throwable t) {
126             // catch Exceptions of start stop and declare communication error
127             accountHandler.updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
128                     "@text/mercedesme.account.status.websocket-failure");
129             logger.warn("Websocket handling exception: {}", t.getMessage());
130         }
131         synchronized (this) {
132             running = false;
133         }
134     }
135
136     public void setCommand(ClientMessage cm) {
137         commandQueue.add(cm);
138     }
139
140     private boolean sendMessage() {
141         if (!commandQueue.isEmpty()) {
142             ClientMessage message = commandQueue.remove(0);
143             logger.trace("Send Message {}", message.getAllFields());
144             try {
145                 ByteArrayOutputStream baos = new ByteArrayOutputStream();
146                 message.writeTo(baos);
147                 if (session != null) {
148                     session.getRemote().sendBytes(ByteBuffer.wrap(baos.toByteArray()));
149                 }
150                 return true;
151             } catch (IOException e) {
152                 logger.warn("Error sending message {} : {}", message.getAllFields(), e.getMessage());
153             }
154             logger.info("Send Message {} done", message.getAllFields());
155         }
156         return false;
157     }
158
159     private void sendAcknowledgeMessage(ClientMessage message) {
160         try {
161             ByteArrayOutputStream baos = new ByteArrayOutputStream();
162             message.writeTo(baos);
163             if (session != null) {
164                 session.getRemote().sendBytes(ByteBuffer.wrap(baos.toByteArray()));
165             }
166         } catch (IOException e) {
167             logger.warn("Error sending acknowledge {} : {}", message.getAllFields(), e.getMessage());
168         }
169     }
170
171     public boolean isRunning() {
172         return running;
173     }
174
175     public void interrupt() {
176         synchronized (this) {
177             runTill = Instant.MIN;
178             keepAlive = false;
179         }
180     }
181
182     public void keepAlive(boolean b) {
183         if (!keepAlive) {
184             if (b) {
185                 logger.trace("WebSocket - keep alive start");
186             }
187         } else {
188             if (!b) {
189                 // after keep alive is finished add 5 minutes to cover e.g. door events after trip is finished
190                 runTill = Instant.now().plusMillis(KEEP_ALIVE_ADDON);
191                 logger.trace("Websocket - keep alive stop - run till {}", runTill.toString());
192             }
193         }
194         keepAlive = b;
195     }
196
197     /**
198      * endpoints
199      */
200
201     @OnWebSocketMessage
202     public void onBytes(InputStream is) {
203         try {
204             PushMessage pm = VehicleEvents.PushMessage.parseFrom(is);
205             if (pm.hasVepUpdates()) {
206                 boolean distributed = accountHandler.distributeVepUpdates(pm.getVepUpdates().getUpdatesMap());
207                 if (distributed) {
208                     AcknowledgeVEPUpdatesByVIN ack = AcknowledgeVEPUpdatesByVIN.newBuilder()
209                             .setSequenceNumber(pm.getVepUpdates().getSequenceNumber()).build();
210                     ClientMessage cm = ClientMessage.newBuilder().setAcknowledgeVepUpdatesByVin(ack).build();
211                     sendAcknowledgeMessage(cm);
212                 }
213             } else if (pm.hasAssignedVehicles()) {
214                 for (int i = 0; i < pm.getAssignedVehicles().getVinsCount(); i++) {
215                     String vin = pm.getAssignedVehicles().getVins(0);
216                     accountHandler.discovery(vin);
217                 }
218                 AcknowledgeAssignedVehicles ack = AcknowledgeAssignedVehicles.newBuilder().build();
219                 ClientMessage cm = ClientMessage.newBuilder().setAcknowledgeAssignedVehicles(ack).build();
220                 sendAcknowledgeMessage(cm);
221             } else if (pm.hasApptwinCommandStatusUpdatesByVin()) {
222                 AppTwinCommandStatusUpdatesByVIN csubv = pm.getApptwinCommandStatusUpdatesByVin();
223                 accountHandler.commandStatusUpdate(csubv.getUpdatesByVinMap());
224                 AcknowledgeAppTwinCommandStatusUpdatesByVIN ack = AcknowledgeAppTwinCommandStatusUpdatesByVIN
225                         .newBuilder().setSequenceNumber(csubv.getSequenceNumber()).build();
226                 ClientMessage cm = ClientMessage.newBuilder().setAcknowledgeApptwinCommandStatusUpdateByVin(ack)
227                         .build();
228                 sendAcknowledgeMessage(cm);
229             } else if (pm.hasApptwinPendingCommandRequest()) {
230                 logger.trace("Pending Command {}", pm.getApptwinPendingCommandRequest().getAllFields());
231             } else if (pm.hasDebugMessage()) {
232                 logger.trace("MB Debug Message: {}", pm.getDebugMessage().getMessage());
233             } else {
234                 logger.trace("MB Message: {} not handled", pm.getAllFields());
235             }
236         } catch (IOException e) {
237             // don't report thing status errors here.
238             // Sometimes messages cannot be decoded which doesn't effect the overall functionality
239             logger.trace("IOException {}", e.getMessage());
240         } catch (Error err) {
241             logger.trace("Error caught {}", err.getMessage());
242         }
243     }
244
245     @OnWebSocketClose
246     public void onDisconnect(Session session, int statusCode, String reason) {
247         logger.debug("Disconnected from server. Status {} Reason {}", statusCode, reason);
248         this.session = null;
249         // ensure execution stop if disconnect was triggered from server side
250         interrupt();
251     }
252
253     @OnWebSocketConnect
254     public void onConnect(Session session) {
255         accountHandler.updateStatus(ThingStatus.ONLINE);
256         this.session = session;
257     }
258
259     @OnWebSocketError
260     public void onError(Throwable t) {
261         logger.warn("onError {}", t.getMessage());
262         accountHandler.updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
263                 "@text/mercedesme.account.status.websocket-failure");
264     }
265 }