]> git.basschouten.com Git - openhab-addons.git/blob
1bb9018b4a92cf9a88de14d5cdda396aff4212f6
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2021 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.miio.internal.transport;
14
15 import java.io.IOException;
16 import java.net.DatagramPacket;
17 import java.net.DatagramSocket;
18 import java.net.InetAddress;
19 import java.net.SocketException;
20 import java.net.SocketTimeoutException;
21 import java.nio.charset.StandardCharsets;
22 import java.time.Instant;
23 import java.util.Arrays;
24 import java.util.Calendar;
25 import java.util.List;
26 import java.util.NoSuchElementException;
27 import java.util.concurrent.ConcurrentLinkedQueue;
28 import java.util.concurrent.CopyOnWriteArrayList;
29 import java.util.concurrent.TimeUnit;
30 import java.util.concurrent.atomic.AtomicInteger;
31
32 import org.eclipse.jdt.annotation.NonNullByDefault;
33 import org.eclipse.jdt.annotation.Nullable;
34 import org.openhab.binding.miio.internal.Message;
35 import org.openhab.binding.miio.internal.MiIoBindingConstants;
36 import org.openhab.binding.miio.internal.MiIoCommand;
37 import org.openhab.binding.miio.internal.MiIoCrypto;
38 import org.openhab.binding.miio.internal.MiIoCryptoException;
39 import org.openhab.binding.miio.internal.MiIoMessageListener;
40 import org.openhab.binding.miio.internal.MiIoSendCommand;
41 import org.openhab.binding.miio.internal.Utils;
42 import org.openhab.binding.miio.internal.cloud.CloudConnector;
43 import org.openhab.binding.miio.internal.cloud.MiCloudException;
44 import org.openhab.core.thing.ThingStatus;
45 import org.openhab.core.thing.ThingStatusDetail;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
48
49 import com.google.gson.JsonElement;
50 import com.google.gson.JsonObject;
51 import com.google.gson.JsonParser;
52 import com.google.gson.JsonSyntaxException;
53
54 /**
55  * The {@link MiIoAsyncCommunication} is responsible for communications with the Mi IO devices
56  *
57  * @author Marcel Verpaalen - Initial contribution
58  */
59 @NonNullByDefault
60 public class MiIoAsyncCommunication {
61
62     private static final int MSG_BUFFER_SIZE = 2048;
63
64     private final Logger logger = LoggerFactory.getLogger(MiIoAsyncCommunication.class);
65
66     private final String ip;
67     private final byte[] token;
68     private String deviceId;
69     private @Nullable DatagramSocket socket;
70
71     private List<MiIoMessageListener> listeners = new CopyOnWriteArrayList<>();
72
73     private AtomicInteger id = new AtomicInteger(-1);
74     private int timeDelta;
75     private int timeStamp;
76     private @Nullable MessageSenderThread senderThread;
77     private boolean connected;
78     private ThingStatusDetail status = ThingStatusDetail.NONE;
79     private int errorCounter;
80     private int timeout;
81     private boolean needPing = true;
82     private static final int MAX_ERRORS = 3;
83     private static final int MAX_ID = 15000;
84     private final CloudConnector cloudConnector;
85
86     private ConcurrentLinkedQueue<MiIoSendCommand> concurrentLinkedQueue = new ConcurrentLinkedQueue<>();
87
88     public MiIoAsyncCommunication(String ip, byte[] token, String did, int id, int timeout,
89             CloudConnector cloudConnector) {
90         this.ip = ip;
91         this.token = token;
92         this.deviceId = did;
93         this.timeout = timeout;
94         this.cloudConnector = cloudConnector;
95         setId(id);
96     }
97
98     protected List<MiIoMessageListener> getListeners() {
99         return listeners;
100     }
101
102     /**
103      * Registers a {@link MiIoMessageListener} to be called back, when data is received.
104      * If no {@link MessageSenderThread} exists, when the method is called, it is being set up.
105      *
106      * @param listener {@link MiIoMessageListener} to be called back
107      */
108     public synchronized void registerListener(MiIoMessageListener listener) {
109         needPing = true;
110         startReceiver();
111         if (!getListeners().contains(listener)) {
112             logger.trace("Adding socket listener {}", listener);
113             getListeners().add(listener);
114         }
115     }
116
117     /**
118      * Unregisters a {@link MiIoMessageListener}. If there are no listeners left,
119      * the {@link MessageSenderThread} is being closed.
120      *
121      * @param listener {@link MiIoMessageListener} to be unregistered
122      */
123     public synchronized void unregisterListener(MiIoMessageListener listener) {
124         getListeners().remove(listener);
125         if (getListeners().isEmpty()) {
126             concurrentLinkedQueue.clear();
127             close();
128         }
129     }
130
131     public int queueCommand(String command, String params, String cloudServer, String sender)
132             throws MiIoCryptoException, IOException, JsonSyntaxException {
133         try {
134             JsonObject fullCommand = new JsonObject();
135             int cmdId = id.incrementAndGet();
136             if (cmdId > MAX_ID) {
137                 id.set(0);
138             }
139             if (command.startsWith("{") && command.endsWith("}")) {
140                 fullCommand = JsonParser.parseString(command).getAsJsonObject();
141                 fullCommand.addProperty("id", cmdId);
142                 if (!fullCommand.has("params") && !params.isBlank()) {
143                     fullCommand.add("params", JsonParser.parseString(params));
144                 }
145             } else {
146                 fullCommand.addProperty("id", cmdId);
147                 fullCommand.addProperty("method", command);
148                 fullCommand.add("params", JsonParser.parseString(params));
149             }
150             MiIoSendCommand sendCmd = new MiIoSendCommand(cmdId, MiIoCommand.getCommand(command), fullCommand,
151                     cloudServer, sender);
152             concurrentLinkedQueue.add(sendCmd);
153             if (logger.isDebugEnabled()) {
154                 // Obfuscate part of the token to allow sharing of the logfiles
155                 String tokenText = Utils.obfuscateToken(Utils.getHex(token));
156                 logger.debug("Command added to Queue {} -> {} (Device: {} token: {} Queue: {}).{}{}",
157                         fullCommand.toString(), ip, deviceId, tokenText, concurrentLinkedQueue.size(),
158                         cloudServer.isBlank() ? "" : " Send via cloudserver: ", cloudServer);
159             }
160             if (needPing && cloudServer.isBlank()) {
161                 sendPing(ip);
162             }
163             return cmdId;
164         } catch (JsonSyntaxException | IllegalStateException e) {
165             logger.warn("Send command '{}' with parameters {} -> {} (Device: {}) gave error {}", command, params, ip,
166                     deviceId, e.getMessage());
167             throw e;
168         }
169     }
170
171     MiIoSendCommand sendMiIoSendCommand(MiIoSendCommand miIoSendCommand) {
172         String errorMsg = "Unknown Error while sending command";
173         String decryptedResponse = "";
174         try {
175             if (miIoSendCommand.getCloudServer().isBlank()) {
176                 decryptedResponse = sendCommand(miIoSendCommand.getCommandString(), token, ip, deviceId);
177             } else {
178                 if (!miIoSendCommand.getMethod().startsWith("/")) {
179                     decryptedResponse = cloudConnector.sendRPCCommand(Utils.getHexId(deviceId),
180                             miIoSendCommand.getCloudServer(), miIoSendCommand);
181                     logger.debug("Command {} send via cloudserver {}", miIoSendCommand.getCommandString(),
182                             miIoSendCommand.getCloudServer());
183                     updateStatus(ThingStatus.ONLINE, ThingStatusDetail.NONE);
184                 } else {
185                     String data = miIoSendCommand.getParams().isJsonArray()
186                             && miIoSendCommand.getParams().getAsJsonArray().size() > 0
187                                     ? miIoSendCommand.getParams().getAsJsonArray().get(0).toString()
188                                     : "";
189                     logger.debug("Custom cloud request send to url '{}' with data '{}'", miIoSendCommand.getMethod(),
190                             data);
191                     decryptedResponse = cloudConnector.sendCloudCommand(miIoSendCommand.getMethod(),
192                             miIoSendCommand.getCloudServer(), data);
193                     miIoSendCommand.setResponse(JsonParser.parseString(decryptedResponse).getAsJsonObject());
194                     return miIoSendCommand;
195                 }
196             }
197             // hack due to avoid invalid json errors from some misbehaving device firmwares
198             decryptedResponse = decryptedResponse.replace(",,", ",");
199             JsonElement response;
200             response = JsonParser.parseString(decryptedResponse);
201             if (!response.isJsonObject()) {
202                 errorMsg = "Received message is not a JSON object ";
203             } else {
204                 needPing = false;
205                 logger.trace("Received  JSON message {}", response.toString());
206                 JsonObject resJson = response.getAsJsonObject();
207                 if (resJson.has("id")) {
208                     int id = resJson.get("id").getAsInt();
209                     if (id == miIoSendCommand.getId()) {
210                         miIoSendCommand.setResponse(response.getAsJsonObject());
211                         return miIoSendCommand;
212                     } else {
213                         if (id < miIoSendCommand.getId()) {
214                             errorMsg = String.format(
215                                     "Received message out of sync, extend timeout time. Expected id: %d, received id: %d",
216                                     miIoSendCommand.getId(), id);
217                         } else {
218                             errorMsg = String.format("Received message out of sync. Expected id: %d, received id: %d",
219                                     miIoSendCommand.getId(), id);
220                         }
221                     }
222                 } else {
223                     errorMsg = "Received message is without id";
224                 }
225
226             }
227             logger.debug("{}: {}", errorMsg, decryptedResponse);
228         } catch (MiIoCryptoException | IOException e) {
229             logger.debug("Send command '{}'  -> {} (Device: {}) gave error {}", miIoSendCommand.getCommandString(), ip,
230                     deviceId, e.getMessage());
231             errorMsg = e.getMessage();
232         } catch (JsonSyntaxException e) {
233             logger.warn("Could not parse '{}' <- {} (Device: {}) gave error {}", decryptedResponse,
234                     miIoSendCommand.getCommandString(), deviceId, e.getMessage());
235             errorMsg = "Received message is invalid JSON";
236         } catch (MiCloudException e) {
237             logger.debug("Send command '{}'  -> cloudserver '{}' (Device: {}) gave error {}",
238                     miIoSendCommand.getCommandString(), miIoSendCommand.getCloudServer(), deviceId, e.getMessage());
239             errorMsg = e.getMessage();
240             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR);
241         }
242         JsonObject erroResp = new JsonObject();
243         erroResp.addProperty("error", errorMsg);
244         miIoSendCommand.setResponse(erroResp);
245         return miIoSendCommand;
246     }
247
248     public synchronized void startReceiver() {
249         MessageSenderThread senderThread = this.senderThread;
250         if (senderThread == null || !senderThread.isAlive()) {
251             senderThread = new MessageSenderThread(deviceId.isBlank() ? "?" + ip : deviceId);
252             senderThread.start();
253             this.senderThread = senderThread;
254         }
255     }
256
257     /**
258      * The {@link MessageSenderThread} is responsible for consuming messages from the queue and sending these to the
259      * device
260      *
261      */
262     private class MessageSenderThread extends Thread {
263         private final String deviceId;
264
265         public MessageSenderThread(String deviceId) {
266             super("OH-binding-miio-MessageSenderThread-" + deviceId);
267             setDaemon(true);
268             this.deviceId = deviceId;
269         }
270
271         @Override
272         public void run() {
273             logger.debug("Starting Mi IO MessageSenderThread {}", deviceId);
274             while (!interrupted()) {
275                 try {
276                     if (concurrentLinkedQueue.isEmpty()) {
277                         Thread.sleep(100);
278                         continue;
279                     }
280                     MiIoSendCommand queuedMessage = concurrentLinkedQueue.remove();
281                     MiIoSendCommand miIoSendCommand = sendMiIoSendCommand(queuedMessage);
282                     for (MiIoMessageListener listener : listeners) {
283                         logger.trace("inform listener {}, data {} from {}", listener, queuedMessage, miIoSendCommand);
284                         try {
285                             listener.onMessageReceived(miIoSendCommand);
286                         } catch (Exception e) {
287                             logger.debug("Could not inform listener {}: {}: ", listener, e.getMessage(), e);
288                         }
289                     }
290                 } catch (NoSuchElementException e) {
291                     // ignore
292                 } catch (InterruptedException e) {
293                     // That's our signal to stop
294                     break;
295                 } catch (Exception e) {
296                     logger.warn("Error while polling/sending message for {}", deviceId, e);
297                 }
298             }
299             closeSocket();
300             logger.debug("Finished Mi IO MessageSenderThread {}", deviceId);
301         }
302     }
303
304     private String sendCommand(String command, byte[] token, String ip, String deviceId)
305             throws MiIoCryptoException, IOException {
306         byte[] sendMsg = new byte[0];
307         if (!command.isBlank()) {
308             byte[] encr;
309             encr = MiIoCrypto.encrypt(command.getBytes(StandardCharsets.UTF_8), token);
310             timeStamp = (int) Instant.now().getEpochSecond();
311             sendMsg = Message.createMsgData(encr, token, Utils.hexStringToByteArray(Utils.getHexId(deviceId)),
312                     timeStamp + timeDelta);
313         }
314         Message miIoResponseMsg = sendData(sendMsg, ip);
315         if (miIoResponseMsg == null) {
316             if (logger.isTraceEnabled()) {
317                 logger.trace("No response from device {} at {} for command {}.\r\n{}", deviceId, ip, command,
318                         (new Message(sendMsg)).toSting());
319             } else {
320                 logger.debug("No response from device {} at {} for command {}.", deviceId, ip, command);
321             }
322             errorCounter++;
323             if (errorCounter > MAX_ERRORS) {
324                 status = ThingStatusDetail.CONFIGURATION_ERROR;
325                 sendPing(ip);
326             }
327             return "{\"error\":\"No Response\"}";
328         }
329         if (!miIoResponseMsg.isChecksumValid()) {
330             return "{\"error\":\"Message has invalid checksum\"}";
331         }
332         if (errorCounter > 0) {
333             errorCounter = 0;
334             status = ThingStatusDetail.NONE;
335             updateStatus(ThingStatus.ONLINE, status);
336         }
337         if (!connected) {
338             pingSuccess();
339         }
340         String decryptedResponse = new String(MiIoCrypto.decrypt(miIoResponseMsg.getData(), token), "UTF-8").trim();
341         logger.trace("Received response from {}: {}", ip, decryptedResponse);
342         return decryptedResponse;
343     }
344
345     public @Nullable Message sendPing(String ip) throws IOException {
346         for (int i = 0; i < 3; i++) {
347             logger.debug("Sending Ping to device '{}' ({})", deviceId, ip);
348             Message resp = sendData(MiIoBindingConstants.DISCOVER_STRING, ip);
349             if (resp != null) {
350                 pingSuccess();
351                 return resp;
352             }
353         }
354         pingFail();
355         return null;
356     }
357
358     private void pingFail() {
359         logger.debug("Ping to device '{}' ({}) failed", deviceId, ip);
360         connected = false;
361         status = ThingStatusDetail.COMMUNICATION_ERROR;
362         updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR);
363     }
364
365     private void pingSuccess() {
366         logger.debug("Ping to device '{}' ({}) success", deviceId, ip);
367         if (!connected) {
368             connected = true;
369             status = ThingStatusDetail.NONE;
370             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.NONE);
371         } else {
372             if (ThingStatusDetail.CONFIGURATION_ERROR.equals(status)) {
373                 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_ERROR);
374             } else {
375                 status = ThingStatusDetail.NONE;
376                 updateStatus(ThingStatus.ONLINE, status);
377             }
378         }
379     }
380
381     private void updateStatus(ThingStatus status, ThingStatusDetail statusDetail) {
382         for (MiIoMessageListener listener : listeners) {
383             logger.trace("inform listener {}, data {} from {}", listener, status, statusDetail);
384             try {
385                 listener.onStatusUpdated(status, statusDetail);
386             } catch (Exception e) {
387                 logger.debug("Could not inform listener {}: {}", listener, e.getMessage(), e);
388             }
389         }
390     }
391
392     private @Nullable Message sendData(byte[] sendMsg, String ip) throws IOException {
393         byte[] response = comms(sendMsg, ip);
394         if (response.length >= 32) {
395             Message miIoResponse = new Message(response);
396             timeStamp = (int) TimeUnit.MILLISECONDS.toSeconds(Calendar.getInstance().getTime().getTime());
397             timeDelta = miIoResponse.getTimestampAsInt() - timeStamp;
398             logger.trace("Message Details:{} ", miIoResponse.toSting());
399             return miIoResponse;
400         } else {
401             logger.trace("Reponse length <32 : {}", response.length);
402             return null;
403         }
404     }
405
406     private synchronized byte[] comms(byte[] message, String ip) throws IOException {
407         InetAddress ipAddress = InetAddress.getByName(ip);
408         DatagramSocket clientSocket = getSocket();
409         DatagramPacket receivePacket = new DatagramPacket(new byte[MSG_BUFFER_SIZE], MSG_BUFFER_SIZE);
410         try {
411             logger.trace("Connection {}:{}", ip, clientSocket.getLocalPort());
412             if (message.length > 0) {
413                 byte[] sendData = new byte[MSG_BUFFER_SIZE];
414                 sendData = message;
415                 DatagramPacket sendPacket = new DatagramPacket(sendData, sendData.length, ipAddress,
416                         MiIoBindingConstants.PORT);
417                 clientSocket.send(sendPacket);
418                 sendPacket.setData(new byte[MSG_BUFFER_SIZE]);
419             }
420             clientSocket.receive(receivePacket);
421             byte[] response = Arrays.copyOfRange(receivePacket.getData(), receivePacket.getOffset(),
422                     receivePacket.getOffset() + receivePacket.getLength());
423             return response;
424         } catch (SocketTimeoutException e) {
425             logger.debug("Communication error for Mi device at {}: {}", ip, e.getMessage());
426             needPing = true;
427             return new byte[0];
428         }
429     }
430
431     private DatagramSocket getSocket() throws SocketException {
432         @Nullable
433         DatagramSocket socket = this.socket;
434         if (socket == null || socket.isClosed()) {
435             socket = new DatagramSocket();
436             socket.setSoTimeout(timeout);
437             logger.debug("Opening socket on port: {} ({} {})", socket.getLocalPort(), deviceId, ip);
438             this.socket = socket;
439             return socket;
440         } else {
441             return socket;
442         }
443     }
444
445     public void close() {
446         try {
447             final MessageSenderThread senderThread = this.senderThread;
448             if (senderThread != null) {
449                 senderThread.interrupt();
450             }
451         } catch (SecurityException e) {
452             logger.debug("Error while closing: {} ", e.getMessage());
453         }
454         closeSocket();
455     }
456
457     public void closeSocket() {
458         try {
459             final DatagramSocket socket = this.socket;
460             if (socket != null) {
461                 logger.debug("Closing socket for port: {} ", socket.getLocalPort());
462                 socket.close();
463                 this.socket = null;
464             }
465         } catch (SecurityException e) {
466             logger.debug("Error while closing: {} ", e.getMessage());
467         }
468     }
469
470     /**
471      * @return the id
472      */
473     public int getId() {
474         return id.incrementAndGet();
475     }
476
477     /**
478      * @param id the id to set
479      */
480     public void setId(int id) {
481         this.id.set(id);
482     }
483
484     /**
485      * Time delta between device time and server time
486      *
487      * @return delta
488      */
489     public int getTimeDelta() {
490         return timeDelta;
491     }
492
493     public String getDeviceId() {
494         return deviceId;
495     }
496
497     public void setDeviceId(String deviceId) {
498         this.deviceId = deviceId;
499         MessageSenderThread senderThread = this.senderThread;
500         if (senderThread != null) {
501             senderThread.setName("OH-binding-miio-MessageSenderThread-" + deviceId);
502         }
503     }
504
505     public int getQueueLength() {
506         return concurrentLinkedQueue.size();
507     }
508 }