]> git.basschouten.com Git - openhab-addons.git/blob
9fb9ccd32e2fefa2c4fb419b19b19e99fe8b0369
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2020 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.miio.internal.transport;
14
15 import java.io.IOException;
16 import java.net.DatagramPacket;
17 import java.net.DatagramSocket;
18 import java.net.InetAddress;
19 import java.net.SocketException;
20 import java.net.SocketTimeoutException;
21 import java.util.Arrays;
22 import java.util.Calendar;
23 import java.util.List;
24 import java.util.NoSuchElementException;
25 import java.util.concurrent.ConcurrentLinkedQueue;
26 import java.util.concurrent.CopyOnWriteArrayList;
27 import java.util.concurrent.TimeUnit;
28 import java.util.concurrent.atomic.AtomicInteger;
29
30 import org.eclipse.jdt.annotation.NonNullByDefault;
31 import org.eclipse.jdt.annotation.Nullable;
32 import org.openhab.binding.miio.internal.Message;
33 import org.openhab.binding.miio.internal.MiIoBindingConstants;
34 import org.openhab.binding.miio.internal.MiIoCommand;
35 import org.openhab.binding.miio.internal.MiIoCrypto;
36 import org.openhab.binding.miio.internal.MiIoCryptoException;
37 import org.openhab.binding.miio.internal.MiIoMessageListener;
38 import org.openhab.binding.miio.internal.MiIoSendCommand;
39 import org.openhab.binding.miio.internal.Utils;
40 import org.openhab.core.thing.ThingStatus;
41 import org.openhab.core.thing.ThingStatusDetail;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44
45 import com.google.gson.JsonElement;
46 import com.google.gson.JsonObject;
47 import com.google.gson.JsonParser;
48 import com.google.gson.JsonSyntaxException;
49
50 /**
51  * The {@link MiIoAsyncCommunication} is responsible for communications with the Mi IO devices
52  *
53  * @author Marcel Verpaalen - Initial contribution
54  */
55 @NonNullByDefault
56 public class MiIoAsyncCommunication {
57
58     private static final int MSG_BUFFER_SIZE = 2048;
59
60     private final Logger logger = LoggerFactory.getLogger(MiIoAsyncCommunication.class);
61
62     private final String ip;
63     private final byte[] token;
64     private byte[] deviceId;
65     private @Nullable DatagramSocket socket;
66
67     private List<MiIoMessageListener> listeners = new CopyOnWriteArrayList<>();
68
69     private AtomicInteger id = new AtomicInteger(-1);
70     private int timeDelta;
71     private int timeStamp;
72     private final JsonParser parser;
73     private @Nullable MessageSenderThread senderThread;
74     private boolean connected;
75     private ThingStatusDetail status = ThingStatusDetail.NONE;
76     private int errorCounter;
77     private int timeout;
78     private boolean needPing = true;
79     private static final int MAX_ERRORS = 3;
80     private static final int MAX_ID = 15000;
81
82     private ConcurrentLinkedQueue<MiIoSendCommand> concurrentLinkedQueue = new ConcurrentLinkedQueue<>();
83
84     public MiIoAsyncCommunication(String ip, byte[] token, byte[] did, int id, int timeout) {
85         this.ip = ip;
86         this.token = token;
87         this.deviceId = did;
88         this.timeout = timeout;
89         setId(id);
90         parser = new JsonParser();
91         startReceiver();
92     }
93
94     protected List<MiIoMessageListener> getListeners() {
95         return listeners;
96     }
97
98     /**
99      * Registers a {@link MiIoMessageListener} to be called back, when data is received.
100      * If no {@link MessageSenderThread} exists, when the method is called, it is being set up.
101      *
102      * @param listener {@link MiIoMessageListener} to be called back
103      */
104     public synchronized void registerListener(MiIoMessageListener listener) {
105         needPing = true;
106         startReceiver();
107         if (!getListeners().contains(listener)) {
108             logger.trace("Adding socket listener {}", listener);
109             getListeners().add(listener);
110         }
111     }
112
113     /**
114      * Unregisters a {@link MiIoMessageListener}. If there are no listeners left,
115      * the {@link MessageSenderThread} is being closed.
116      *
117      * @param listener {@link MiIoMessageListener} to be unregistered
118      */
119     public synchronized void unregisterListener(MiIoMessageListener listener) {
120         getListeners().remove(listener);
121         if (getListeners().isEmpty()) {
122             concurrentLinkedQueue.clear();
123             close();
124         }
125     }
126
127     public int queueCommand(MiIoCommand command) throws MiIoCryptoException, IOException {
128         return queueCommand(command, "[]");
129     }
130
131     public int queueCommand(MiIoCommand command, String params) throws MiIoCryptoException, IOException {
132         return queueCommand(command.getCommand(), params);
133     }
134
135     public int queueCommand(String command, String params)
136             throws MiIoCryptoException, IOException, JsonSyntaxException {
137         try {
138             JsonObject fullCommand = new JsonObject();
139             int cmdId = id.incrementAndGet();
140             if (cmdId > MAX_ID) {
141                 id.set(0);
142             }
143             fullCommand.addProperty("id", cmdId);
144             fullCommand.addProperty("method", command);
145             fullCommand.add("params", parser.parse(params));
146             MiIoSendCommand sendCmd = new MiIoSendCommand(cmdId, MiIoCommand.getCommand(command), fullCommand);
147             concurrentLinkedQueue.add(sendCmd);
148             if (logger.isDebugEnabled()) {
149                 // Obfuscate part of the token to allow sharing of the logfiles
150                 String tokenText = Utils.obfuscateToken(Utils.getHex(token));
151                 logger.debug("Command added to Queue {} -> {} (Device: {} token: {} Queue: {})", fullCommand.toString(),
152                         ip, Utils.getHex(deviceId), tokenText, concurrentLinkedQueue.size());
153             }
154             if (needPing) {
155                 sendPing(ip);
156             }
157             return cmdId;
158         } catch (JsonSyntaxException e) {
159             logger.warn("Send command '{}' with parameters {} -> {} (Device: {}) gave error {}", command, params, ip,
160                     Utils.getHex(deviceId), e.getMessage());
161             throw e;
162         }
163     }
164
165     MiIoSendCommand sendMiIoSendCommand(MiIoSendCommand miIoSendCommand) {
166         String errorMsg = "Unknown Error while sending command";
167         String decryptedResponse = "";
168         try {
169             decryptedResponse = sendCommand(miIoSendCommand.getCommandString(), token, ip, deviceId);
170             // hack due to avoid invalid json errors from some misbehaving device firmwares
171             decryptedResponse = decryptedResponse.replace(",,", ",");
172             JsonElement response;
173             response = parser.parse(decryptedResponse);
174             if (response.isJsonObject()) {
175                 needPing = false;
176                 logger.trace("Received  JSON message {}", response.toString());
177                 miIoSendCommand.setResponse(response.getAsJsonObject());
178                 return miIoSendCommand;
179             } else {
180                 errorMsg = "Received message is invalid JSON";
181                 logger.debug("{}: {}", errorMsg, decryptedResponse);
182             }
183         } catch (MiIoCryptoException | IOException e) {
184             logger.debug("Send command '{}'  -> {} (Device: {}) gave error {}", miIoSendCommand.getCommandString(), ip,
185                     Utils.getHex(deviceId), e.getMessage());
186             errorMsg = e.getMessage();
187         } catch (JsonSyntaxException e) {
188             logger.warn("Could not parse '{}' <- {} (Device: {}) gave error {}", decryptedResponse,
189                     miIoSendCommand.getCommandString(), Utils.getHex(deviceId), e.getMessage());
190             errorMsg = "Received message is invalid JSON";
191         }
192         JsonObject erroResp = new JsonObject();
193         erroResp.addProperty("error", errorMsg);
194         miIoSendCommand.setResponse(erroResp);
195         return miIoSendCommand;
196     }
197
198     public synchronized void startReceiver() {
199         MessageSenderThread senderThread = this.senderThread;
200         if (senderThread == null || !senderThread.isAlive()) {
201             senderThread = new MessageSenderThread();
202             senderThread.start();
203             this.senderThread = senderThread;
204         }
205     }
206
207     /**
208      * The {@link MessageSenderThread} is responsible for consuming messages from the queue and sending these to the
209      * device
210      *
211      */
212     private class MessageSenderThread extends Thread {
213         public MessageSenderThread() {
214             super("Mi IO MessageSenderThread");
215             setDaemon(true);
216         }
217
218         @Override
219         public void run() {
220             logger.debug("Starting Mi IO MessageSenderThread");
221             while (!interrupted()) {
222                 try {
223                     if (concurrentLinkedQueue.isEmpty()) {
224                         Thread.sleep(100);
225                         continue;
226                     }
227                     MiIoSendCommand queuedMessage = concurrentLinkedQueue.remove();
228                     MiIoSendCommand miIoSendCommand = sendMiIoSendCommand(queuedMessage);
229                     for (MiIoMessageListener listener : listeners) {
230                         logger.trace("inform listener {}, data {} from {}", listener, queuedMessage, miIoSendCommand);
231                         try {
232                             listener.onMessageReceived(miIoSendCommand);
233                         } catch (Exception e) {
234                             logger.debug("Could not inform listener {}: {}: ", listener, e.getMessage(), e);
235                         }
236                     }
237                 } catch (NoSuchElementException e) {
238                     // ignore
239                 } catch (InterruptedException e) {
240                     // That's our signal to stop
241                     break;
242                 } catch (Exception e) {
243                     logger.warn("Error while polling/sending message", e);
244                 }
245             }
246             closeSocket();
247             logger.debug("Finished Mi IO MessageSenderThread");
248         }
249     }
250
251     private String sendCommand(String command, byte[] token, String ip, byte[] deviceId)
252             throws MiIoCryptoException, IOException {
253         byte[] encr;
254         encr = MiIoCrypto.encrypt(command.getBytes(), token);
255         timeStamp = (int) TimeUnit.MILLISECONDS.toSeconds(Calendar.getInstance().getTime().getTime());
256         byte[] sendMsg = Message.createMsgData(encr, token, deviceId, timeStamp + timeDelta);
257         Message miIoResponseMsg = sendData(sendMsg, ip);
258         if (miIoResponseMsg == null) {
259             if (logger.isTraceEnabled()) {
260                 logger.trace("No response from device {} at {} for command {}.\r\n{}", Utils.getHex(deviceId), ip,
261                         command, (new Message(sendMsg)).toSting());
262             } else {
263                 logger.debug("No response from device {} at {} for command {}.", Utils.getHex(deviceId), ip, command);
264             }
265             errorCounter++;
266             if (errorCounter > MAX_ERRORS) {
267                 status = ThingStatusDetail.CONFIGURATION_ERROR;
268                 sendPing(ip);
269             }
270             return "{\"error\":\"No Response\"}";
271         }
272         if (!miIoResponseMsg.isChecksumValid()) {
273             return "{\"error\":\"Message has invalid checksum\"}";
274         }
275         if (errorCounter > 0) {
276             errorCounter = 0;
277             status = ThingStatusDetail.NONE;
278             updateStatus(ThingStatus.ONLINE, status);
279         }
280         if (!connected) {
281             pingSuccess();
282         }
283         String decryptedResponse = new String(MiIoCrypto.decrypt(miIoResponseMsg.getData(), token), "UTF-8").trim();
284         logger.trace("Received response from {}: {}", ip, decryptedResponse);
285         return decryptedResponse;
286     }
287
288     public @Nullable Message sendPing(String ip) throws IOException {
289         for (int i = 0; i < 3; i++) {
290             logger.debug("Sending Ping {} ({})", Utils.getHex(deviceId), ip);
291             Message resp = sendData(MiIoBindingConstants.DISCOVER_STRING, ip);
292             if (resp != null) {
293                 pingSuccess();
294                 return resp;
295             }
296         }
297         pingFail();
298         return null;
299     }
300
301     private void pingFail() {
302         logger.debug("Ping {} ({}) failed", Utils.getHex(deviceId), ip);
303         connected = false;
304         status = ThingStatusDetail.COMMUNICATION_ERROR;
305         updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR);
306     }
307
308     private void pingSuccess() {
309         logger.debug("Ping {} ({}) success", Utils.getHex(deviceId), ip);
310         if (!connected) {
311             connected = true;
312             status = ThingStatusDetail.NONE;
313             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.NONE);
314         } else {
315             if (ThingStatusDetail.CONFIGURATION_ERROR.equals(status)) {
316                 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_ERROR);
317             } else {
318                 status = ThingStatusDetail.NONE;
319                 updateStatus(ThingStatus.ONLINE, status);
320             }
321         }
322     }
323
324     private void updateStatus(ThingStatus status, ThingStatusDetail statusDetail) {
325         for (MiIoMessageListener listener : listeners) {
326             logger.trace("inform listener {}, data {} from {}", listener, status, statusDetail);
327             try {
328                 listener.onStatusUpdated(status, statusDetail);
329             } catch (Exception e) {
330                 logger.debug("Could not inform listener {}: {}", listener, e.getMessage(), e);
331             }
332         }
333     }
334
335     private @Nullable Message sendData(byte[] sendMsg, String ip) throws IOException {
336         byte[] response = comms(sendMsg, ip);
337         if (response.length >= 32) {
338             Message miIoResponse = new Message(response);
339             timeStamp = (int) TimeUnit.MILLISECONDS.toSeconds(Calendar.getInstance().getTime().getTime());
340             timeDelta = miIoResponse.getTimestampAsInt() - timeStamp;
341             logger.trace("Message Details:{} ", miIoResponse.toSting());
342             return miIoResponse;
343         } else {
344             logger.trace("Reponse length <32 : {}", response.length);
345             return null;
346         }
347     }
348
349     private synchronized byte[] comms(byte[] message, String ip) throws IOException {
350         InetAddress ipAddress = InetAddress.getByName(ip);
351         DatagramSocket clientSocket = getSocket();
352         DatagramPacket receivePacket = new DatagramPacket(new byte[MSG_BUFFER_SIZE], MSG_BUFFER_SIZE);
353         try {
354             logger.trace("Connection {}:{}", ip, clientSocket.getLocalPort());
355             byte[] sendData = new byte[MSG_BUFFER_SIZE];
356             sendData = message;
357             DatagramPacket sendPacket = new DatagramPacket(sendData, sendData.length, ipAddress,
358                     MiIoBindingConstants.PORT);
359             clientSocket.send(sendPacket);
360             sendPacket.setData(new byte[MSG_BUFFER_SIZE]);
361             clientSocket.receive(receivePacket);
362             byte[] response = Arrays.copyOfRange(receivePacket.getData(), receivePacket.getOffset(),
363                     receivePacket.getOffset() + receivePacket.getLength());
364             return response;
365         } catch (SocketTimeoutException e) {
366             logger.debug("Communication error for Mi device at {}: {}", ip, e.getMessage());
367             needPing = true;
368             return new byte[0];
369         }
370     }
371
372     private DatagramSocket getSocket() throws SocketException {
373         @Nullable
374         DatagramSocket socket = this.socket;
375         if (socket == null || socket.isClosed()) {
376             socket = new DatagramSocket();
377             socket.setSoTimeout(timeout);
378             logger.debug("Opening socket on port: {} ", socket.getLocalPort());
379             this.socket = socket;
380             return socket;
381         } else {
382             return socket;
383         }
384     }
385
386     public void close() {
387         try {
388             final MessageSenderThread senderThread = this.senderThread;
389             if (senderThread != null) {
390                 senderThread.interrupt();
391             }
392         } catch (SecurityException e) {
393             logger.debug("Error while closing: {} ", e.getMessage());
394         }
395         closeSocket();
396     }
397
398     public void closeSocket() {
399         try {
400             final DatagramSocket socket = this.socket;
401             if (socket != null) {
402                 logger.debug("Closing socket for port: {} ", socket.getLocalPort());
403                 socket.close();
404                 this.socket = null;
405             }
406         } catch (SecurityException e) {
407             logger.debug("Error while closing: {} ", e.getMessage());
408         }
409     }
410
411     /**
412      * @return the id
413      */
414     public int getId() {
415         return id.incrementAndGet();
416     }
417
418     /**
419      * @param id the id to set
420      */
421     public void setId(int id) {
422         this.id.set(id);
423     }
424
425     /**
426      * Time delta between device time and server time
427      *
428      * @return delta
429      */
430     public int getTimeDelta() {
431         return timeDelta;
432     }
433
434     public byte[] getDeviceId() {
435         return deviceId;
436     }
437
438     public void setDeviceId(byte[] deviceId) {
439         this.deviceId = deviceId;
440     }
441
442     public int getQueueLength() {
443         return concurrentLinkedQueue.size();
444     }
445 }