]> git.basschouten.com Git - openhab-addons.git/blob
7be4bd454a231d6b8f8fddb7cb16c4a9ae4470d3
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2021 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.miio.internal.transport;
14
15 import java.io.IOException;
16 import java.net.DatagramPacket;
17 import java.net.DatagramSocket;
18 import java.net.InetAddress;
19 import java.net.SocketException;
20 import java.net.SocketTimeoutException;
21 import java.nio.charset.StandardCharsets;
22 import java.time.Instant;
23 import java.util.Arrays;
24 import java.util.Calendar;
25 import java.util.List;
26 import java.util.NoSuchElementException;
27 import java.util.concurrent.ConcurrentLinkedQueue;
28 import java.util.concurrent.CopyOnWriteArrayList;
29 import java.util.concurrent.TimeUnit;
30 import java.util.concurrent.atomic.AtomicInteger;
31
32 import org.eclipse.jdt.annotation.NonNullByDefault;
33 import org.eclipse.jdt.annotation.Nullable;
34 import org.openhab.binding.miio.internal.Message;
35 import org.openhab.binding.miio.internal.MiIoBindingConstants;
36 import org.openhab.binding.miio.internal.MiIoCommand;
37 import org.openhab.binding.miio.internal.MiIoCrypto;
38 import org.openhab.binding.miio.internal.MiIoCryptoException;
39 import org.openhab.binding.miio.internal.MiIoMessageListener;
40 import org.openhab.binding.miio.internal.MiIoSendCommand;
41 import org.openhab.binding.miio.internal.Utils;
42 import org.openhab.binding.miio.internal.cloud.CloudConnector;
43 import org.openhab.binding.miio.internal.cloud.MiCloudException;
44 import org.openhab.core.thing.ThingStatus;
45 import org.openhab.core.thing.ThingStatusDetail;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
48
49 import com.google.gson.JsonElement;
50 import com.google.gson.JsonObject;
51 import com.google.gson.JsonParser;
52 import com.google.gson.JsonSyntaxException;
53
54 /**
55  * The {@link MiIoAsyncCommunication} is responsible for communications with the Mi IO devices
56  *
57  * @author Marcel Verpaalen - Initial contribution
58  */
59 @NonNullByDefault
60 public class MiIoAsyncCommunication {
61
62     private static final int MSG_BUFFER_SIZE = 2048;
63
64     private final Logger logger = LoggerFactory.getLogger(MiIoAsyncCommunication.class);
65
66     private final String ip;
67     private final byte[] token;
68     private String deviceId;
69     private @Nullable DatagramSocket socket;
70
71     private List<MiIoMessageListener> listeners = new CopyOnWriteArrayList<>();
72
73     private AtomicInteger id = new AtomicInteger(-1);
74     private int timeDelta;
75     private int timeStamp;
76     private @Nullable MessageSenderThread senderThread;
77     private boolean connected;
78     private ThingStatusDetail status = ThingStatusDetail.NONE;
79     private int errorCounter;
80     private int timeout;
81     private boolean needPing = true;
82     private static final int MAX_ERRORS = 3;
83     private static final int MAX_ID = 15000;
84     private final CloudConnector cloudConnector;
85
86     private ConcurrentLinkedQueue<MiIoSendCommand> concurrentLinkedQueue = new ConcurrentLinkedQueue<>();
87
88     public MiIoAsyncCommunication(String ip, byte[] token, String did, int id, int timeout,
89             CloudConnector cloudConnector) {
90         this.ip = ip;
91         this.token = token;
92         this.deviceId = did;
93         this.timeout = timeout;
94         this.cloudConnector = cloudConnector;
95         setId(id);
96         startReceiver();
97     }
98
99     protected List<MiIoMessageListener> getListeners() {
100         return listeners;
101     }
102
103     /**
104      * Registers a {@link MiIoMessageListener} to be called back, when data is received.
105      * If no {@link MessageSenderThread} exists, when the method is called, it is being set up.
106      *
107      * @param listener {@link MiIoMessageListener} to be called back
108      */
109     public synchronized void registerListener(MiIoMessageListener listener) {
110         needPing = true;
111         startReceiver();
112         if (!getListeners().contains(listener)) {
113             logger.trace("Adding socket listener {}", listener);
114             getListeners().add(listener);
115         }
116     }
117
118     /**
119      * Unregisters a {@link MiIoMessageListener}. If there are no listeners left,
120      * the {@link MessageSenderThread} is being closed.
121      *
122      * @param listener {@link MiIoMessageListener} to be unregistered
123      */
124     public synchronized void unregisterListener(MiIoMessageListener listener) {
125         getListeners().remove(listener);
126         if (getListeners().isEmpty()) {
127             concurrentLinkedQueue.clear();
128             close();
129         }
130     }
131
132     public int queueCommand(MiIoCommand command, String cloudServer) throws MiIoCryptoException, IOException {
133         return queueCommand(command, "[]", cloudServer);
134     }
135
136     public int queueCommand(MiIoCommand command, String params, String cloudServer)
137             throws MiIoCryptoException, IOException {
138         return queueCommand(command.getCommand(), params, cloudServer);
139     }
140
141     public int queueCommand(String command, String params, String cloudServer)
142             throws MiIoCryptoException, IOException, JsonSyntaxException {
143         try {
144             JsonObject fullCommand = new JsonObject();
145             int cmdId = id.incrementAndGet();
146             if (cmdId > MAX_ID) {
147                 id.set(0);
148             }
149             if (command.startsWith("{") && command.endsWith("}")) {
150                 fullCommand = JsonParser.parseString(command).getAsJsonObject();
151                 fullCommand.addProperty("id", cmdId);
152                 if (!fullCommand.has("params") && !params.isBlank()) {
153                     fullCommand.add("params", JsonParser.parseString(params));
154                 }
155             } else {
156                 fullCommand.addProperty("id", cmdId);
157                 fullCommand.addProperty("method", command);
158                 fullCommand.add("params", JsonParser.parseString(params));
159             }
160             MiIoSendCommand sendCmd = new MiIoSendCommand(cmdId, MiIoCommand.getCommand(command), fullCommand,
161                     cloudServer);
162             concurrentLinkedQueue.add(sendCmd);
163             if (logger.isDebugEnabled()) {
164                 // Obfuscate part of the token to allow sharing of the logfiles
165                 String tokenText = Utils.obfuscateToken(Utils.getHex(token));
166                 logger.debug("Command added to Queue {} -> {} (Device: {} token: {} Queue: {}).{}{}",
167                         fullCommand.toString(), ip, deviceId, tokenText, concurrentLinkedQueue.size(),
168                         cloudServer.isBlank() ? "" : " Send via cloudserver: ", cloudServer);
169             }
170             if (needPing && cloudServer.isBlank()) {
171                 sendPing(ip);
172             }
173             return cmdId;
174         } catch (JsonSyntaxException | IllegalStateException e) {
175             logger.warn("Send command '{}' with parameters {} -> {} (Device: {}) gave error {}", command, params, ip,
176                     deviceId, e.getMessage());
177             throw e;
178         }
179     }
180
181     MiIoSendCommand sendMiIoSendCommand(MiIoSendCommand miIoSendCommand) {
182         String errorMsg = "Unknown Error while sending command";
183         String decryptedResponse = "";
184         try {
185             if (miIoSendCommand.getCloudServer().isBlank()) {
186                 decryptedResponse = sendCommand(miIoSendCommand.getCommandString(), token, ip, deviceId);
187             } else {
188                 if (!miIoSendCommand.getMethod().startsWith("/")) {
189                     decryptedResponse = cloudConnector.sendRPCCommand(Utils.getHexId(deviceId),
190                             miIoSendCommand.getCloudServer(), miIoSendCommand);
191                     logger.debug("Command {} send via cloudserver {}", miIoSendCommand.getCommandString(),
192                             miIoSendCommand.getCloudServer());
193                     updateStatus(ThingStatus.ONLINE, ThingStatusDetail.NONE);
194                 } else {
195                     String data = miIoSendCommand.getParams().isJsonArray()
196                             && miIoSendCommand.getParams().getAsJsonArray().size() > 0
197                                     ? miIoSendCommand.getParams().getAsJsonArray().get(0).toString()
198                                     : "";
199                     logger.debug("Custom cloud request send to url '{}' with data '{}'", miIoSendCommand.getMethod(),
200                             data);
201                     decryptedResponse = cloudConnector.sendCloudCommand(miIoSendCommand.getMethod(),
202                             miIoSendCommand.getCloudServer(), data);
203                     miIoSendCommand.setResponse(JsonParser.parseString(decryptedResponse).getAsJsonObject());
204                     return miIoSendCommand;
205                 }
206             }
207             // hack due to avoid invalid json errors from some misbehaving device firmwares
208             decryptedResponse = decryptedResponse.replace(",,", ",");
209             JsonElement response;
210             response = JsonParser.parseString(decryptedResponse);
211             if (!response.isJsonObject()) {
212                 errorMsg = "Received message is not a JSON object ";
213             } else {
214                 needPing = false;
215                 logger.trace("Received  JSON message {}", response.toString());
216                 JsonObject resJson = response.getAsJsonObject();
217                 if (resJson.has("id")) {
218                     int id = resJson.get("id").getAsInt();
219                     if (id == miIoSendCommand.getId()) {
220                         miIoSendCommand.setResponse(response.getAsJsonObject());
221                         return miIoSendCommand;
222                     } else {
223                         if (id < miIoSendCommand.getId()) {
224                             errorMsg = String.format(
225                                     "Received message out of sync, extend timeout time. Expected id: %d, received id: %d",
226                                     miIoSendCommand.getId(), id);
227                         } else {
228                             errorMsg = String.format("Received message out of sync. Expected id: %d, received id: %d",
229                                     miIoSendCommand.getId(), id);
230                         }
231                     }
232                 } else {
233                     errorMsg = "Received message is without id";
234                 }
235
236             }
237             logger.debug("{}: {}", errorMsg, decryptedResponse);
238         } catch (MiIoCryptoException | IOException e) {
239             logger.debug("Send command '{}'  -> {} (Device: {}) gave error {}", miIoSendCommand.getCommandString(), ip,
240                     deviceId, e.getMessage());
241             errorMsg = e.getMessage();
242         } catch (JsonSyntaxException e) {
243             logger.warn("Could not parse '{}' <- {} (Device: {}) gave error {}", decryptedResponse,
244                     miIoSendCommand.getCommandString(), deviceId, e.getMessage());
245             errorMsg = "Received message is invalid JSON";
246         } catch (MiCloudException e) {
247             logger.debug("Send command '{}'  -> cloudserver '{}' (Device: {}) gave error {}",
248                     miIoSendCommand.getCommandString(), miIoSendCommand.getCloudServer(), deviceId, e.getMessage());
249             errorMsg = e.getMessage();
250             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR);
251         }
252         JsonObject erroResp = new JsonObject();
253         erroResp.addProperty("error", errorMsg);
254         miIoSendCommand.setResponse(erroResp);
255         return miIoSendCommand;
256     }
257
258     public synchronized void startReceiver() {
259         MessageSenderThread senderThread = this.senderThread;
260         if (senderThread == null || !senderThread.isAlive()) {
261             senderThread = new MessageSenderThread();
262             senderThread.start();
263             this.senderThread = senderThread;
264         }
265     }
266
267     /**
268      * The {@link MessageSenderThread} is responsible for consuming messages from the queue and sending these to the
269      * device
270      *
271      */
272     private class MessageSenderThread extends Thread {
273         public MessageSenderThread() {
274             super("Mi IO MessageSenderThread");
275             setDaemon(true);
276         }
277
278         @Override
279         public void run() {
280             logger.debug("Starting Mi IO MessageSenderThread");
281             while (!interrupted()) {
282                 try {
283                     if (concurrentLinkedQueue.isEmpty()) {
284                         Thread.sleep(100);
285                         continue;
286                     }
287                     MiIoSendCommand queuedMessage = concurrentLinkedQueue.remove();
288                     MiIoSendCommand miIoSendCommand = sendMiIoSendCommand(queuedMessage);
289                     for (MiIoMessageListener listener : listeners) {
290                         logger.trace("inform listener {}, data {} from {}", listener, queuedMessage, miIoSendCommand);
291                         try {
292                             listener.onMessageReceived(miIoSendCommand);
293                         } catch (Exception e) {
294                             logger.debug("Could not inform listener {}: {}: ", listener, e.getMessage(), e);
295                         }
296                     }
297                 } catch (NoSuchElementException e) {
298                     // ignore
299                 } catch (InterruptedException e) {
300                     // That's our signal to stop
301                     break;
302                 } catch (Exception e) {
303                     logger.warn("Error while polling/sending message", e);
304                 }
305             }
306             closeSocket();
307             logger.debug("Finished Mi IO MessageSenderThread");
308         }
309     }
310
311     private String sendCommand(String command, byte[] token, String ip, String deviceId)
312             throws MiIoCryptoException, IOException {
313         byte[] sendMsg = new byte[0];
314         if (!command.isBlank()) {
315             byte[] encr;
316             encr = MiIoCrypto.encrypt(command.getBytes(StandardCharsets.UTF_8), token);
317             timeStamp = (int) Instant.now().getEpochSecond();
318             sendMsg = Message.createMsgData(encr, token, Utils.hexStringToByteArray(Utils.getHexId(deviceId)),
319                     timeStamp + timeDelta);
320         }
321         Message miIoResponseMsg = sendData(sendMsg, ip);
322         if (miIoResponseMsg == null) {
323             if (logger.isTraceEnabled()) {
324                 logger.trace("No response from device {} at {} for command {}.\r\n{}", deviceId, ip, command,
325                         (new Message(sendMsg)).toSting());
326             } else {
327                 logger.debug("No response from device {} at {} for command {}.", deviceId, ip, command);
328             }
329             errorCounter++;
330             if (errorCounter > MAX_ERRORS) {
331                 status = ThingStatusDetail.CONFIGURATION_ERROR;
332                 sendPing(ip);
333             }
334             return "{\"error\":\"No Response\"}";
335         }
336         if (!miIoResponseMsg.isChecksumValid()) {
337             return "{\"error\":\"Message has invalid checksum\"}";
338         }
339         if (errorCounter > 0) {
340             errorCounter = 0;
341             status = ThingStatusDetail.NONE;
342             updateStatus(ThingStatus.ONLINE, status);
343         }
344         if (!connected) {
345             pingSuccess();
346         }
347         String decryptedResponse = new String(MiIoCrypto.decrypt(miIoResponseMsg.getData(), token), "UTF-8").trim();
348         logger.trace("Received response from {}: {}", ip, decryptedResponse);
349         return decryptedResponse;
350     }
351
352     public @Nullable Message sendPing(String ip) throws IOException {
353         for (int i = 0; i < 3; i++) {
354             logger.debug("Sending Ping to device '{}' ({})", deviceId, ip);
355             Message resp = sendData(MiIoBindingConstants.DISCOVER_STRING, ip);
356             if (resp != null) {
357                 pingSuccess();
358                 return resp;
359             }
360         }
361         pingFail();
362         return null;
363     }
364
365     private void pingFail() {
366         logger.debug("Ping to device '{}' ({}) failed", deviceId, ip);
367         connected = false;
368         status = ThingStatusDetail.COMMUNICATION_ERROR;
369         updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR);
370     }
371
372     private void pingSuccess() {
373         logger.debug("Ping to device '{}' ({}) success", deviceId, ip);
374         if (!connected) {
375             connected = true;
376             status = ThingStatusDetail.NONE;
377             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.NONE);
378         } else {
379             if (ThingStatusDetail.CONFIGURATION_ERROR.equals(status)) {
380                 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_ERROR);
381             } else {
382                 status = ThingStatusDetail.NONE;
383                 updateStatus(ThingStatus.ONLINE, status);
384             }
385         }
386     }
387
388     private void updateStatus(ThingStatus status, ThingStatusDetail statusDetail) {
389         for (MiIoMessageListener listener : listeners) {
390             logger.trace("inform listener {}, data {} from {}", listener, status, statusDetail);
391             try {
392                 listener.onStatusUpdated(status, statusDetail);
393             } catch (Exception e) {
394                 logger.debug("Could not inform listener {}: {}", listener, e.getMessage(), e);
395             }
396         }
397     }
398
399     private @Nullable Message sendData(byte[] sendMsg, String ip) throws IOException {
400         byte[] response = comms(sendMsg, ip);
401         if (response.length >= 32) {
402             Message miIoResponse = new Message(response);
403             timeStamp = (int) TimeUnit.MILLISECONDS.toSeconds(Calendar.getInstance().getTime().getTime());
404             timeDelta = miIoResponse.getTimestampAsInt() - timeStamp;
405             logger.trace("Message Details:{} ", miIoResponse.toSting());
406             return miIoResponse;
407         } else {
408             logger.trace("Reponse length <32 : {}", response.length);
409             return null;
410         }
411     }
412
413     private synchronized byte[] comms(byte[] message, String ip) throws IOException {
414         InetAddress ipAddress = InetAddress.getByName(ip);
415         DatagramSocket clientSocket = getSocket();
416         DatagramPacket receivePacket = new DatagramPacket(new byte[MSG_BUFFER_SIZE], MSG_BUFFER_SIZE);
417         try {
418             logger.trace("Connection {}:{}", ip, clientSocket.getLocalPort());
419             if (message.length > 0) {
420                 byte[] sendData = new byte[MSG_BUFFER_SIZE];
421                 sendData = message;
422                 DatagramPacket sendPacket = new DatagramPacket(sendData, sendData.length, ipAddress,
423                         MiIoBindingConstants.PORT);
424                 clientSocket.send(sendPacket);
425                 sendPacket.setData(new byte[MSG_BUFFER_SIZE]);
426             }
427             clientSocket.receive(receivePacket);
428             byte[] response = Arrays.copyOfRange(receivePacket.getData(), receivePacket.getOffset(),
429                     receivePacket.getOffset() + receivePacket.getLength());
430             return response;
431         } catch (SocketTimeoutException e) {
432             logger.debug("Communication error for Mi device at {}: {}", ip, e.getMessage());
433             needPing = true;
434             return new byte[0];
435         }
436     }
437
438     private DatagramSocket getSocket() throws SocketException {
439         @Nullable
440         DatagramSocket socket = this.socket;
441         if (socket == null || socket.isClosed()) {
442             socket = new DatagramSocket();
443             socket.setSoTimeout(timeout);
444             logger.debug("Opening socket on port: {} ", socket.getLocalPort());
445             this.socket = socket;
446             return socket;
447         } else {
448             return socket;
449         }
450     }
451
452     public void close() {
453         try {
454             final MessageSenderThread senderThread = this.senderThread;
455             if (senderThread != null) {
456                 senderThread.interrupt();
457             }
458         } catch (SecurityException e) {
459             logger.debug("Error while closing: {} ", e.getMessage());
460         }
461         closeSocket();
462     }
463
464     public void closeSocket() {
465         try {
466             final DatagramSocket socket = this.socket;
467             if (socket != null) {
468                 logger.debug("Closing socket for port: {} ", socket.getLocalPort());
469                 socket.close();
470                 this.socket = null;
471             }
472         } catch (SecurityException e) {
473             logger.debug("Error while closing: {} ", e.getMessage());
474         }
475     }
476
477     /**
478      * @return the id
479      */
480     public int getId() {
481         return id.incrementAndGet();
482     }
483
484     /**
485      * @param id the id to set
486      */
487     public void setId(int id) {
488         this.id.set(id);
489     }
490
491     /**
492      * Time delta between device time and server time
493      *
494      * @return delta
495      */
496     public int getTimeDelta() {
497         return timeDelta;
498     }
499
500     public String getDeviceId() {
501         return deviceId;
502     }
503
504     public void setDeviceId(String deviceId) {
505         this.deviceId = deviceId;
506     }
507
508     public int getQueueLength() {
509         return concurrentLinkedQueue.size();
510     }
511 }