]> git.basschouten.com Git - openhab-addons.git/blob
6720d590358687039e28a881c6bba68bcda42ea6
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2020 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.miio.internal.transport;
14
15 import java.io.IOException;
16 import java.net.DatagramPacket;
17 import java.net.DatagramSocket;
18 import java.net.InetAddress;
19 import java.net.SocketException;
20 import java.net.SocketTimeoutException;
21 import java.util.Arrays;
22 import java.util.Calendar;
23 import java.util.List;
24 import java.util.NoSuchElementException;
25 import java.util.concurrent.ConcurrentLinkedQueue;
26 import java.util.concurrent.CopyOnWriteArrayList;
27 import java.util.concurrent.TimeUnit;
28 import java.util.concurrent.atomic.AtomicInteger;
29
30 import org.eclipse.jdt.annotation.NonNullByDefault;
31 import org.eclipse.jdt.annotation.Nullable;
32 import org.openhab.binding.miio.internal.Message;
33 import org.openhab.binding.miio.internal.MiIoBindingConstants;
34 import org.openhab.binding.miio.internal.MiIoCommand;
35 import org.openhab.binding.miio.internal.MiIoCrypto;
36 import org.openhab.binding.miio.internal.MiIoCryptoException;
37 import org.openhab.binding.miio.internal.MiIoMessageListener;
38 import org.openhab.binding.miio.internal.MiIoSendCommand;
39 import org.openhab.binding.miio.internal.Utils;
40 import org.openhab.core.thing.ThingStatus;
41 import org.openhab.core.thing.ThingStatusDetail;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44
45 import com.google.gson.JsonElement;
46 import com.google.gson.JsonObject;
47 import com.google.gson.JsonParser;
48 import com.google.gson.JsonSyntaxException;
49
50 /**
51  * The {@link MiIoAsyncCommunication} is responsible for communications with the Mi IO devices
52  *
53  * @author Marcel Verpaalen - Initial contribution
54  */
55 @NonNullByDefault
56 public class MiIoAsyncCommunication {
57
58     private static final int MSG_BUFFER_SIZE = 2048;
59
60     private final Logger logger = LoggerFactory.getLogger(MiIoAsyncCommunication.class);
61
62     private final String ip;
63     private final byte[] token;
64     private byte[] deviceId;
65     private @Nullable DatagramSocket socket;
66
67     private List<MiIoMessageListener> listeners = new CopyOnWriteArrayList<>();
68
69     private AtomicInteger id = new AtomicInteger(-1);
70     private int timeDelta;
71     private int timeStamp;
72     private final JsonParser parser;
73     private @Nullable MessageSenderThread senderThread;
74     private boolean connected;
75     private ThingStatusDetail status = ThingStatusDetail.NONE;
76     private int errorCounter;
77     private int timeout;
78     private boolean needPing = true;
79     private static final int MAX_ERRORS = 3;
80     private static final int MAX_ID = 15000;
81
82     private ConcurrentLinkedQueue<MiIoSendCommand> concurrentLinkedQueue = new ConcurrentLinkedQueue<>();
83
84     public MiIoAsyncCommunication(String ip, byte[] token, byte[] did, int id, int timeout) {
85         this.ip = ip;
86         this.token = token;
87         this.deviceId = did;
88         this.timeout = timeout;
89         setId(id);
90         parser = new JsonParser();
91         startReceiver();
92     }
93
94     protected List<MiIoMessageListener> getListeners() {
95         return listeners;
96     }
97
98     /**
99      * Registers a {@link MiIoMessageListener} to be called back, when data is received.
100      * If no {@link MessageSenderThread} exists, when the method is called, it is being set up.
101      *
102      * @param listener {@link MiIoMessageListener} to be called back
103      */
104     public synchronized void registerListener(MiIoMessageListener listener) {
105         needPing = true;
106         startReceiver();
107         if (!getListeners().contains(listener)) {
108             logger.trace("Adding socket listener {}", listener);
109             getListeners().add(listener);
110         }
111     }
112
113     /**
114      * Unregisters a {@link MiIoMessageListener}. If there are no listeners left,
115      * the {@link MessageSenderThread} is being closed.
116      *
117      * @param listener {@link MiIoMessageListener} to be unregistered
118      */
119     public synchronized void unregisterListener(MiIoMessageListener listener) {
120         getListeners().remove(listener);
121         if (getListeners().isEmpty()) {
122             concurrentLinkedQueue.clear();
123             close();
124         }
125     }
126
127     public int queueCommand(MiIoCommand command) throws MiIoCryptoException, IOException {
128         return queueCommand(command, "[]");
129     }
130
131     public int queueCommand(MiIoCommand command, String params) throws MiIoCryptoException, IOException {
132         return queueCommand(command.getCommand(), params);
133     }
134
135     public int queueCommand(String command, String params)
136             throws MiIoCryptoException, IOException, JsonSyntaxException {
137         try {
138             JsonObject fullCommand = new JsonObject();
139             int cmdId = id.incrementAndGet();
140             if (cmdId > MAX_ID) {
141                 id.set(0);
142             }
143             fullCommand.addProperty("id", cmdId);
144             fullCommand.addProperty("method", command);
145             fullCommand.add("params", parser.parse(params));
146             MiIoSendCommand sendCmd = new MiIoSendCommand(cmdId, MiIoCommand.getCommand(command),
147                     fullCommand.toString());
148             concurrentLinkedQueue.add(sendCmd);
149             if (logger.isDebugEnabled()) {
150                 // Obfuscate part of the token to allow sharing of the logfiles
151                 String tokenText = Utils.obfuscateToken(Utils.getHex(token));
152                 logger.debug("Command added to Queue {} -> {} (Device: {} token: {} Queue: {})", fullCommand.toString(),
153                         ip, Utils.getHex(deviceId), tokenText, concurrentLinkedQueue.size());
154             }
155             if (needPing) {
156                 sendPing(ip);
157             }
158             return cmdId;
159         } catch (JsonSyntaxException e) {
160             logger.warn("Send command '{}' with parameters {} -> {} (Device: {}) gave error {}", command, params, ip,
161                     Utils.getHex(deviceId), e.getMessage());
162             throw e;
163         }
164     }
165
166     MiIoSendCommand sendMiIoSendCommand(MiIoSendCommand miIoSendCommand) {
167         String errorMsg = "Unknown Error while sending command";
168         String decryptedResponse = "";
169         try {
170             decryptedResponse = sendCommand(miIoSendCommand.getCommandString(), token, ip, deviceId);
171             // hack due to avoid invalid json errors from some misbehaving device firmwares
172             decryptedResponse = decryptedResponse.replace(",,", ",");
173             JsonElement response;
174             response = parser.parse(decryptedResponse);
175             if (response.isJsonObject()) {
176                 needPing = false;
177                 logger.trace("Received  JSON message {}", response.toString());
178                 miIoSendCommand.setResponse(response.getAsJsonObject());
179                 return miIoSendCommand;
180             } else {
181                 errorMsg = "Received message is invalid JSON";
182                 logger.debug("{}: {}", errorMsg, decryptedResponse);
183             }
184         } catch (MiIoCryptoException | IOException e) {
185             logger.debug("Send command '{}'  -> {} (Device: {}) gave error {}", miIoSendCommand.getCommandString(), ip,
186                     Utils.getHex(deviceId), e.getMessage());
187             errorMsg = e.getMessage();
188         } catch (JsonSyntaxException e) {
189             logger.warn("Could not parse '{}' <- {} (Device: {}) gave error {}", decryptedResponse,
190                     miIoSendCommand.getCommandString(), Utils.getHex(deviceId), e.getMessage());
191             errorMsg = "Received message is invalid JSON";
192         }
193         JsonObject erroResp = new JsonObject();
194         erroResp.addProperty("error", errorMsg);
195         miIoSendCommand.setResponse(erroResp);
196         return miIoSendCommand;
197     }
198
199     public synchronized void startReceiver() {
200         MessageSenderThread senderThread = this.senderThread;
201         if (senderThread == null || !senderThread.isAlive()) {
202             senderThread = new MessageSenderThread();
203             senderThread.start();
204             this.senderThread = senderThread;
205         }
206     }
207
208     /**
209      * The {@link MessageSenderThread} is responsible for consuming messages from the queue and sending these to the
210      * device
211      *
212      */
213     private class MessageSenderThread extends Thread {
214         public MessageSenderThread() {
215             super("Mi IO MessageSenderThread");
216             setDaemon(true);
217         }
218
219         @Override
220         public void run() {
221             logger.debug("Starting Mi IO MessageSenderThread");
222             while (!interrupted()) {
223                 try {
224                     if (concurrentLinkedQueue.isEmpty()) {
225                         Thread.sleep(100);
226                         continue;
227                     }
228                     MiIoSendCommand queuedMessage = concurrentLinkedQueue.remove();
229                     MiIoSendCommand miIoSendCommand = sendMiIoSendCommand(queuedMessage);
230                     for (MiIoMessageListener listener : listeners) {
231                         logger.trace("inform listener {}, data {} from {}", listener, queuedMessage, miIoSendCommand);
232                         try {
233                             listener.onMessageReceived(miIoSendCommand);
234                         } catch (Exception e) {
235                             logger.debug("Could not inform listener {}: {}: ", listener, e.getMessage(), e);
236                         }
237                     }
238                 } catch (NoSuchElementException e) {
239                     // ignore
240                 } catch (InterruptedException e) {
241                     // That's our signal to stop
242                     break;
243                 } catch (Exception e) {
244                     logger.warn("Error while polling/sending message", e);
245                 }
246             }
247             closeSocket();
248             logger.debug("Finished Mi IO MessageSenderThread");
249         }
250     }
251
252     private String sendCommand(String command, byte[] token, String ip, byte[] deviceId)
253             throws MiIoCryptoException, IOException {
254         byte[] encr;
255         encr = MiIoCrypto.encrypt(command.getBytes(), token);
256         timeStamp = (int) TimeUnit.MILLISECONDS.toSeconds(Calendar.getInstance().getTime().getTime());
257         byte[] sendMsg = Message.createMsgData(encr, token, deviceId, timeStamp + timeDelta);
258         Message miIoResponseMsg = sendData(sendMsg, ip);
259         if (miIoResponseMsg == null) {
260             if (logger.isTraceEnabled()) {
261                 logger.trace("No response from device {} at {} for command {}.\r\n{}", Utils.getHex(deviceId), ip,
262                         command, (new Message(sendMsg)).toSting());
263             } else {
264                 logger.debug("No response from device {} at {} for command {}.", Utils.getHex(deviceId), ip, command);
265             }
266             errorCounter++;
267             if (errorCounter > MAX_ERRORS) {
268                 status = ThingStatusDetail.CONFIGURATION_ERROR;
269                 sendPing(ip);
270             }
271             return "{\"error\":\"No Response\"}";
272         }
273         if (!miIoResponseMsg.isChecksumValid()) {
274             return "{\"error\":\"Message has invalid checksum\"}";
275         }
276         if (errorCounter > 0) {
277             errorCounter = 0;
278             status = ThingStatusDetail.NONE;
279             updateStatus(ThingStatus.ONLINE, status);
280         }
281         if (!connected) {
282             pingSuccess();
283         }
284         String decryptedResponse = new String(MiIoCrypto.decrypt(miIoResponseMsg.getData(), token), "UTF-8").trim();
285         logger.trace("Received response from {}: {}", ip, decryptedResponse);
286         return decryptedResponse;
287     }
288
289     public @Nullable Message sendPing(String ip) throws IOException {
290         for (int i = 0; i < 3; i++) {
291             logger.debug("Sending Ping {} ({})", Utils.getHex(deviceId), ip);
292             Message resp = sendData(MiIoBindingConstants.DISCOVER_STRING, ip);
293             if (resp != null) {
294                 pingSuccess();
295                 return resp;
296             }
297         }
298         pingFail();
299         return null;
300     }
301
302     private void pingFail() {
303         logger.debug("Ping {} ({}) failed", Utils.getHex(deviceId), ip);
304         connected = false;
305         status = ThingStatusDetail.COMMUNICATION_ERROR;
306         updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR);
307     }
308
309     private void pingSuccess() {
310         logger.debug("Ping {} ({}) success", Utils.getHex(deviceId), ip);
311         if (!connected) {
312             connected = true;
313             status = ThingStatusDetail.NONE;
314             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.NONE);
315         } else {
316             if (ThingStatusDetail.CONFIGURATION_ERROR.equals(status)) {
317                 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_ERROR);
318             } else {
319                 status = ThingStatusDetail.NONE;
320                 updateStatus(ThingStatus.ONLINE, status);
321             }
322         }
323     }
324
325     private void updateStatus(ThingStatus status, ThingStatusDetail statusDetail) {
326         for (MiIoMessageListener listener : listeners) {
327             logger.trace("inform listener {}, data {} from {}", listener, status, statusDetail);
328             try {
329                 listener.onStatusUpdated(status, statusDetail);
330             } catch (Exception e) {
331                 logger.debug("Could not inform listener {}: {}", listener, e.getMessage(), e);
332             }
333         }
334     }
335
336     private @Nullable Message sendData(byte[] sendMsg, String ip) throws IOException {
337         byte[] response = comms(sendMsg, ip);
338         if (response.length >= 32) {
339             Message miIoResponse = new Message(response);
340             timeStamp = (int) TimeUnit.MILLISECONDS.toSeconds(Calendar.getInstance().getTime().getTime());
341             timeDelta = miIoResponse.getTimestampAsInt() - timeStamp;
342             logger.trace("Message Details:{} ", miIoResponse.toSting());
343             return miIoResponse;
344         } else {
345             logger.trace("Reponse length <32 : {}", response.length);
346             return null;
347         }
348     }
349
350     private synchronized byte[] comms(byte[] message, String ip) throws IOException {
351         InetAddress ipAddress = InetAddress.getByName(ip);
352         DatagramSocket clientSocket = getSocket();
353         DatagramPacket receivePacket = new DatagramPacket(new byte[MSG_BUFFER_SIZE], MSG_BUFFER_SIZE);
354         try {
355             logger.trace("Connection {}:{}", ip, clientSocket.getLocalPort());
356             byte[] sendData = new byte[MSG_BUFFER_SIZE];
357             sendData = message;
358             DatagramPacket sendPacket = new DatagramPacket(sendData, sendData.length, ipAddress,
359                     MiIoBindingConstants.PORT);
360             clientSocket.send(sendPacket);
361             sendPacket.setData(new byte[MSG_BUFFER_SIZE]);
362             clientSocket.receive(receivePacket);
363             byte[] response = Arrays.copyOfRange(receivePacket.getData(), receivePacket.getOffset(),
364                     receivePacket.getOffset() + receivePacket.getLength());
365             return response;
366         } catch (SocketTimeoutException e) {
367             logger.debug("Communication error for Mi device at {}: {}", ip, e.getMessage());
368             needPing = true;
369             return new byte[0];
370         }
371     }
372
373     private DatagramSocket getSocket() throws SocketException {
374         @Nullable
375         DatagramSocket socket = this.socket;
376         if (socket == null || socket.isClosed()) {
377             socket = new DatagramSocket();
378             socket.setSoTimeout(timeout);
379             logger.debug("Opening socket on port: {} ", socket.getLocalPort());
380             this.socket = socket;
381             return socket;
382         } else {
383             return socket;
384         }
385     }
386
387     public void close() {
388         try {
389             final MessageSenderThread senderThread = this.senderThread;
390             if (senderThread != null) {
391                 senderThread.interrupt();
392             }
393         } catch (SecurityException e) {
394             logger.debug("Error while closing: {} ", e.getMessage());
395         }
396         closeSocket();
397     }
398
399     public void closeSocket() {
400         try {
401             final DatagramSocket socket = this.socket;
402             if (socket != null) {
403                 logger.debug("Closing socket for port: {} ", socket.getLocalPort());
404                 socket.close();
405                 this.socket = null;
406             }
407         } catch (SecurityException e) {
408             logger.debug("Error while closing: {} ", e.getMessage());
409         }
410     }
411
412     /**
413      * @return the id
414      */
415     public int getId() {
416         return id.incrementAndGet();
417     }
418
419     /**
420      * @param id the id to set
421      */
422     public void setId(int id) {
423         this.id.set(id);
424     }
425
426     /**
427      * Time delta between device time and server time
428      *
429      * @return delta
430      */
431     public int getTimeDelta() {
432         return timeDelta;
433     }
434
435     public byte[] getDeviceId() {
436         return deviceId;
437     }
438
439     public void setDeviceId(byte[] deviceId) {
440         this.deviceId = deviceId;
441     }
442
443     public int getQueueLength() {
444         return concurrentLinkedQueue.size();
445     }
446 }