2 * Copyright (c) 2010-2020 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.miio.internal.transport;
15 import java.io.IOException;
16 import java.net.DatagramPacket;
17 import java.net.DatagramSocket;
18 import java.net.InetAddress;
19 import java.net.SocketException;
20 import java.net.SocketTimeoutException;
21 import java.util.Arrays;
22 import java.util.Calendar;
23 import java.util.List;
24 import java.util.NoSuchElementException;
25 import java.util.concurrent.ConcurrentLinkedQueue;
26 import java.util.concurrent.CopyOnWriteArrayList;
27 import java.util.concurrent.TimeUnit;
28 import java.util.concurrent.atomic.AtomicInteger;
30 import org.eclipse.jdt.annotation.NonNullByDefault;
31 import org.eclipse.jdt.annotation.Nullable;
32 import org.openhab.binding.miio.internal.Message;
33 import org.openhab.binding.miio.internal.MiIoBindingConstants;
34 import org.openhab.binding.miio.internal.MiIoCommand;
35 import org.openhab.binding.miio.internal.MiIoCrypto;
36 import org.openhab.binding.miio.internal.MiIoCryptoException;
37 import org.openhab.binding.miio.internal.MiIoMessageListener;
38 import org.openhab.binding.miio.internal.MiIoSendCommand;
39 import org.openhab.binding.miio.internal.Utils;
40 import org.openhab.core.thing.ThingStatus;
41 import org.openhab.core.thing.ThingStatusDetail;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
45 import com.google.gson.JsonElement;
46 import com.google.gson.JsonObject;
47 import com.google.gson.JsonParser;
48 import com.google.gson.JsonSyntaxException;
51 * The {@link MiIoAsyncCommunication} is responsible for communications with the Mi IO devices
53 * @author Marcel Verpaalen - Initial contribution
56 public class MiIoAsyncCommunication {
58 private static final int MSG_BUFFER_SIZE = 2048;
60 private final Logger logger = LoggerFactory.getLogger(MiIoAsyncCommunication.class);
62 private final String ip;
63 private final byte[] token;
64 private byte[] deviceId;
65 private @Nullable DatagramSocket socket;
67 private List<MiIoMessageListener> listeners = new CopyOnWriteArrayList<>();
69 private AtomicInteger id = new AtomicInteger(-1);
70 private int timeDelta;
71 private int timeStamp;
72 private final JsonParser parser;
73 private @Nullable MessageSenderThread senderThread;
74 private boolean connected;
75 private ThingStatusDetail status = ThingStatusDetail.NONE;
76 private int errorCounter;
78 private boolean needPing = true;
79 private static final int MAX_ERRORS = 3;
80 private static final int MAX_ID = 15000;
82 private ConcurrentLinkedQueue<MiIoSendCommand> concurrentLinkedQueue = new ConcurrentLinkedQueue<>();
84 public MiIoAsyncCommunication(String ip, byte[] token, byte[] did, int id, int timeout) {
88 this.timeout = timeout;
90 parser = new JsonParser();
94 protected List<MiIoMessageListener> getListeners() {
99 * Registers a {@link MiIoMessageListener} to be called back, when data is received.
100 * If no {@link MessageSenderThread} exists, when the method is called, it is being set up.
102 * @param listener {@link MiIoMessageListener} to be called back
104 public synchronized void registerListener(MiIoMessageListener listener) {
107 if (!getListeners().contains(listener)) {
108 logger.trace("Adding socket listener {}", listener);
109 getListeners().add(listener);
114 * Unregisters a {@link MiIoMessageListener}. If there are no listeners left,
115 * the {@link MessageSenderThread} is being closed.
117 * @param listener {@link MiIoMessageListener} to be unregistered
119 public synchronized void unregisterListener(MiIoMessageListener listener) {
120 getListeners().remove(listener);
121 if (getListeners().isEmpty()) {
122 concurrentLinkedQueue.clear();
127 public int queueCommand(MiIoCommand command) throws MiIoCryptoException, IOException {
128 return queueCommand(command, "[]");
131 public int queueCommand(MiIoCommand command, String params) throws MiIoCryptoException, IOException {
132 return queueCommand(command.getCommand(), params);
135 public int queueCommand(String command, String params)
136 throws MiIoCryptoException, IOException, JsonSyntaxException {
138 JsonObject fullCommand = new JsonObject();
139 int cmdId = id.incrementAndGet();
140 if (cmdId > MAX_ID) {
143 fullCommand.addProperty("id", cmdId);
144 fullCommand.addProperty("method", command);
145 fullCommand.add("params", parser.parse(params));
146 MiIoSendCommand sendCmd = new MiIoSendCommand(cmdId, MiIoCommand.getCommand(command),
147 fullCommand.toString());
148 concurrentLinkedQueue.add(sendCmd);
149 if (logger.isDebugEnabled()) {
150 // Obfuscate part of the token to allow sharing of the logfiles
151 String tokenText = Utils.obfuscateToken(Utils.getHex(token));
152 logger.debug("Command added to Queue {} -> {} (Device: {} token: {} Queue: {})", fullCommand.toString(),
153 ip, Utils.getHex(deviceId), tokenText, concurrentLinkedQueue.size());
159 } catch (JsonSyntaxException e) {
160 logger.warn("Send command '{}' with parameters {} -> {} (Device: {}) gave error {}", command, params, ip,
161 Utils.getHex(deviceId), e.getMessage());
166 MiIoSendCommand sendMiIoSendCommand(MiIoSendCommand miIoSendCommand) {
167 String errorMsg = "Unknown Error while sending command";
168 String decryptedResponse = "";
170 decryptedResponse = sendCommand(miIoSendCommand.getCommandString(), token, ip, deviceId);
171 // hack due to avoid invalid json errors from some misbehaving device firmwares
172 decryptedResponse = decryptedResponse.replace(",,", ",");
173 JsonElement response;
174 response = parser.parse(decryptedResponse);
175 if (response.isJsonObject()) {
177 logger.trace("Received JSON message {}", response.toString());
178 miIoSendCommand.setResponse(response.getAsJsonObject());
179 return miIoSendCommand;
181 errorMsg = "Received message is invalid JSON";
182 logger.debug("{}: {}", errorMsg, decryptedResponse);
184 } catch (MiIoCryptoException | IOException e) {
185 logger.debug("Send command '{}' -> {} (Device: {}) gave error {}", miIoSendCommand.getCommandString(), ip,
186 Utils.getHex(deviceId), e.getMessage());
187 errorMsg = e.getMessage();
188 } catch (JsonSyntaxException e) {
189 logger.warn("Could not parse '{}' <- {} (Device: {}) gave error {}", decryptedResponse,
190 miIoSendCommand.getCommandString(), Utils.getHex(deviceId), e.getMessage());
191 errorMsg = "Received message is invalid JSON";
193 JsonObject erroResp = new JsonObject();
194 erroResp.addProperty("error", errorMsg);
195 miIoSendCommand.setResponse(erroResp);
196 return miIoSendCommand;
199 public synchronized void startReceiver() {
200 MessageSenderThread senderThread = this.senderThread;
201 if (senderThread == null || !senderThread.isAlive()) {
202 senderThread = new MessageSenderThread();
203 senderThread.start();
204 this.senderThread = senderThread;
209 * The {@link MessageSenderThread} is responsible for consuming messages from the queue and sending these to the
213 private class MessageSenderThread extends Thread {
214 public MessageSenderThread() {
215 super("Mi IO MessageSenderThread");
221 logger.debug("Starting Mi IO MessageSenderThread");
222 while (!interrupted()) {
224 if (concurrentLinkedQueue.isEmpty()) {
228 MiIoSendCommand queuedMessage = concurrentLinkedQueue.remove();
229 MiIoSendCommand miIoSendCommand = sendMiIoSendCommand(queuedMessage);
230 for (MiIoMessageListener listener : listeners) {
231 logger.trace("inform listener {}, data {} from {}", listener, queuedMessage, miIoSendCommand);
233 listener.onMessageReceived(miIoSendCommand);
234 } catch (Exception e) {
235 logger.debug("Could not inform listener {}: {}: ", listener, e.getMessage(), e);
238 } catch (NoSuchElementException e) {
240 } catch (InterruptedException e) {
241 // That's our signal to stop
243 } catch (Exception e) {
244 logger.warn("Error while polling/sending message", e);
248 logger.debug("Finished Mi IO MessageSenderThread");
252 private String sendCommand(String command, byte[] token, String ip, byte[] deviceId)
253 throws MiIoCryptoException, IOException {
255 encr = MiIoCrypto.encrypt(command.getBytes(), token);
256 timeStamp = (int) TimeUnit.MILLISECONDS.toSeconds(Calendar.getInstance().getTime().getTime());
257 byte[] sendMsg = Message.createMsgData(encr, token, deviceId, timeStamp + timeDelta);
258 Message miIoResponseMsg = sendData(sendMsg, ip);
259 if (miIoResponseMsg == null) {
260 if (logger.isTraceEnabled()) {
261 logger.trace("No response from device {} at {} for command {}.\r\n{}", Utils.getHex(deviceId), ip,
262 command, (new Message(sendMsg)).toSting());
264 logger.debug("No response from device {} at {} for command {}.", Utils.getHex(deviceId), ip, command);
267 if (errorCounter > MAX_ERRORS) {
268 status = ThingStatusDetail.CONFIGURATION_ERROR;
271 return "{\"error\":\"No Response\"}";
273 if (!miIoResponseMsg.isChecksumValid()) {
274 return "{\"error\":\"Message has invalid checksum\"}";
276 if (errorCounter > 0) {
278 status = ThingStatusDetail.NONE;
279 updateStatus(ThingStatus.ONLINE, status);
284 String decryptedResponse = new String(MiIoCrypto.decrypt(miIoResponseMsg.getData(), token), "UTF-8").trim();
285 logger.trace("Received response from {}: {}", ip, decryptedResponse);
286 return decryptedResponse;
289 public @Nullable Message sendPing(String ip) throws IOException {
290 for (int i = 0; i < 3; i++) {
291 logger.debug("Sending Ping {} ({})", Utils.getHex(deviceId), ip);
292 Message resp = sendData(MiIoBindingConstants.DISCOVER_STRING, ip);
302 private void pingFail() {
303 logger.debug("Ping {} ({}) failed", Utils.getHex(deviceId), ip);
305 status = ThingStatusDetail.COMMUNICATION_ERROR;
306 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR);
309 private void pingSuccess() {
310 logger.debug("Ping {} ({}) success", Utils.getHex(deviceId), ip);
313 status = ThingStatusDetail.NONE;
314 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.NONE);
316 if (ThingStatusDetail.CONFIGURATION_ERROR.equals(status)) {
317 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_ERROR);
319 status = ThingStatusDetail.NONE;
320 updateStatus(ThingStatus.ONLINE, status);
325 private void updateStatus(ThingStatus status, ThingStatusDetail statusDetail) {
326 for (MiIoMessageListener listener : listeners) {
327 logger.trace("inform listener {}, data {} from {}", listener, status, statusDetail);
329 listener.onStatusUpdated(status, statusDetail);
330 } catch (Exception e) {
331 logger.debug("Could not inform listener {}: {}", listener, e.getMessage(), e);
336 private @Nullable Message sendData(byte[] sendMsg, String ip) throws IOException {
337 byte[] response = comms(sendMsg, ip);
338 if (response.length >= 32) {
339 Message miIoResponse = new Message(response);
340 timeStamp = (int) TimeUnit.MILLISECONDS.toSeconds(Calendar.getInstance().getTime().getTime());
341 timeDelta = miIoResponse.getTimestampAsInt() - timeStamp;
342 logger.trace("Message Details:{} ", miIoResponse.toSting());
345 logger.trace("Reponse length <32 : {}", response.length);
350 private synchronized byte[] comms(byte[] message, String ip) throws IOException {
351 InetAddress ipAddress = InetAddress.getByName(ip);
352 DatagramSocket clientSocket = getSocket();
353 DatagramPacket receivePacket = new DatagramPacket(new byte[MSG_BUFFER_SIZE], MSG_BUFFER_SIZE);
355 logger.trace("Connection {}:{}", ip, clientSocket.getLocalPort());
356 byte[] sendData = new byte[MSG_BUFFER_SIZE];
358 DatagramPacket sendPacket = new DatagramPacket(sendData, sendData.length, ipAddress,
359 MiIoBindingConstants.PORT);
360 clientSocket.send(sendPacket);
361 sendPacket.setData(new byte[MSG_BUFFER_SIZE]);
362 clientSocket.receive(receivePacket);
363 byte[] response = Arrays.copyOfRange(receivePacket.getData(), receivePacket.getOffset(),
364 receivePacket.getOffset() + receivePacket.getLength());
366 } catch (SocketTimeoutException e) {
367 logger.debug("Communication error for Mi device at {}: {}", ip, e.getMessage());
373 private DatagramSocket getSocket() throws SocketException {
375 DatagramSocket socket = this.socket;
376 if (socket == null || socket.isClosed()) {
377 socket = new DatagramSocket();
378 socket.setSoTimeout(timeout);
379 logger.debug("Opening socket on port: {} ", socket.getLocalPort());
380 this.socket = socket;
387 public void close() {
389 final MessageSenderThread senderThread = this.senderThread;
390 if (senderThread != null) {
391 senderThread.interrupt();
393 } catch (SecurityException e) {
394 logger.debug("Error while closing: {} ", e.getMessage());
399 public void closeSocket() {
401 final DatagramSocket socket = this.socket;
402 if (socket != null) {
403 logger.debug("Closing socket for port: {} ", socket.getLocalPort());
407 } catch (SecurityException e) {
408 logger.debug("Error while closing: {} ", e.getMessage());
416 return id.incrementAndGet();
420 * @param id the id to set
422 public void setId(int id) {
427 * Time delta between device time and server time
431 public int getTimeDelta() {
435 public byte[] getDeviceId() {
439 public void setDeviceId(byte[] deviceId) {
440 this.deviceId = deviceId;
443 public int getQueueLength() {
444 return concurrentLinkedQueue.size();