]> git.basschouten.com Git - openhab-addons.git/blob
18c9d653c65a3a22dace5328b2bd602f41ec9772
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2021 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.miio.internal.transport;
14
15 import java.io.IOException;
16 import java.net.DatagramPacket;
17 import java.net.DatagramSocket;
18 import java.net.InetAddress;
19 import java.net.SocketException;
20 import java.net.SocketTimeoutException;
21 import java.util.Arrays;
22 import java.util.Calendar;
23 import java.util.List;
24 import java.util.NoSuchElementException;
25 import java.util.concurrent.ConcurrentLinkedQueue;
26 import java.util.concurrent.CopyOnWriteArrayList;
27 import java.util.concurrent.TimeUnit;
28 import java.util.concurrent.atomic.AtomicInteger;
29
30 import org.eclipse.jdt.annotation.NonNullByDefault;
31 import org.eclipse.jdt.annotation.Nullable;
32 import org.openhab.binding.miio.internal.Message;
33 import org.openhab.binding.miio.internal.MiIoBindingConstants;
34 import org.openhab.binding.miio.internal.MiIoCommand;
35 import org.openhab.binding.miio.internal.MiIoCrypto;
36 import org.openhab.binding.miio.internal.MiIoCryptoException;
37 import org.openhab.binding.miio.internal.MiIoMessageListener;
38 import org.openhab.binding.miio.internal.MiIoSendCommand;
39 import org.openhab.binding.miio.internal.Utils;
40 import org.openhab.binding.miio.internal.cloud.CloudConnector;
41 import org.openhab.binding.miio.internal.cloud.MiCloudException;
42 import org.openhab.core.thing.ThingStatus;
43 import org.openhab.core.thing.ThingStatusDetail;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
46
47 import com.google.gson.JsonElement;
48 import com.google.gson.JsonObject;
49 import com.google.gson.JsonParser;
50 import com.google.gson.JsonSyntaxException;
51
52 /**
53  * The {@link MiIoAsyncCommunication} is responsible for communications with the Mi IO devices
54  *
55  * @author Marcel Verpaalen - Initial contribution
56  */
57 @NonNullByDefault
58 public class MiIoAsyncCommunication {
59
60     private static final int MSG_BUFFER_SIZE = 2048;
61
62     private final Logger logger = LoggerFactory.getLogger(MiIoAsyncCommunication.class);
63
64     private final String ip;
65     private final byte[] token;
66     private byte[] deviceId;
67     private @Nullable DatagramSocket socket;
68
69     private List<MiIoMessageListener> listeners = new CopyOnWriteArrayList<>();
70
71     private AtomicInteger id = new AtomicInteger(-1);
72     private int timeDelta;
73     private int timeStamp;
74     private final JsonParser parser;
75     private @Nullable MessageSenderThread senderThread;
76     private boolean connected;
77     private ThingStatusDetail status = ThingStatusDetail.NONE;
78     private int errorCounter;
79     private int timeout;
80     private boolean needPing = true;
81     private static final int MAX_ERRORS = 3;
82     private static final int MAX_ID = 15000;
83     private final CloudConnector cloudConnector;
84
85     private ConcurrentLinkedQueue<MiIoSendCommand> concurrentLinkedQueue = new ConcurrentLinkedQueue<>();
86
87     public MiIoAsyncCommunication(String ip, byte[] token, byte[] did, int id, int timeout,
88             CloudConnector cloudConnector) {
89         this.ip = ip;
90         this.token = token;
91         this.deviceId = did;
92         this.timeout = timeout;
93         this.cloudConnector = cloudConnector;
94         setId(id);
95         parser = new JsonParser();
96         startReceiver();
97     }
98
99     protected List<MiIoMessageListener> getListeners() {
100         return listeners;
101     }
102
103     /**
104      * Registers a {@link MiIoMessageListener} to be called back, when data is received.
105      * If no {@link MessageSenderThread} exists, when the method is called, it is being set up.
106      *
107      * @param listener {@link MiIoMessageListener} to be called back
108      */
109     public synchronized void registerListener(MiIoMessageListener listener) {
110         needPing = true;
111         startReceiver();
112         if (!getListeners().contains(listener)) {
113             logger.trace("Adding socket listener {}", listener);
114             getListeners().add(listener);
115         }
116     }
117
118     /**
119      * Unregisters a {@link MiIoMessageListener}. If there are no listeners left,
120      * the {@link MessageSenderThread} is being closed.
121      *
122      * @param listener {@link MiIoMessageListener} to be unregistered
123      */
124     public synchronized void unregisterListener(MiIoMessageListener listener) {
125         getListeners().remove(listener);
126         if (getListeners().isEmpty()) {
127             concurrentLinkedQueue.clear();
128             close();
129         }
130     }
131
132     public int queueCommand(MiIoCommand command, String cloudServer) throws MiIoCryptoException, IOException {
133         return queueCommand(command, "[]", cloudServer);
134     }
135
136     public int queueCommand(MiIoCommand command, String params, String cloudServer)
137             throws MiIoCryptoException, IOException {
138         return queueCommand(command.getCommand(), params, cloudServer);
139     }
140
141     public int queueCommand(String command, String params, String cloudServer)
142             throws MiIoCryptoException, IOException, JsonSyntaxException {
143         try {
144             JsonObject fullCommand = new JsonObject();
145             int cmdId = id.incrementAndGet();
146             if (cmdId > MAX_ID) {
147                 id.set(0);
148             }
149             fullCommand.addProperty("id", cmdId);
150             fullCommand.addProperty("method", command);
151             fullCommand.add("params", parser.parse(params));
152             MiIoSendCommand sendCmd = new MiIoSendCommand(cmdId, MiIoCommand.getCommand(command), fullCommand,
153                     cloudServer);
154             concurrentLinkedQueue.add(sendCmd);
155             if (logger.isDebugEnabled()) {
156                 // Obfuscate part of the token to allow sharing of the logfiles
157                 String tokenText = Utils.obfuscateToken(Utils.getHex(token));
158                 logger.debug("Command added to Queue {} -> {} (Device: {} token: {} Queue: {}).{}{}",
159                         fullCommand.toString(), ip, Utils.getHex(deviceId), tokenText, concurrentLinkedQueue.size(),
160                         cloudServer.isBlank() ? "" : " Send via cloudserver: ", cloudServer);
161             }
162             if (needPing && cloudServer.isBlank()) {
163                 sendPing(ip);
164             }
165             return cmdId;
166         } catch (JsonSyntaxException e) {
167             logger.warn("Send command '{}' with parameters {} -> {} (Device: {}) gave error {}", command, params, ip,
168                     Utils.getHex(deviceId), e.getMessage());
169             throw e;
170         }
171     }
172
173     MiIoSendCommand sendMiIoSendCommand(MiIoSendCommand miIoSendCommand) {
174         String errorMsg = "Unknown Error while sending command";
175         String decryptedResponse = "";
176         try {
177             if (miIoSendCommand.getCloudServer().isBlank()) {
178                 decryptedResponse = sendCommand(miIoSendCommand.getCommandString(), token, ip, deviceId);
179             } else {
180                 decryptedResponse = cloudConnector.sendRPCCommand(Utils.getHex(deviceId),
181                         miIoSendCommand.getCloudServer(), miIoSendCommand);
182                 logger.debug("Command {} send via cloudserver {}", miIoSendCommand.getCommandString(),
183                         miIoSendCommand.getCloudServer());
184                 updateStatus(ThingStatus.ONLINE, ThingStatusDetail.NONE);
185             }
186             // hack due to avoid invalid json errors from some misbehaving device firmwares
187             decryptedResponse = decryptedResponse.replace(",,", ",");
188             JsonElement response;
189             response = parser.parse(decryptedResponse);
190             if (response.isJsonObject()) {
191                 needPing = false;
192                 logger.trace("Received  JSON message {}", response.toString());
193                 miIoSendCommand.setResponse(response.getAsJsonObject());
194                 return miIoSendCommand;
195             } else {
196                 errorMsg = "Received message is invalid JSON";
197                 logger.debug("{}: {}", errorMsg, decryptedResponse);
198             }
199         } catch (MiIoCryptoException | IOException e) {
200             logger.debug("Send command '{}'  -> {} (Device: {}) gave error {}", miIoSendCommand.getCommandString(), ip,
201                     Utils.getHex(deviceId), e.getMessage());
202             errorMsg = e.getMessage();
203         } catch (JsonSyntaxException e) {
204             logger.warn("Could not parse '{}' <- {} (Device: {}) gave error {}", decryptedResponse,
205                     miIoSendCommand.getCommandString(), Utils.getHex(deviceId), e.getMessage());
206             errorMsg = "Received message is invalid JSON";
207         } catch (MiCloudException e) {
208             logger.debug("Send command '{}'  -> cloudserver '{}' (Device: {}) gave error {}",
209                     miIoSendCommand.getCommandString(), miIoSendCommand.getCloudServer(), Utils.getHex(deviceId),
210                     e.getMessage());
211             errorMsg = e.getMessage();
212             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR);
213         }
214         JsonObject erroResp = new JsonObject();
215         erroResp.addProperty("error", errorMsg);
216         miIoSendCommand.setResponse(erroResp);
217         return miIoSendCommand;
218     }
219
220     public synchronized void startReceiver() {
221         MessageSenderThread senderThread = this.senderThread;
222         if (senderThread == null || !senderThread.isAlive()) {
223             senderThread = new MessageSenderThread();
224             senderThread.start();
225             this.senderThread = senderThread;
226         }
227     }
228
229     /**
230      * The {@link MessageSenderThread} is responsible for consuming messages from the queue and sending these to the
231      * device
232      *
233      */
234     private class MessageSenderThread extends Thread {
235         public MessageSenderThread() {
236             super("Mi IO MessageSenderThread");
237             setDaemon(true);
238         }
239
240         @Override
241         public void run() {
242             logger.debug("Starting Mi IO MessageSenderThread");
243             while (!interrupted()) {
244                 try {
245                     if (concurrentLinkedQueue.isEmpty()) {
246                         Thread.sleep(100);
247                         continue;
248                     }
249                     MiIoSendCommand queuedMessage = concurrentLinkedQueue.remove();
250                     MiIoSendCommand miIoSendCommand = sendMiIoSendCommand(queuedMessage);
251                     for (MiIoMessageListener listener : listeners) {
252                         logger.trace("inform listener {}, data {} from {}", listener, queuedMessage, miIoSendCommand);
253                         try {
254                             listener.onMessageReceived(miIoSendCommand);
255                         } catch (Exception e) {
256                             logger.debug("Could not inform listener {}: {}: ", listener, e.getMessage(), e);
257                         }
258                     }
259                 } catch (NoSuchElementException e) {
260                     // ignore
261                 } catch (InterruptedException e) {
262                     // That's our signal to stop
263                     break;
264                 } catch (Exception e) {
265                     logger.warn("Error while polling/sending message", e);
266                 }
267             }
268             closeSocket();
269             logger.debug("Finished Mi IO MessageSenderThread");
270         }
271     }
272
273     private String sendCommand(String command, byte[] token, String ip, byte[] deviceId)
274             throws MiIoCryptoException, IOException {
275         byte[] encr;
276         encr = MiIoCrypto.encrypt(command.getBytes(), token);
277         timeStamp = (int) TimeUnit.MILLISECONDS.toSeconds(Calendar.getInstance().getTime().getTime());
278         byte[] sendMsg = Message.createMsgData(encr, token, deviceId, timeStamp + timeDelta);
279         Message miIoResponseMsg = sendData(sendMsg, ip);
280         if (miIoResponseMsg == null) {
281             if (logger.isTraceEnabled()) {
282                 logger.trace("No response from device {} at {} for command {}.\r\n{}", Utils.getHex(deviceId), ip,
283                         command, (new Message(sendMsg)).toSting());
284             } else {
285                 logger.debug("No response from device {} at {} for command {}.", Utils.getHex(deviceId), ip, command);
286             }
287             errorCounter++;
288             if (errorCounter > MAX_ERRORS) {
289                 status = ThingStatusDetail.CONFIGURATION_ERROR;
290                 sendPing(ip);
291             }
292             return "{\"error\":\"No Response\"}";
293         }
294         if (!miIoResponseMsg.isChecksumValid()) {
295             return "{\"error\":\"Message has invalid checksum\"}";
296         }
297         if (errorCounter > 0) {
298             errorCounter = 0;
299             status = ThingStatusDetail.NONE;
300             updateStatus(ThingStatus.ONLINE, status);
301         }
302         if (!connected) {
303             pingSuccess();
304         }
305         String decryptedResponse = new String(MiIoCrypto.decrypt(miIoResponseMsg.getData(), token), "UTF-8").trim();
306         logger.trace("Received response from {}: {}", ip, decryptedResponse);
307         return decryptedResponse;
308     }
309
310     public @Nullable Message sendPing(String ip) throws IOException {
311         for (int i = 0; i < 3; i++) {
312             logger.debug("Sending Ping {} ({})", Utils.getHex(deviceId), ip);
313             Message resp = sendData(MiIoBindingConstants.DISCOVER_STRING, ip);
314             if (resp != null) {
315                 pingSuccess();
316                 return resp;
317             }
318         }
319         pingFail();
320         return null;
321     }
322
323     private void pingFail() {
324         logger.debug("Ping {} ({}) failed", Utils.getHex(deviceId), ip);
325         connected = false;
326         status = ThingStatusDetail.COMMUNICATION_ERROR;
327         updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR);
328     }
329
330     private void pingSuccess() {
331         logger.debug("Ping {} ({}) success", Utils.getHex(deviceId), ip);
332         if (!connected) {
333             connected = true;
334             status = ThingStatusDetail.NONE;
335             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.NONE);
336         } else {
337             if (ThingStatusDetail.CONFIGURATION_ERROR.equals(status)) {
338                 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_ERROR);
339             } else {
340                 status = ThingStatusDetail.NONE;
341                 updateStatus(ThingStatus.ONLINE, status);
342             }
343         }
344     }
345
346     private void updateStatus(ThingStatus status, ThingStatusDetail statusDetail) {
347         for (MiIoMessageListener listener : listeners) {
348             logger.trace("inform listener {}, data {} from {}", listener, status, statusDetail);
349             try {
350                 listener.onStatusUpdated(status, statusDetail);
351             } catch (Exception e) {
352                 logger.debug("Could not inform listener {}: {}", listener, e.getMessage(), e);
353             }
354         }
355     }
356
357     private @Nullable Message sendData(byte[] sendMsg, String ip) throws IOException {
358         byte[] response = comms(sendMsg, ip);
359         if (response.length >= 32) {
360             Message miIoResponse = new Message(response);
361             timeStamp = (int) TimeUnit.MILLISECONDS.toSeconds(Calendar.getInstance().getTime().getTime());
362             timeDelta = miIoResponse.getTimestampAsInt() - timeStamp;
363             logger.trace("Message Details:{} ", miIoResponse.toSting());
364             return miIoResponse;
365         } else {
366             logger.trace("Reponse length <32 : {}", response.length);
367             return null;
368         }
369     }
370
371     private synchronized byte[] comms(byte[] message, String ip) throws IOException {
372         InetAddress ipAddress = InetAddress.getByName(ip);
373         DatagramSocket clientSocket = getSocket();
374         DatagramPacket receivePacket = new DatagramPacket(new byte[MSG_BUFFER_SIZE], MSG_BUFFER_SIZE);
375         try {
376             logger.trace("Connection {}:{}", ip, clientSocket.getLocalPort());
377             byte[] sendData = new byte[MSG_BUFFER_SIZE];
378             sendData = message;
379             DatagramPacket sendPacket = new DatagramPacket(sendData, sendData.length, ipAddress,
380                     MiIoBindingConstants.PORT);
381             clientSocket.send(sendPacket);
382             sendPacket.setData(new byte[MSG_BUFFER_SIZE]);
383             clientSocket.receive(receivePacket);
384             byte[] response = Arrays.copyOfRange(receivePacket.getData(), receivePacket.getOffset(),
385                     receivePacket.getOffset() + receivePacket.getLength());
386             return response;
387         } catch (SocketTimeoutException e) {
388             logger.debug("Communication error for Mi device at {}: {}", ip, e.getMessage());
389             needPing = true;
390             return new byte[0];
391         }
392     }
393
394     private DatagramSocket getSocket() throws SocketException {
395         @Nullable
396         DatagramSocket socket = this.socket;
397         if (socket == null || socket.isClosed()) {
398             socket = new DatagramSocket();
399             socket.setSoTimeout(timeout);
400             logger.debug("Opening socket on port: {} ", socket.getLocalPort());
401             this.socket = socket;
402             return socket;
403         } else {
404             return socket;
405         }
406     }
407
408     public void close() {
409         try {
410             final MessageSenderThread senderThread = this.senderThread;
411             if (senderThread != null) {
412                 senderThread.interrupt();
413             }
414         } catch (SecurityException e) {
415             logger.debug("Error while closing: {} ", e.getMessage());
416         }
417         closeSocket();
418     }
419
420     public void closeSocket() {
421         try {
422             final DatagramSocket socket = this.socket;
423             if (socket != null) {
424                 logger.debug("Closing socket for port: {} ", socket.getLocalPort());
425                 socket.close();
426                 this.socket = null;
427             }
428         } catch (SecurityException e) {
429             logger.debug("Error while closing: {} ", e.getMessage());
430         }
431     }
432
433     /**
434      * @return the id
435      */
436     public int getId() {
437         return id.incrementAndGet();
438     }
439
440     /**
441      * @param id the id to set
442      */
443     public void setId(int id) {
444         this.id.set(id);
445     }
446
447     /**
448      * Time delta between device time and server time
449      *
450      * @return delta
451      */
452     public int getTimeDelta() {
453         return timeDelta;
454     }
455
456     public byte[] getDeviceId() {
457         return deviceId;
458     }
459
460     public void setDeviceId(byte[] deviceId) {
461         this.deviceId = deviceId;
462     }
463
464     public int getQueueLength() {
465         return concurrentLinkedQueue.size();
466     }
467 }