]> git.basschouten.com Git - openhab-addons.git/blob
bc0335865e21e6b66c01ec05464ec26c017af058
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2021 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.miio.internal.transport;
14
15 import java.io.IOException;
16 import java.net.DatagramPacket;
17 import java.net.DatagramSocket;
18 import java.net.InetAddress;
19 import java.net.SocketException;
20 import java.net.SocketTimeoutException;
21 import java.nio.charset.StandardCharsets;
22 import java.time.Instant;
23 import java.util.Arrays;
24 import java.util.Calendar;
25 import java.util.List;
26 import java.util.NoSuchElementException;
27 import java.util.concurrent.ConcurrentLinkedQueue;
28 import java.util.concurrent.CopyOnWriteArrayList;
29 import java.util.concurrent.TimeUnit;
30 import java.util.concurrent.atomic.AtomicInteger;
31
32 import org.eclipse.jdt.annotation.NonNullByDefault;
33 import org.eclipse.jdt.annotation.Nullable;
34 import org.openhab.binding.miio.internal.Message;
35 import org.openhab.binding.miio.internal.MiIoBindingConstants;
36 import org.openhab.binding.miio.internal.MiIoCommand;
37 import org.openhab.binding.miio.internal.MiIoCrypto;
38 import org.openhab.binding.miio.internal.MiIoCryptoException;
39 import org.openhab.binding.miio.internal.MiIoMessageListener;
40 import org.openhab.binding.miio.internal.MiIoSendCommand;
41 import org.openhab.binding.miio.internal.Utils;
42 import org.openhab.binding.miio.internal.cloud.CloudConnector;
43 import org.openhab.binding.miio.internal.cloud.MiCloudException;
44 import org.openhab.core.thing.ThingStatus;
45 import org.openhab.core.thing.ThingStatusDetail;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
48
49 import com.google.gson.JsonElement;
50 import com.google.gson.JsonObject;
51 import com.google.gson.JsonParser;
52 import com.google.gson.JsonSyntaxException;
53
54 /**
55  * The {@link MiIoAsyncCommunication} is responsible for communications with the Mi IO devices
56  *
57  * @author Marcel Verpaalen - Initial contribution
58  */
59 @NonNullByDefault
60 public class MiIoAsyncCommunication {
61
62     private static final int MSG_BUFFER_SIZE = 2048;
63
64     private final Logger logger = LoggerFactory.getLogger(MiIoAsyncCommunication.class);
65
66     private final String ip;
67     private final byte[] token;
68     private String deviceId;
69     private @Nullable DatagramSocket socket;
70
71     private List<MiIoMessageListener> listeners = new CopyOnWriteArrayList<>();
72
73     private AtomicInteger id = new AtomicInteger(-1);
74     private int timeDelta;
75     private int timeStamp;
76     private @Nullable MessageSenderThread senderThread;
77     private boolean connected;
78     private ThingStatusDetail status = ThingStatusDetail.NONE;
79     private int errorCounter;
80     private int timeout;
81     private boolean needPing = true;
82     private static final int MAX_ERRORS = 3;
83     private static final int MAX_ID = 15000;
84     private final CloudConnector cloudConnector;
85
86     private ConcurrentLinkedQueue<MiIoSendCommand> concurrentLinkedQueue = new ConcurrentLinkedQueue<>();
87
88     public MiIoAsyncCommunication(String ip, byte[] token, String did, int id, int timeout,
89             CloudConnector cloudConnector) {
90         this.ip = ip;
91         this.token = token;
92         this.deviceId = did;
93         this.timeout = timeout;
94         this.cloudConnector = cloudConnector;
95         setId(id);
96         startReceiver();
97     }
98
99     protected List<MiIoMessageListener> getListeners() {
100         return listeners;
101     }
102
103     /**
104      * Registers a {@link MiIoMessageListener} to be called back, when data is received.
105      * If no {@link MessageSenderThread} exists, when the method is called, it is being set up.
106      *
107      * @param listener {@link MiIoMessageListener} to be called back
108      */
109     public synchronized void registerListener(MiIoMessageListener listener) {
110         needPing = true;
111         startReceiver();
112         if (!getListeners().contains(listener)) {
113             logger.trace("Adding socket listener {}", listener);
114             getListeners().add(listener);
115         }
116     }
117
118     /**
119      * Unregisters a {@link MiIoMessageListener}. If there are no listeners left,
120      * the {@link MessageSenderThread} is being closed.
121      *
122      * @param listener {@link MiIoMessageListener} to be unregistered
123      */
124     public synchronized void unregisterListener(MiIoMessageListener listener) {
125         getListeners().remove(listener);
126         if (getListeners().isEmpty()) {
127             concurrentLinkedQueue.clear();
128             close();
129         }
130     }
131
132     public int queueCommand(MiIoCommand command, String cloudServer) throws MiIoCryptoException, IOException {
133         return queueCommand(command, "[]", cloudServer);
134     }
135
136     public int queueCommand(MiIoCommand command, String params, String cloudServer)
137             throws MiIoCryptoException, IOException {
138         return queueCommand(command.getCommand(), params, cloudServer);
139     }
140
141     public int queueCommand(String command, String params, String cloudServer)
142             throws MiIoCryptoException, IOException, JsonSyntaxException {
143         try {
144             JsonObject fullCommand = new JsonObject();
145             int cmdId = id.incrementAndGet();
146             if (cmdId > MAX_ID) {
147                 id.set(0);
148             }
149             fullCommand.addProperty("id", cmdId);
150             fullCommand.addProperty("method", command);
151             fullCommand.add("params", JsonParser.parseString(params));
152             MiIoSendCommand sendCmd = new MiIoSendCommand(cmdId, MiIoCommand.getCommand(command), fullCommand,
153                     cloudServer);
154             concurrentLinkedQueue.add(sendCmd);
155             if (logger.isDebugEnabled()) {
156                 // Obfuscate part of the token to allow sharing of the logfiles
157                 String tokenText = Utils.obfuscateToken(Utils.getHex(token));
158                 logger.debug("Command added to Queue {} -> {} (Device: {} token: {} Queue: {}).{}{}",
159                         fullCommand.toString(), ip, deviceId, tokenText, concurrentLinkedQueue.size(),
160                         cloudServer.isBlank() ? "" : " Send via cloudserver: ", cloudServer);
161             }
162             if (needPing && cloudServer.isBlank()) {
163                 sendPing(ip);
164             }
165             return cmdId;
166         } catch (JsonSyntaxException e) {
167             logger.warn("Send command '{}' with parameters {} -> {} (Device: {}) gave error {}", command, params, ip,
168                     deviceId, e.getMessage());
169             throw e;
170         }
171     }
172
173     MiIoSendCommand sendMiIoSendCommand(MiIoSendCommand miIoSendCommand) {
174         String errorMsg = "Unknown Error while sending command";
175         String decryptedResponse = "";
176         try {
177             if (miIoSendCommand.getCloudServer().isBlank()) {
178                 decryptedResponse = sendCommand(miIoSendCommand.getCommandString(), token, ip, deviceId);
179             } else {
180                 decryptedResponse = cloudConnector.sendRPCCommand(Utils.getHexId(deviceId),
181                         miIoSendCommand.getCloudServer(), miIoSendCommand);
182                 logger.debug("Command {} send via cloudserver {}", miIoSendCommand.getCommandString(),
183                         miIoSendCommand.getCloudServer());
184                 updateStatus(ThingStatus.ONLINE, ThingStatusDetail.NONE);
185             }
186             // hack due to avoid invalid json errors from some misbehaving device firmwares
187             decryptedResponse = decryptedResponse.replace(",,", ",");
188             JsonElement response;
189             response = JsonParser.parseString(decryptedResponse);
190             if (!response.isJsonObject()) {
191                 errorMsg = "Received message is not a JSON object ";
192             } else {
193                 needPing = false;
194                 logger.trace("Received  JSON message {}", response.toString());
195                 JsonObject resJson = response.getAsJsonObject();
196                 if (resJson.has("id")) {
197                     int id = resJson.get("id").getAsInt();
198                     if (id == miIoSendCommand.getId()) {
199                         miIoSendCommand.setResponse(response.getAsJsonObject());
200                         return miIoSendCommand;
201                     } else {
202                         if (id < miIoSendCommand.getId()) {
203                             errorMsg = String.format(
204                                     "Received message out of sync, extend timeout time. Expected id: %d, received id: %d",
205                                     miIoSendCommand.getId(), id);
206                         } else {
207                             errorMsg = String.format("Received message out of sync. Expected id: %d, received id: %d",
208                                     miIoSendCommand.getId(), id);
209                         }
210                     }
211                 } else {
212                     errorMsg = "Received message is without id";
213                 }
214
215             }
216             logger.debug("{}: {}", errorMsg, decryptedResponse);
217         } catch (MiIoCryptoException | IOException e) {
218             logger.debug("Send command '{}'  -> {} (Device: {}) gave error {}", miIoSendCommand.getCommandString(), ip,
219                     deviceId, e.getMessage());
220             errorMsg = e.getMessage();
221         } catch (JsonSyntaxException e) {
222             logger.warn("Could not parse '{}' <- {} (Device: {}) gave error {}", decryptedResponse,
223                     miIoSendCommand.getCommandString(), deviceId, e.getMessage());
224             errorMsg = "Received message is invalid JSON";
225         } catch (MiCloudException e) {
226             logger.debug("Send command '{}'  -> cloudserver '{}' (Device: {}) gave error {}",
227                     miIoSendCommand.getCommandString(), miIoSendCommand.getCloudServer(), deviceId, e.getMessage());
228             errorMsg = e.getMessage();
229             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR);
230         }
231         JsonObject erroResp = new JsonObject();
232         erroResp.addProperty("error", errorMsg);
233         miIoSendCommand.setResponse(erroResp);
234         return miIoSendCommand;
235     }
236
237     public synchronized void startReceiver() {
238         MessageSenderThread senderThread = this.senderThread;
239         if (senderThread == null || !senderThread.isAlive()) {
240             senderThread = new MessageSenderThread();
241             senderThread.start();
242             this.senderThread = senderThread;
243         }
244     }
245
246     /**
247      * The {@link MessageSenderThread} is responsible for consuming messages from the queue and sending these to the
248      * device
249      *
250      */
251     private class MessageSenderThread extends Thread {
252         public MessageSenderThread() {
253             super("Mi IO MessageSenderThread");
254             setDaemon(true);
255         }
256
257         @Override
258         public void run() {
259             logger.debug("Starting Mi IO MessageSenderThread");
260             while (!interrupted()) {
261                 try {
262                     if (concurrentLinkedQueue.isEmpty()) {
263                         Thread.sleep(100);
264                         continue;
265                     }
266                     MiIoSendCommand queuedMessage = concurrentLinkedQueue.remove();
267                     MiIoSendCommand miIoSendCommand = sendMiIoSendCommand(queuedMessage);
268                     for (MiIoMessageListener listener : listeners) {
269                         logger.trace("inform listener {}, data {} from {}", listener, queuedMessage, miIoSendCommand);
270                         try {
271                             listener.onMessageReceived(miIoSendCommand);
272                         } catch (Exception e) {
273                             logger.debug("Could not inform listener {}: {}: ", listener, e.getMessage(), e);
274                         }
275                     }
276                 } catch (NoSuchElementException e) {
277                     // ignore
278                 } catch (InterruptedException e) {
279                     // That's our signal to stop
280                     break;
281                 } catch (Exception e) {
282                     logger.warn("Error while polling/sending message", e);
283                 }
284             }
285             closeSocket();
286             logger.debug("Finished Mi IO MessageSenderThread");
287         }
288     }
289
290     private String sendCommand(String command, byte[] token, String ip, String deviceId)
291             throws MiIoCryptoException, IOException {
292         byte[] sendMsg = new byte[0];
293         if (!command.isBlank()) {
294             byte[] encr;
295             encr = MiIoCrypto.encrypt(command.getBytes(StandardCharsets.UTF_8), token);
296             timeStamp = (int) Instant.now().getEpochSecond();
297             sendMsg = Message.createMsgData(encr, token, Utils.hexStringToByteArray(Utils.getHexId(deviceId)),
298                     timeStamp + timeDelta);
299         }
300         Message miIoResponseMsg = sendData(sendMsg, ip);
301         if (miIoResponseMsg == null) {
302             if (logger.isTraceEnabled()) {
303                 logger.trace("No response from device {} at {} for command {}.\r\n{}", deviceId, ip, command,
304                         (new Message(sendMsg)).toSting());
305             } else {
306                 logger.debug("No response from device {} at {} for command {}.", deviceId, ip, command);
307             }
308             errorCounter++;
309             if (errorCounter > MAX_ERRORS) {
310                 status = ThingStatusDetail.CONFIGURATION_ERROR;
311                 sendPing(ip);
312             }
313             return "{\"error\":\"No Response\"}";
314         }
315         if (!miIoResponseMsg.isChecksumValid()) {
316             return "{\"error\":\"Message has invalid checksum\"}";
317         }
318         if (errorCounter > 0) {
319             errorCounter = 0;
320             status = ThingStatusDetail.NONE;
321             updateStatus(ThingStatus.ONLINE, status);
322         }
323         if (!connected) {
324             pingSuccess();
325         }
326         String decryptedResponse = new String(MiIoCrypto.decrypt(miIoResponseMsg.getData(), token), "UTF-8").trim();
327         logger.trace("Received response from {}: {}", ip, decryptedResponse);
328         return decryptedResponse;
329     }
330
331     public @Nullable Message sendPing(String ip) throws IOException {
332         for (int i = 0; i < 3; i++) {
333             logger.debug("Sending Ping to device '{}' ({})", deviceId, ip);
334             Message resp = sendData(MiIoBindingConstants.DISCOVER_STRING, ip);
335             if (resp != null) {
336                 pingSuccess();
337                 return resp;
338             }
339         }
340         pingFail();
341         return null;
342     }
343
344     private void pingFail() {
345         logger.debug("Ping to device '{}' ({}) failed", deviceId, ip);
346         connected = false;
347         status = ThingStatusDetail.COMMUNICATION_ERROR;
348         updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR);
349     }
350
351     private void pingSuccess() {
352         logger.debug("Ping to device '{}' ({}) success", deviceId, ip);
353         if (!connected) {
354             connected = true;
355             status = ThingStatusDetail.NONE;
356             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.NONE);
357         } else {
358             if (ThingStatusDetail.CONFIGURATION_ERROR.equals(status)) {
359                 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_ERROR);
360             } else {
361                 status = ThingStatusDetail.NONE;
362                 updateStatus(ThingStatus.ONLINE, status);
363             }
364         }
365     }
366
367     private void updateStatus(ThingStatus status, ThingStatusDetail statusDetail) {
368         for (MiIoMessageListener listener : listeners) {
369             logger.trace("inform listener {}, data {} from {}", listener, status, statusDetail);
370             try {
371                 listener.onStatusUpdated(status, statusDetail);
372             } catch (Exception e) {
373                 logger.debug("Could not inform listener {}: {}", listener, e.getMessage(), e);
374             }
375         }
376     }
377
378     private @Nullable Message sendData(byte[] sendMsg, String ip) throws IOException {
379         byte[] response = comms(sendMsg, ip);
380         if (response.length >= 32) {
381             Message miIoResponse = new Message(response);
382             timeStamp = (int) TimeUnit.MILLISECONDS.toSeconds(Calendar.getInstance().getTime().getTime());
383             timeDelta = miIoResponse.getTimestampAsInt() - timeStamp;
384             logger.trace("Message Details:{} ", miIoResponse.toSting());
385             return miIoResponse;
386         } else {
387             logger.trace("Reponse length <32 : {}", response.length);
388             return null;
389         }
390     }
391
392     private synchronized byte[] comms(byte[] message, String ip) throws IOException {
393         InetAddress ipAddress = InetAddress.getByName(ip);
394         DatagramSocket clientSocket = getSocket();
395         DatagramPacket receivePacket = new DatagramPacket(new byte[MSG_BUFFER_SIZE], MSG_BUFFER_SIZE);
396         try {
397             logger.trace("Connection {}:{}", ip, clientSocket.getLocalPort());
398             if (message.length > 0) {
399                 byte[] sendData = new byte[MSG_BUFFER_SIZE];
400                 sendData = message;
401                 DatagramPacket sendPacket = new DatagramPacket(sendData, sendData.length, ipAddress,
402                         MiIoBindingConstants.PORT);
403                 clientSocket.send(sendPacket);
404                 sendPacket.setData(new byte[MSG_BUFFER_SIZE]);
405             }
406             clientSocket.receive(receivePacket);
407             byte[] response = Arrays.copyOfRange(receivePacket.getData(), receivePacket.getOffset(),
408                     receivePacket.getOffset() + receivePacket.getLength());
409             return response;
410         } catch (SocketTimeoutException e) {
411             logger.debug("Communication error for Mi device at {}: {}", ip, e.getMessage());
412             needPing = true;
413             return new byte[0];
414         }
415     }
416
417     private DatagramSocket getSocket() throws SocketException {
418         @Nullable
419         DatagramSocket socket = this.socket;
420         if (socket == null || socket.isClosed()) {
421             socket = new DatagramSocket();
422             socket.setSoTimeout(timeout);
423             logger.debug("Opening socket on port: {} ", socket.getLocalPort());
424             this.socket = socket;
425             return socket;
426         } else {
427             return socket;
428         }
429     }
430
431     public void close() {
432         try {
433             final MessageSenderThread senderThread = this.senderThread;
434             if (senderThread != null) {
435                 senderThread.interrupt();
436             }
437         } catch (SecurityException e) {
438             logger.debug("Error while closing: {} ", e.getMessage());
439         }
440         closeSocket();
441     }
442
443     public void closeSocket() {
444         try {
445             final DatagramSocket socket = this.socket;
446             if (socket != null) {
447                 logger.debug("Closing socket for port: {} ", socket.getLocalPort());
448                 socket.close();
449                 this.socket = null;
450             }
451         } catch (SecurityException e) {
452             logger.debug("Error while closing: {} ", e.getMessage());
453         }
454     }
455
456     /**
457      * @return the id
458      */
459     public int getId() {
460         return id.incrementAndGet();
461     }
462
463     /**
464      * @param id the id to set
465      */
466     public void setId(int id) {
467         this.id.set(id);
468     }
469
470     /**
471      * Time delta between device time and server time
472      *
473      * @return delta
474      */
475     public int getTimeDelta() {
476         return timeDelta;
477     }
478
479     public String getDeviceId() {
480         return deviceId;
481     }
482
483     public void setDeviceId(String deviceId) {
484         this.deviceId = deviceId;
485     }
486
487     public int getQueueLength() {
488         return concurrentLinkedQueue.size();
489     }
490 }