]> git.basschouten.com Git - openhab-addons.git/blob
22cc0747ec94a9a6e0da86b9be59773069852c3f
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2021 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.miio.internal.transport;
14
15 import java.io.IOException;
16 import java.net.DatagramPacket;
17 import java.net.DatagramSocket;
18 import java.net.InetAddress;
19 import java.net.SocketException;
20 import java.net.SocketTimeoutException;
21 import java.nio.charset.StandardCharsets;
22 import java.time.Instant;
23 import java.util.Arrays;
24 import java.util.Calendar;
25 import java.util.List;
26 import java.util.NoSuchElementException;
27 import java.util.concurrent.ConcurrentLinkedQueue;
28 import java.util.concurrent.CopyOnWriteArrayList;
29 import java.util.concurrent.TimeUnit;
30 import java.util.concurrent.atomic.AtomicInteger;
31
32 import org.eclipse.jdt.annotation.NonNullByDefault;
33 import org.eclipse.jdt.annotation.Nullable;
34 import org.openhab.binding.miio.internal.Message;
35 import org.openhab.binding.miio.internal.MiIoBindingConstants;
36 import org.openhab.binding.miio.internal.MiIoCommand;
37 import org.openhab.binding.miio.internal.MiIoCrypto;
38 import org.openhab.binding.miio.internal.MiIoCryptoException;
39 import org.openhab.binding.miio.internal.MiIoMessageListener;
40 import org.openhab.binding.miio.internal.MiIoSendCommand;
41 import org.openhab.binding.miio.internal.Utils;
42 import org.openhab.binding.miio.internal.cloud.CloudConnector;
43 import org.openhab.binding.miio.internal.cloud.MiCloudException;
44 import org.openhab.core.thing.ThingStatus;
45 import org.openhab.core.thing.ThingStatusDetail;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
48
49 import com.google.gson.JsonElement;
50 import com.google.gson.JsonObject;
51 import com.google.gson.JsonParser;
52 import com.google.gson.JsonSyntaxException;
53
54 /**
55  * The {@link MiIoAsyncCommunication} is responsible for communications with the Mi IO devices
56  *
57  * @author Marcel Verpaalen - Initial contribution
58  */
59 @NonNullByDefault
60 public class MiIoAsyncCommunication {
61
62     private static final int MSG_BUFFER_SIZE = 2048;
63
64     private final Logger logger = LoggerFactory.getLogger(MiIoAsyncCommunication.class);
65
66     private final String ip;
67     private final byte[] token;
68     private byte[] deviceId;
69     private @Nullable DatagramSocket socket;
70
71     private List<MiIoMessageListener> listeners = new CopyOnWriteArrayList<>();
72
73     private AtomicInteger id = new AtomicInteger(-1);
74     private int timeDelta;
75     private int timeStamp;
76     private final JsonParser parser;
77     private @Nullable MessageSenderThread senderThread;
78     private boolean connected;
79     private ThingStatusDetail status = ThingStatusDetail.NONE;
80     private int errorCounter;
81     private int timeout;
82     private boolean needPing = true;
83     private static final int MAX_ERRORS = 3;
84     private static final int MAX_ID = 15000;
85     private final CloudConnector cloudConnector;
86
87     private ConcurrentLinkedQueue<MiIoSendCommand> concurrentLinkedQueue = new ConcurrentLinkedQueue<>();
88
89     public MiIoAsyncCommunication(String ip, byte[] token, byte[] did, int id, int timeout,
90             CloudConnector cloudConnector) {
91         this.ip = ip;
92         this.token = token;
93         this.deviceId = did;
94         this.timeout = timeout;
95         this.cloudConnector = cloudConnector;
96         setId(id);
97         parser = new JsonParser();
98         startReceiver();
99     }
100
101     protected List<MiIoMessageListener> getListeners() {
102         return listeners;
103     }
104
105     /**
106      * Registers a {@link MiIoMessageListener} to be called back, when data is received.
107      * If no {@link MessageSenderThread} exists, when the method is called, it is being set up.
108      *
109      * @param listener {@link MiIoMessageListener} to be called back
110      */
111     public synchronized void registerListener(MiIoMessageListener listener) {
112         needPing = true;
113         startReceiver();
114         if (!getListeners().contains(listener)) {
115             logger.trace("Adding socket listener {}", listener);
116             getListeners().add(listener);
117         }
118     }
119
120     /**
121      * Unregisters a {@link MiIoMessageListener}. If there are no listeners left,
122      * the {@link MessageSenderThread} is being closed.
123      *
124      * @param listener {@link MiIoMessageListener} to be unregistered
125      */
126     public synchronized void unregisterListener(MiIoMessageListener listener) {
127         getListeners().remove(listener);
128         if (getListeners().isEmpty()) {
129             concurrentLinkedQueue.clear();
130             close();
131         }
132     }
133
134     public int queueCommand(MiIoCommand command, String cloudServer) throws MiIoCryptoException, IOException {
135         return queueCommand(command, "[]", cloudServer);
136     }
137
138     public int queueCommand(MiIoCommand command, String params, String cloudServer)
139             throws MiIoCryptoException, IOException {
140         return queueCommand(command.getCommand(), params, cloudServer);
141     }
142
143     public int queueCommand(String command, String params, String cloudServer)
144             throws MiIoCryptoException, IOException, JsonSyntaxException {
145         try {
146             JsonObject fullCommand = new JsonObject();
147             int cmdId = id.incrementAndGet();
148             if (cmdId > MAX_ID) {
149                 id.set(0);
150             }
151             fullCommand.addProperty("id", cmdId);
152             fullCommand.addProperty("method", command);
153             fullCommand.add("params", parser.parse(params));
154             MiIoSendCommand sendCmd = new MiIoSendCommand(cmdId, MiIoCommand.getCommand(command), fullCommand,
155                     cloudServer);
156             concurrentLinkedQueue.add(sendCmd);
157             if (logger.isDebugEnabled()) {
158                 // Obfuscate part of the token to allow sharing of the logfiles
159                 String tokenText = Utils.obfuscateToken(Utils.getHex(token));
160                 logger.debug("Command added to Queue {} -> {} (Device: {} token: {} Queue: {}).{}{}",
161                         fullCommand.toString(), ip, Utils.getHex(deviceId), tokenText, concurrentLinkedQueue.size(),
162                         cloudServer.isBlank() ? "" : " Send via cloudserver: ", cloudServer);
163             }
164             if (needPing && cloudServer.isBlank()) {
165                 sendPing(ip);
166             }
167             return cmdId;
168         } catch (JsonSyntaxException e) {
169             logger.warn("Send command '{}' with parameters {} -> {} (Device: {}) gave error {}", command, params, ip,
170                     Utils.getHex(deviceId), e.getMessage());
171             throw e;
172         }
173     }
174
175     MiIoSendCommand sendMiIoSendCommand(MiIoSendCommand miIoSendCommand) {
176         String errorMsg = "Unknown Error while sending command";
177         String decryptedResponse = "";
178         try {
179             if (miIoSendCommand.getCloudServer().isBlank()) {
180                 decryptedResponse = sendCommand(miIoSendCommand.getCommandString(), token, ip, deviceId);
181             } else {
182                 decryptedResponse = cloudConnector.sendRPCCommand(Utils.getHex(deviceId),
183                         miIoSendCommand.getCloudServer(), miIoSendCommand);
184                 logger.debug("Command {} send via cloudserver {}", miIoSendCommand.getCommandString(),
185                         miIoSendCommand.getCloudServer());
186                 updateStatus(ThingStatus.ONLINE, ThingStatusDetail.NONE);
187             }
188             // hack due to avoid invalid json errors from some misbehaving device firmwares
189             decryptedResponse = decryptedResponse.replace(",,", ",");
190             JsonElement response;
191             response = parser.parse(decryptedResponse);
192             if (!response.isJsonObject()) {
193                 errorMsg = "Received message is not a JSON object ";
194             } else {
195                 needPing = false;
196                 logger.trace("Received  JSON message {}", response.toString());
197                 JsonObject resJson = response.getAsJsonObject();
198                 if (resJson.has("id")) {
199                     int id = resJson.get("id").getAsInt();
200                     if (id == miIoSendCommand.getId()) {
201                         miIoSendCommand.setResponse(response.getAsJsonObject());
202                         return miIoSendCommand;
203                     } else {
204                         if (id < miIoSendCommand.getId()) {
205                             errorMsg = String.format(
206                                     "Received message out of sync, extend timeout time. Expected id: %d, received id: %d",
207                                     miIoSendCommand.getId(), id);
208                         } else {
209                             errorMsg = String.format("Received message out of sync. Expected id: %d, received id: %d",
210                                     miIoSendCommand.getId(), id);
211                         }
212                     }
213                 } else {
214                     errorMsg = "Received message is without id";
215                 }
216
217             }
218             logger.debug("{}: {}", errorMsg, decryptedResponse);
219         } catch (MiIoCryptoException | IOException e) {
220             logger.debug("Send command '{}'  -> {} (Device: {}) gave error {}", miIoSendCommand.getCommandString(), ip,
221                     Utils.getHex(deviceId), e.getMessage());
222             errorMsg = e.getMessage();
223         } catch (JsonSyntaxException e) {
224             logger.warn("Could not parse '{}' <- {} (Device: {}) gave error {}", decryptedResponse,
225                     miIoSendCommand.getCommandString(), Utils.getHex(deviceId), e.getMessage());
226             errorMsg = "Received message is invalid JSON";
227         } catch (MiCloudException e) {
228             logger.debug("Send command '{}'  -> cloudserver '{}' (Device: {}) gave error {}",
229                     miIoSendCommand.getCommandString(), miIoSendCommand.getCloudServer(), Utils.getHex(deviceId),
230                     e.getMessage());
231             errorMsg = e.getMessage();
232             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR);
233         }
234         JsonObject erroResp = new JsonObject();
235         erroResp.addProperty("error", errorMsg);
236         miIoSendCommand.setResponse(erroResp);
237         return miIoSendCommand;
238     }
239
240     public synchronized void startReceiver() {
241         MessageSenderThread senderThread = this.senderThread;
242         if (senderThread == null || !senderThread.isAlive()) {
243             senderThread = new MessageSenderThread();
244             senderThread.start();
245             this.senderThread = senderThread;
246         }
247     }
248
249     /**
250      * The {@link MessageSenderThread} is responsible for consuming messages from the queue and sending these to the
251      * device
252      *
253      */
254     private class MessageSenderThread extends Thread {
255         public MessageSenderThread() {
256             super("Mi IO MessageSenderThread");
257             setDaemon(true);
258         }
259
260         @Override
261         public void run() {
262             logger.debug("Starting Mi IO MessageSenderThread");
263             while (!interrupted()) {
264                 try {
265                     if (concurrentLinkedQueue.isEmpty()) {
266                         Thread.sleep(100);
267                         continue;
268                     }
269                     MiIoSendCommand queuedMessage = concurrentLinkedQueue.remove();
270                     MiIoSendCommand miIoSendCommand = sendMiIoSendCommand(queuedMessage);
271                     for (MiIoMessageListener listener : listeners) {
272                         logger.trace("inform listener {}, data {} from {}", listener, queuedMessage, miIoSendCommand);
273                         try {
274                             listener.onMessageReceived(miIoSendCommand);
275                         } catch (Exception e) {
276                             logger.debug("Could not inform listener {}: {}: ", listener, e.getMessage(), e);
277                         }
278                     }
279                 } catch (NoSuchElementException e) {
280                     // ignore
281                 } catch (InterruptedException e) {
282                     // That's our signal to stop
283                     break;
284                 } catch (Exception e) {
285                     logger.warn("Error while polling/sending message", e);
286                 }
287             }
288             closeSocket();
289             logger.debug("Finished Mi IO MessageSenderThread");
290         }
291     }
292
293     private String sendCommand(String command, byte[] token, String ip, byte[] deviceId)
294             throws MiIoCryptoException, IOException {
295         byte[] sendMsg = new byte[0];
296         if (!command.isBlank()) {
297             byte[] encr;
298             encr = MiIoCrypto.encrypt(command.getBytes(StandardCharsets.UTF_8), token);
299             timeStamp = (int) Instant.now().getEpochSecond();
300             sendMsg = Message.createMsgData(encr, token, deviceId, timeStamp + timeDelta);
301         }
302         Message miIoResponseMsg = sendData(sendMsg, ip);
303         if (miIoResponseMsg == null) {
304             if (logger.isTraceEnabled()) {
305                 logger.trace("No response from device {} at {} for command {}.\r\n{}", Utils.getHex(deviceId), ip,
306                         command, (new Message(sendMsg)).toSting());
307             } else {
308                 logger.debug("No response from device {} at {} for command {}.", Utils.getHex(deviceId), ip, command);
309             }
310             errorCounter++;
311             if (errorCounter > MAX_ERRORS) {
312                 status = ThingStatusDetail.CONFIGURATION_ERROR;
313                 sendPing(ip);
314             }
315             return "{\"error\":\"No Response\"}";
316         }
317         if (!miIoResponseMsg.isChecksumValid()) {
318             return "{\"error\":\"Message has invalid checksum\"}";
319         }
320         if (errorCounter > 0) {
321             errorCounter = 0;
322             status = ThingStatusDetail.NONE;
323             updateStatus(ThingStatus.ONLINE, status);
324         }
325         if (!connected) {
326             pingSuccess();
327         }
328         String decryptedResponse = new String(MiIoCrypto.decrypt(miIoResponseMsg.getData(), token), "UTF-8").trim();
329         logger.trace("Received response from {}: {}", ip, decryptedResponse);
330         return decryptedResponse;
331     }
332
333     public @Nullable Message sendPing(String ip) throws IOException {
334         for (int i = 0; i < 3; i++) {
335             logger.debug("Sending Ping {} ({})", Utils.getHex(deviceId), ip);
336             Message resp = sendData(MiIoBindingConstants.DISCOVER_STRING, ip);
337             if (resp != null) {
338                 pingSuccess();
339                 return resp;
340             }
341         }
342         pingFail();
343         return null;
344     }
345
346     private void pingFail() {
347         logger.debug("Ping {} ({}) failed", Utils.getHex(deviceId), ip);
348         connected = false;
349         status = ThingStatusDetail.COMMUNICATION_ERROR;
350         updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR);
351     }
352
353     private void pingSuccess() {
354         logger.debug("Ping {} ({}) success", Utils.getHex(deviceId), ip);
355         if (!connected) {
356             connected = true;
357             status = ThingStatusDetail.NONE;
358             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.NONE);
359         } else {
360             if (ThingStatusDetail.CONFIGURATION_ERROR.equals(status)) {
361                 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_ERROR);
362             } else {
363                 status = ThingStatusDetail.NONE;
364                 updateStatus(ThingStatus.ONLINE, status);
365             }
366         }
367     }
368
369     private void updateStatus(ThingStatus status, ThingStatusDetail statusDetail) {
370         for (MiIoMessageListener listener : listeners) {
371             logger.trace("inform listener {}, data {} from {}", listener, status, statusDetail);
372             try {
373                 listener.onStatusUpdated(status, statusDetail);
374             } catch (Exception e) {
375                 logger.debug("Could not inform listener {}: {}", listener, e.getMessage(), e);
376             }
377         }
378     }
379
380     private @Nullable Message sendData(byte[] sendMsg, String ip) throws IOException {
381         byte[] response = comms(sendMsg, ip);
382         if (response.length >= 32) {
383             Message miIoResponse = new Message(response);
384             timeStamp = (int) TimeUnit.MILLISECONDS.toSeconds(Calendar.getInstance().getTime().getTime());
385             timeDelta = miIoResponse.getTimestampAsInt() - timeStamp;
386             logger.trace("Message Details:{} ", miIoResponse.toSting());
387             return miIoResponse;
388         } else {
389             logger.trace("Reponse length <32 : {}", response.length);
390             return null;
391         }
392     }
393
394     private synchronized byte[] comms(byte[] message, String ip) throws IOException {
395         InetAddress ipAddress = InetAddress.getByName(ip);
396         DatagramSocket clientSocket = getSocket();
397         DatagramPacket receivePacket = new DatagramPacket(new byte[MSG_BUFFER_SIZE], MSG_BUFFER_SIZE);
398         try {
399             logger.trace("Connection {}:{}", ip, clientSocket.getLocalPort());
400             if (message.length > 0) {
401                 byte[] sendData = new byte[MSG_BUFFER_SIZE];
402                 sendData = message;
403                 DatagramPacket sendPacket = new DatagramPacket(sendData, sendData.length, ipAddress,
404                         MiIoBindingConstants.PORT);
405                 clientSocket.send(sendPacket);
406                 sendPacket.setData(new byte[MSG_BUFFER_SIZE]);
407             }
408             clientSocket.receive(receivePacket);
409             byte[] response = Arrays.copyOfRange(receivePacket.getData(), receivePacket.getOffset(),
410                     receivePacket.getOffset() + receivePacket.getLength());
411             return response;
412         } catch (SocketTimeoutException e) {
413             logger.debug("Communication error for Mi device at {}: {}", ip, e.getMessage());
414             needPing = true;
415             return new byte[0];
416         }
417     }
418
419     private DatagramSocket getSocket() throws SocketException {
420         @Nullable
421         DatagramSocket socket = this.socket;
422         if (socket == null || socket.isClosed()) {
423             socket = new DatagramSocket();
424             socket.setSoTimeout(timeout);
425             logger.debug("Opening socket on port: {} ", socket.getLocalPort());
426             this.socket = socket;
427             return socket;
428         } else {
429             return socket;
430         }
431     }
432
433     public void close() {
434         try {
435             final MessageSenderThread senderThread = this.senderThread;
436             if (senderThread != null) {
437                 senderThread.interrupt();
438             }
439         } catch (SecurityException e) {
440             logger.debug("Error while closing: {} ", e.getMessage());
441         }
442         closeSocket();
443     }
444
445     public void closeSocket() {
446         try {
447             final DatagramSocket socket = this.socket;
448             if (socket != null) {
449                 logger.debug("Closing socket for port: {} ", socket.getLocalPort());
450                 socket.close();
451                 this.socket = null;
452             }
453         } catch (SecurityException e) {
454             logger.debug("Error while closing: {} ", e.getMessage());
455         }
456     }
457
458     /**
459      * @return the id
460      */
461     public int getId() {
462         return id.incrementAndGet();
463     }
464
465     /**
466      * @param id the id to set
467      */
468     public void setId(int id) {
469         this.id.set(id);
470     }
471
472     /**
473      * Time delta between device time and server time
474      *
475      * @return delta
476      */
477     public int getTimeDelta() {
478         return timeDelta;
479     }
480
481     public byte[] getDeviceId() {
482         return deviceId;
483     }
484
485     public void setDeviceId(byte[] deviceId) {
486         this.deviceId = deviceId;
487     }
488
489     public int getQueueLength() {
490         return concurrentLinkedQueue.size();
491     }
492 }