2 * Copyright (c) 2010-2021 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.miio.internal.transport;
15 import java.io.IOException;
16 import java.net.DatagramPacket;
17 import java.net.DatagramSocket;
18 import java.net.InetAddress;
19 import java.net.SocketException;
20 import java.net.SocketTimeoutException;
21 import java.nio.charset.StandardCharsets;
22 import java.time.Instant;
23 import java.util.Arrays;
24 import java.util.Calendar;
25 import java.util.List;
26 import java.util.NoSuchElementException;
27 import java.util.concurrent.ConcurrentLinkedQueue;
28 import java.util.concurrent.CopyOnWriteArrayList;
29 import java.util.concurrent.TimeUnit;
30 import java.util.concurrent.atomic.AtomicInteger;
32 import org.eclipse.jdt.annotation.NonNullByDefault;
33 import org.eclipse.jdt.annotation.Nullable;
34 import org.openhab.binding.miio.internal.Message;
35 import org.openhab.binding.miio.internal.MiIoBindingConstants;
36 import org.openhab.binding.miio.internal.MiIoCommand;
37 import org.openhab.binding.miio.internal.MiIoCrypto;
38 import org.openhab.binding.miio.internal.MiIoCryptoException;
39 import org.openhab.binding.miio.internal.MiIoMessageListener;
40 import org.openhab.binding.miio.internal.MiIoSendCommand;
41 import org.openhab.binding.miio.internal.Utils;
42 import org.openhab.binding.miio.internal.cloud.CloudConnector;
43 import org.openhab.binding.miio.internal.cloud.MiCloudException;
44 import org.openhab.core.thing.ThingStatus;
45 import org.openhab.core.thing.ThingStatusDetail;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
49 import com.google.gson.JsonElement;
50 import com.google.gson.JsonObject;
51 import com.google.gson.JsonParser;
52 import com.google.gson.JsonSyntaxException;
55 * The {@link MiIoAsyncCommunication} is responsible for communications with the Mi IO devices
57 * @author Marcel Verpaalen - Initial contribution
60 public class MiIoAsyncCommunication {
// Size in bytes of the buffers used for the datagram packets in comms().
62 private static final int MSG_BUFFER_SIZE = 2048;
64 private final Logger logger = LoggerFactory.getLogger(MiIoAsyncCommunication.class);
// Host name or address of the device; resolved via InetAddress.getByName in comms().
66 private final String ip;
// Token passed to MiIoCrypto for encrypting commands and decrypting responses.
67 private final byte[] token;
68 private String deviceId;
// UDP socket, created on demand by getSocket().
69 private @Nullable DatagramSocket socket;
// Listeners informed about command responses and status updates.
71 private List<MiIoMessageListener> listeners = new CopyOnWriteArrayList<>();
// Counter from which the id of each queued command is taken.
73 private AtomicInteger id = new AtomicInteger(-1);
// Device timestamp minus local timestamp (seconds), refreshed in sendData() and added to outgoing timestamps.
74 private int timeDelta;
75 private int timeStamp;
76 private @Nullable MessageSenderThread senderThread;
77 private boolean connected;
78 private ThingStatusDetail status = ThingStatusDetail.NONE;
// Compared against MAX_ERRORS in sendCommand() when a command gets no response.
79 private int errorCounter;
// Checked in queueCommand() for commands that are not routed via a cloud server; set again on a socket timeout.
81 private boolean needPing = true;
82 private static final int MAX_ERRORS = 3;
// Upper bound that newly issued command ids are checked against in queueCommand().
83 private static final int MAX_ID = 15000;
84 private final CloudConnector cloudConnector;
// Commands waiting to be sent; filled by queueCommand() and drained by MessageSenderThread.
86 private ConcurrentLinkedQueue<MiIoSendCommand> concurrentLinkedQueue = new ConcurrentLinkedQueue<>();
// Creates the communication object for one device.
// The lines visible here store the socket timeout (later passed to DatagramSocket.setSoTimeout)
// and the cloud connector used for commands routed via a cloud server.
// NOTE(review): the handling of ip, token, did and id is not visible in this listing.
88 public MiIoAsyncCommunication(String ip, byte[] token, String did, int id, int timeout,
89 CloudConnector cloudConnector) {
93 this.timeout = timeout;
94 this.cloudConnector = cloudConnector;
98 protected List<MiIoMessageListener> getListeners() {
103 * Registers a {@link MiIoMessageListener} to be called back, when data is received.
104 * If no {@link MessageSenderThread} exists, when the method is called, it is being set up.
106 * @param listener {@link MiIoMessageListener} to be called back
108 public synchronized void registerListener(MiIoMessageListener listener) {
// A listener is only added once; registering it again has no effect on the list.
111 if (!getListeners().contains(listener)) {
112 logger.trace("Adding socket listener {}", listener);
113 getListeners().add(listener);
118 * Unregisters a {@link MiIoMessageListener}. If there are no listeners left,
119 * the {@link MessageSenderThread} is being closed.
121 * @param listener {@link MiIoMessageListener} to be unregistered
123 public synchronized void unregisterListener(MiIoMessageListener listener) {
124 getListeners().remove(listener);
125 if (getListeners().isEmpty()) {
// Nobody is left to receive responses, so commands that are still queued are discarded.
126 concurrentLinkedQueue.clear();
// Builds the JSON for a command, assigns it the next command id and adds it to the send queue.
// command is either a method name or a complete JSON object (text starting with "{" and ending with "}");
// params is JSON text; cloudServer is blank for commands sent directly to the device.
// NOTE(review): the return statement and the end of the catch block are not visible in this listing.
131 public int queueCommand(String command, String params, String cloudServer, String sender)
132 throws MiIoCryptoException, IOException, JsonSyntaxException {
134 JsonObject fullCommand = new JsonObject();
135 int cmdId = id.incrementAndGet();
136 if (cmdId > MAX_ID) {
// The command is already a JSON object: reuse it, stamp the id and only add params when it has none.
139 if (command.startsWith("{") && command.endsWith("}")) {
140 fullCommand = JsonParser.parseString(command).getAsJsonObject();
141 fullCommand.addProperty("id", cmdId);
142 if (!fullCommand.has("params") && !params.isBlank()) {
143 fullCommand.add("params", JsonParser.parseString(params));
// Plain method name: assemble id, method and params.
146 fullCommand.addProperty("id", cmdId);
147 fullCommand.addProperty("method", command);
148 fullCommand.add("params", JsonParser.parseString(params));
150 MiIoSendCommand sendCmd = new MiIoSendCommand(cmdId, MiIoCommand.getCommand(command), fullCommand,
151 cloudServer, sender);
152 concurrentLinkedQueue.add(sendCmd);
153 if (logger.isDebugEnabled()) {
154 // Obfuscate part of the token to allow sharing of the logfiles
155 String tokenText = Utils.obfuscateToken(Utils.getHex(token));
156 logger.debug("Command added to Queue {} -> {} (Device: {} token: {} Queue: {}).{}{}",
157 fullCommand.toString(), ip, deviceId, tokenText, concurrentLinkedQueue.size(),
158 cloudServer.isBlank() ? "" : " Send via cloudserver: ", cloudServer);
// Only commands sent directly to the device (no cloud server) take the needPing path.
160 if (needPing && cloudServer.isBlank()) {
// Raised when command or params is not valid JSON, or the parsed command is not a JSON object.
164 } catch (JsonSyntaxException | IllegalStateException e) {
165 logger.warn("Send command '{}' with parameters {} -> {} (Device: {}) gave error {}", command, params, ip,
166 deviceId, e.getMessage());
// Sends one queued command, either directly to the device or via the cloud, and stores the outcome
// on the command: the parsed JSON response when its id matches, otherwise a JSON object with an
// "error" property holding the error message.
171 MiIoSendCommand sendMiIoSendCommand(MiIoSendCommand miIoSendCommand) {
172 String errorMsg = "Unknown Error while sending command";
173 String decryptedResponse = "";
// No cloud server configured for this command: talk to the device directly.
175 if (miIoSendCommand.getCloudServer().isBlank()) {
176 decryptedResponse = sendCommand(miIoSendCommand.getCommandString(), token, ip, deviceId);
// Cloud command whose method is not a URL path: sent as an RPC command through the cloud connector.
178 if (!miIoSendCommand.getMethod().startsWith("/")) {
179 decryptedResponse = cloudConnector.sendRPCCommand(Utils.getHexId(deviceId),
180 miIoSendCommand.getCloudServer(), miIoSendCommand);
181 logger.debug("Command {} send via cloudserver {}", miIoSendCommand.getCommandString(),
182 miIoSendCommand.getCloudServer());
183 updateStatus(ThingStatus.ONLINE, ThingStatusDetail.NONE);
// Method starts with "/": custom cloud request; the first element of the params array is used as request data.
185 String data = miIoSendCommand.getParams().isJsonArray()
186 && miIoSendCommand.getParams().getAsJsonArray().size() > 0
187 ? miIoSendCommand.getParams().getAsJsonArray().get(0).toString()
189 logger.debug("Custom cloud request send to url '{}' with data '{}'", miIoSendCommand.getMethod(),
191 decryptedResponse = cloudConnector.sendCloudCommand(miIoSendCommand.getMethod(),
192 miIoSendCommand.getCloudServer(), data);
// Custom cloud responses are stored as-is, without the id check applied further down.
193 miIoSendCommand.setResponse(JsonParser.parseString(decryptedResponse).getAsJsonObject());
194 return miIoSendCommand;
197 // hack to avoid invalid JSON errors from some misbehaving device firmwares
198 decryptedResponse = decryptedResponse.replace(",,", ",");
199 JsonElement response;
200 response = JsonParser.parseString(decryptedResponse);
201 if (!response.isJsonObject()) {
202 errorMsg = "Received message is not a JSON object ";
205 logger.trace("Received JSON message {}", response.toString());
206 JsonObject resJson = response.getAsJsonObject();
207 if (resJson.has("id")) {
208 int id = resJson.get("id").getAsInt();
// Matching id: this is the answer to the command that was just sent.
209 if (id == miIoSendCommand.getId()) {
210 miIoSendCommand.setResponse(response.getAsJsonObject());
211 return miIoSendCommand;
// A lower id than expected is reported with a hint to extend the timeout.
213 if (id < miIoSendCommand.getId()) {
214 errorMsg = String.format(
215 "Received message out of sync, extend timeout time. Expected id: %d, received id: %d",
216 miIoSendCommand.getId(), id);
218 errorMsg = String.format("Received message out of sync. Expected id: %d, received id: %d",
219 miIoSendCommand.getId(), id);
223 errorMsg = "Received message is without id";
227 logger.debug("{}: {}", errorMsg, decryptedResponse);
228 } catch (MiIoCryptoException | IOException e) {
229 logger.debug("Send command '{}' -> {} (Device: {}) gave error {}", miIoSendCommand.getCommandString(), ip,
230 deviceId, e.getMessage());
231 errorMsg = e.getMessage();
232 } catch (JsonSyntaxException e) {
233 logger.warn("Could not parse '{}' <- {} (Device: {}) gave error {}", decryptedResponse,
234 miIoSendCommand.getCommandString(), deviceId, e.getMessage());
235 errorMsg = "Received message is invalid JSON";
// Cloud failures additionally mark the thing OFFLINE with a communication error.
236 } catch (MiCloudException e) {
237 logger.debug("Send command '{}' -> cloudserver '{}' (Device: {}) gave error {}",
238 miIoSendCommand.getCommandString(), miIoSendCommand.getCloudServer(), deviceId, e.getMessage());
239 errorMsg = e.getMessage();
240 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR);
// Every failure path ends here: the error message is wrapped in a JSON object and stored as the response.
242 JsonObject erroResp = new JsonObject();
243 erroResp.addProperty("error", errorMsg);
244 miIoSendCommand.setResponse(erroResp);
245 return miIoSendCommand;
// Starts a MessageSenderThread unless one already exists and is still alive.
// The thread is named after the device id, or "?" followed by the ip when the device id is blank.
248 public synchronized void startReceiver() {
249 MessageSenderThread senderThread = this.senderThread;
250 if (senderThread == null || !senderThread.isAlive()) {
251 senderThread = new MessageSenderThread(deviceId.isBlank() ? "?" + ip : deviceId);
252 senderThread.start();
253 this.senderThread = senderThread;
258 * The {@link MessageSenderThread} is responsible for consuming messages from the queue and sending these to the
262 private class MessageSenderThread extends Thread {
// Identifier used in the thread name and in this thread's log messages.
263 private final String deviceId;
265 public MessageSenderThread(String deviceId) {
266 super("OH-binding-miio-MessageSenderThread-" + deviceId);
268 this.deviceId = deviceId;
273 logger.debug("Starting Mi IO MessageSenderThread {}", deviceId);
// Runs until the thread is interrupted.
274 while (!interrupted()) {
276 if (concurrentLinkedQueue.isEmpty()) {
// Take the next command, send it and hand the result to every listener.
280 MiIoSendCommand queuedMessage = concurrentLinkedQueue.remove();
281 MiIoSendCommand miIoSendCommand = sendMiIoSendCommand(queuedMessage);
282 for (MiIoMessageListener listener : listeners) {
283 logger.trace("inform listener {}, data {} from {}", listener, queuedMessage, miIoSendCommand);
285 listener.onMessageReceived(miIoSendCommand);
// A failing listener is logged and does not prevent the remaining listeners from being informed.
286 } catch (Exception e) {
287 logger.debug("Could not inform listener {}: {}: ", listener, e.getMessage(), e);
// remove() throws this when the queue was emptied between the isEmpty() check and the removal.
290 } catch (NoSuchElementException e) {
292 } catch (InterruptedException e) {
293 // That's our signal to stop
295 } catch (Exception e) {
296 logger.warn("Error while polling/sending message for {}", deviceId, e);
300 logger.debug("Finished Mi IO MessageSenderThread {}", deviceId);
// Encrypts a command with the token, sends it to the device and returns the decrypted response text.
// A blank command results in an empty message being passed to sendData.
// Returns a JSON error text when the device does not answer or the checksum of the answer is invalid.
// Throws MiIoCryptoException when encryption/decryption fails and IOException on socket errors.
304 private String sendCommand(String command, byte[] token, String ip, String deviceId)
305 throws MiIoCryptoException, IOException {
306 byte[] sendMsg = new byte[0];
307 if (!command.isBlank()) {
309 encr = MiIoCrypto.encrypt(command.getBytes(StandardCharsets.UTF_8), token);
310 timeStamp = (int) Instant.now().getEpochSecond();
// The outgoing timestamp is shifted by the last measured device/local time difference.
311 sendMsg = Message.createMsgData(encr, token, Utils.hexStringToByteArray(Utils.getHexId(deviceId)),
312 timeStamp + timeDelta);
314 Message miIoResponseMsg = sendData(sendMsg, ip);
315 if (miIoResponseMsg == null) {
// Trace level additionally dumps the message that was sent.
316 if (logger.isTraceEnabled()) {
317 logger.trace("No response from device {} at {} for command {}.\r\n{}", deviceId, ip, command,
318 (new Message(sendMsg)).toSting());
320 logger.debug("No response from device {} at {} for command {}.", deviceId, ip, command);
323 if (errorCounter > MAX_ERRORS) {
324 status = ThingStatusDetail.CONFIGURATION_ERROR;
327 return "{\"error\":\"No Response\"}";
329 if (!miIoResponseMsg.isChecksumValid()) {
330 return "{\"error\":\"Message has invalid checksum\"}";
// A valid answer after earlier failures brings the thing back ONLINE.
332 if (errorCounter > 0) {
334 status = ThingStatusDetail.NONE;
335 updateStatus(ThingStatus.ONLINE, status);
// Decode with the charset constant, matching the encoding used for the outgoing command above.
340 String decryptedResponse = new String(MiIoCrypto.decrypt(miIoResponseMsg.getData(), token), StandardCharsets.UTF_8).trim();
341 logger.trace("Received response from {}: {}", ip, decryptedResponse);
342 return decryptedResponse;
// Pings the device by sending the discovery message, trying up to three times.
// NOTE(review): the handling of the response and the return value are not visible in this listing.
345 public @Nullable Message sendPing(String ip) throws IOException {
346 for (int i = 0; i < 3; i++) {
347 logger.debug("Sending Ping to device '{}' ({})", deviceId, ip);
348 Message resp = sendData(MiIoBindingConstants.DISCOVER_STRING, ip);
// Records a failed ping: the status detail becomes COMMUNICATION_ERROR and listeners are told the thing is OFFLINE.
358 private void pingFail() {
359 logger.debug("Ping to device '{}' ({}) failed", deviceId, ip);
361 status = ThingStatusDetail.COMMUNICATION_ERROR;
362 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR);
// Records a successful ping and informs the listeners about the resulting thing status.
// The device answered, so it is reported ONLINE with detail NONE. The only exception is a
// previously recorded CONFIGURATION_ERROR, which keeps the thing OFFLINE.
365 private void pingSuccess() {
366 logger.debug("Ping to device '{}' ({}) success", deviceId, ip);
369 status = ThingStatusDetail.NONE;
// Fixed: a successful ping with no error detail must be reported as ONLINE, not OFFLINE.
370 updateStatus(ThingStatus.ONLINE, ThingStatusDetail.NONE);
372 if (ThingStatusDetail.CONFIGURATION_ERROR.equals(status)) {
373 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_ERROR);
375 status = ThingStatusDetail.NONE;
376 updateStatus(ThingStatus.ONLINE, status);
// Passes a status and status detail to every registered listener.
381 private void updateStatus(ThingStatus status, ThingStatusDetail statusDetail) {
382 for (MiIoMessageListener listener : listeners) {
383 logger.trace("inform listener {}, data {} from {}", listener, status, statusDetail);
385 listener.onStatusUpdated(status, statusDetail);
// A failing listener is logged and does not prevent the remaining listeners from being informed.
386 } catch (Exception e) {
387 logger.debug("Could not inform listener {}: {}", listener, e.getMessage(), e);
// Sends raw message bytes to the device and wraps the answer in a Message.
// Answers shorter than 32 bytes are only logged and not turned into a Message.
// For longer answers the difference between the device timestamp and the local time is stored in timeDelta.
392 private @Nullable Message sendData(byte[] sendMsg, String ip) throws IOException {
393 byte[] response = comms(sendMsg, ip);
394 if (response.length >= 32) {
395 Message miIoResponse = new Message(response);
// Current epoch second, obtained the same way as in sendCommand.
396 timeStamp = (int) Instant.now().getEpochSecond();
397 timeDelta = miIoResponse.getTimestampAsInt() - timeStamp;
398 logger.trace("Message Details:{} ", miIoResponse.toSting());
401 logger.trace("Reponse length <32 : {}", response.length);
// Performs one UDP exchange with the device: sends the message (if it is not empty) and waits for a datagram.
// Returns the bytes of the received datagram. synchronized, so only one exchange runs at a time per instance.
// NOTE(review): the lines that fill sendData and the return statements are not visible in this listing.
406 private synchronized byte[] comms(byte[] message, String ip) throws IOException {
407 InetAddress ipAddress = InetAddress.getByName(ip);
408 DatagramSocket clientSocket = getSocket();
409 DatagramPacket receivePacket = new DatagramPacket(new byte[MSG_BUFFER_SIZE], MSG_BUFFER_SIZE);
411 logger.trace("Connection {}:{}", ip, clientSocket.getLocalPort());
// An empty message means nothing is sent and the method only waits for incoming data.
412 if (message.length > 0) {
413 byte[] sendData = new byte[MSG_BUFFER_SIZE];
415 DatagramPacket sendPacket = new DatagramPacket(sendData, sendData.length, ipAddress,
416 MiIoBindingConstants.PORT);
417 clientSocket.send(sendPacket);
418 sendPacket.setData(new byte[MSG_BUFFER_SIZE]);
// Blocks until a datagram arrives or the socket timeout set in getSocket() expires.
420 clientSocket.receive(receivePacket);
// Copy only the bytes that were actually received, not the whole buffer.
421 byte[] response = Arrays.copyOfRange(receivePacket.getData(), receivePacket.getOffset(),
422 receivePacket.getOffset() + receivePacket.getLength());
424 } catch (SocketTimeoutException e) {
425 logger.debug("Communication error for Mi device at {}: {}", ip, e.getMessage());
// Provides the UDP socket, opening a new one with the configured receive timeout
// when none exists yet or the previous one has been closed.
431 private DatagramSocket getSocket() throws SocketException {
433 DatagramSocket socket = this.socket;
434 if (socket == null || socket.isClosed()) {
435 socket = new DatagramSocket();
436 socket.setSoTimeout(timeout);
437 logger.debug("Opening socket on port: {} ({} {})", socket.getLocalPort(), deviceId, ip);
438 this.socket = socket;
// Stops the communication by interrupting the sender thread, if there is one.
// A SecurityException raised while doing so is only logged.
445 public void close() {
447 final MessageSenderThread senderThread = this.senderThread;
448 if (senderThread != null) {
449 senderThread.interrupt();
451 } catch (SecurityException e) {
452 logger.debug("Error while closing: {} ", e.getMessage());
// Closes the UDP socket if one is present; a SecurityException raised while doing so is only logged.
// NOTE(review): the actual close call is not visible in this listing.
457 public void closeSocket() {
459 final DatagramSocket socket = this.socket;
460 if (socket != null) {
461 logger.debug("Closing socket for port: {} ", socket.getLocalPort());
465 } catch (SecurityException e) {
466 logger.debug("Error while closing: {} ", e.getMessage());
474 return id.incrementAndGet();
478 * @param id the id to set
480 public void setId(int id) {
485 * Time delta between device time and server time
489 public int getTimeDelta() {
493 public String getDeviceId() {
// Sets the device id and renames an existing sender thread so its name reflects the new id.
497 public void setDeviceId(String deviceId) {
498 this.deviceId = deviceId;
499 MessageSenderThread senderThread = this.senderThread;
500 if (senderThread != null) {
501 senderThread.setName("OH-binding-miio-MessageSenderThread-" + deviceId);
// Returns the number of commands currently waiting in the send queue.
505 public int getQueueLength() {
506 return concurrentLinkedQueue.size();