2 * Copyright (c) 2010-2022 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.io.openhabcloud.internal;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.eclipse.jdt.annotation.Nullable;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.util.BytesContentProvider;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.URIUtil;
import org.json.JSONException;
import org.json.JSONObject;
import org.openhab.core.OpenHAB;
import org.openhab.core.common.ThreadPoolManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.socket.backo.Backoff;
import io.socket.client.IO;
import io.socket.client.IO.Options;
import io.socket.client.Manager;
import io.socket.client.Socket;
import io.socket.emitter.Emitter;
import io.socket.engineio.client.Transport;
import io.socket.parser.Packet;
import io.socket.parser.Parser;
import okhttp3.OkHttpClient.Builder;
import okhttp3.logging.HttpLoggingInterceptor;
import okhttp3.logging.HttpLoggingInterceptor.Level;

/**
 * This class provides communication between openHAB and the openHAB Cloud service.
 * It also implements an asynchronous HTTP proxy for serving requests from users to
 * openHAB through the openHAB Cloud. It uses a Socket.IO connection to connect to
 * the openHAB Cloud service and the Jetty HTTP client to send local HTTP requests to
 * openHAB.
 *
 * @author Victor Belov - Initial contribution
 * @author Kai Kreuzer - migrated code to new Jetty client and ESH APIs
 */
70 public class CloudClient {
/**
 * Logger for this class
 */
private final Logger logger = LoggerFactory.getLogger(CloudClient.class);

/**
 * This variable holds base URL for the openHAB Cloud connections
 */
private final String baseURL;

/**
 * This variable holds openHAB's UUID for authenticating and connecting to the openHAB Cloud
 */
private final String uuid;

/**
 * This variable holds openHAB's secret for authenticating and connecting to the openHAB Cloud
 */
private final String secret;

/**
 * This variable holds local openHAB's base URL for connecting to the local openHAB instance
 */
private final String localBaseUrl;

/**
 * This variable holds instance of Jetty HTTP client to make requests to local openHAB
 */
private final HttpClient jettyClient;

/**
 * This map holds HTTP requests to local openHAB which are currently running,
 * keyed by the request id received from the openHAB Cloud
 */
private final Map<Integer, Request> runningRequests = new ConcurrentHashMap<>();

/**
 * This variable indicates if connection to the openHAB Cloud is currently in an established state
 */
private boolean isConnected;

/**
 * This variable holds version of local openHAB
 */
private String openHABVersion;

/**
 * This variable holds instance of Socket.IO client class which provides communication
 * with the openHAB Cloud
 */
private Socket socket;

/**
 * The protocol of the openHAB-cloud URL.
 */
private String protocol = "https";

/**
 * This variable holds instance of CloudClientListener which provides callbacks to communicate
 * certain events from the openHAB Cloud back to openHAB
 */
private CloudClientListener listener;

/**
 * Whether the openHAB Cloud may be used as a remote proxy ("request" and "cancel" events are only
 * handled when this is set)
 */
private boolean remoteAccessEnabled;

/**
 * Names of the items that are made available to apps connected to the openHAB Cloud
 */
private Set<String> exposedItems;

/**
 * Back-off strategy for reconnecting when manual reconnection is needed
 */
private final Backoff reconnectBackoff = new Backoff();

/**
 * Delay reconnect scheduler pool
 */
protected final ScheduledExecutorService scheduler = ThreadPoolManager
        .getScheduledPool(ThreadPoolManager.THREAD_POOL_NAME_COMMON);
/**
 * Constructor of CloudClient
 *
 * @param httpClient Jetty HTTP client used to send requests to the local openHAB instance
 * @param uuid openHAB's UUID to connect to the openHAB Cloud
 * @param secret openHAB's Secret to connect to the openHAB Cloud
 * @param baseURL base URL of the openHAB Cloud service
 * @param localBaseUrl base URL of the local openHAB instance
 * @param remoteAccessEnabled Allow the openHAB Cloud to be used as a remote proxy
 * @param exposedItems Items that are made available to apps connected to the openHAB Cloud
 */
public CloudClient(HttpClient httpClient, String uuid, String secret, String baseURL, String localBaseUrl,
        boolean remoteAccessEnabled, Set<String> exposedItems) {
    // uuid is a final field and must be assigned here like the other credentials
    this.uuid = uuid;
    this.secret = secret;
    this.baseURL = baseURL;
    this.localBaseUrl = localBaseUrl;
    this.remoteAccessEnabled = remoteAccessEnabled;
    this.exposedItems = exposedItems;
    this.jettyClient = httpClient;
    // Manual reconnect delay (milliseconds): starts at 1 s, capped at 30 s, with a jitter factor of 0.5
    reconnectBackoff.setMin(1000);
    reconnectBackoff.setMax(30_000);
    reconnectBackoff.setJitter(0.5);
}
170 * Connect to the openHAB Cloud
173 public void connect() {
175 Options options = new Options();
176 if (logger.isTraceEnabled()) {
177 // When trace level logging is enabled, we activate further logging of HTTP calls
178 // of the Socket.IO library
179 Builder okHttpBuilder = new Builder();
180 HttpLoggingInterceptor loggingInterceptor = new HttpLoggingInterceptor();
181 loggingInterceptor.setLevel(Level.BASIC);
182 okHttpBuilder.addInterceptor(loggingInterceptor);
183 okHttpBuilder.addNetworkInterceptor(loggingInterceptor);
184 options.callFactory = okHttpBuilder.build();
185 options.webSocketFactory = okHttpBuilder.build();
187 socket = IO.socket(baseURL, options);
188 URL parsed = new URL(baseURL);
189 protocol = parsed.getProtocol();
190 } catch (URISyntaxException e) {
191 logger.error("Error creating Socket.IO: {}", e.getMessage());
193 } catch (MalformedURLException e) {
194 logger.error("Error parsing baseURL to get protocol, assuming https. Error: {}", e.getMessage());
198 // socket manager events
201 .on(Manager.EVENT_TRANSPORT, args -> {
202 logger.trace("Manager.EVENT_TRANSPORT");
203 Transport transport = (Transport) args[0];
204 transport.on(Transport.EVENT_REQUEST_HEADERS, new Emitter.Listener() {
206 public void call(Object... args) {
207 logger.trace("Transport.EVENT_REQUEST_HEADERS");
208 @SuppressWarnings("unchecked")
209 Map<String, List<String>> headers = (Map<String, List<String>>) args[0];
210 headers.put("uuid", List.of(uuid));
211 headers.put("secret", List.of(secret));
212 headers.put("openhabversion", List.of(OpenHAB.getVersion()));
213 headers.put("clientversion", List.of(CloudService.clientVersion));
214 headers.put("remoteaccess", List.of(((Boolean) remoteAccessEnabled).toString()));
218 .on(Manager.EVENT_CONNECT_ERROR, args -> {
219 if (args.length > 0) {
220 if (args[0] instanceof Exception) {
221 Exception e = (Exception) args[0];
223 "Error connecting to the openHAB Cloud instance: {} {}. Should reconnect automatically.",
224 e.getClass().getSimpleName(), e.getMessage());
227 "Error connecting to the openHAB Cloud instance: {}. Should reconnect automatically.",
231 logger.debug("Error connecting to the openHAB Cloud instance. Should reconnect automatically.");
234 .on(Manager.EVENT_OPEN, args -> logger.debug("Socket.IO OPEN"))//
235 .on(Manager.EVENT_CLOSE, args -> logger.debug("Socket.IO CLOSE: {}", args[0]))//
236 .on(Manager.EVENT_PACKET, args -> {
237 int packetTypeIndex = -1;
238 String type = "<unexpected packet type>";
239 if (args.length == 1 && args[0] instanceof Packet<?>) {
240 packetTypeIndex = ((Packet<?>) args[0]).type;
242 if (packetTypeIndex < Parser.types.length) {
243 type = Parser.types[packetTypeIndex];
245 type = "<unknown type>";
248 logger.trace("Socket.IO Packet: {} ({})", type, packetTypeIndex);
255 socket.on(Socket.EVENT_CONNECT, args -> {
256 logger.debug("Socket.IO connected");
260 .on(Socket.EVENT_CONNECTING, args -> logger.debug("Socket.IO connecting"))//
261 .on(Socket.EVENT_RECONNECTING, args -> logger.debug("Socket.IO re-connecting (attempt {})", args[0]))//
262 .on(Socket.EVENT_RECONNECT,
263 args -> logger.debug("Socket.IO re-connected successfully (attempt {})", args[0]))//
264 .on(Socket.EVENT_RECONNECT_ERROR, args -> {
265 if (args[0] instanceof Exception) {
266 Exception e = (Exception) args[0];
267 logger.debug("Socket.IO re-connect attempt error: {} {}", e.getClass().getSimpleName(),
270 logger.debug("Socket.IO re-connect attempt error: {}", args[0]);
273 .on(Socket.EVENT_RECONNECT_FAILED,
274 args -> logger.debug("Socket.IO re-connect attempts failed. Stopping reconnection."))//
275 .on(Socket.EVENT_DISCONNECT, args -> {
276 if (args.length > 0) {
277 logger.warn("Socket.IO disconnected: {}", args[0]);
279 logger.warn("Socket.IO disconnected");
284 .on(Socket.EVENT_ERROR, args -> {
285 if (CloudClient.this.socket.connected()) {
286 if (args.length > 0) {
287 if (args[0] instanceof Exception) {
288 Exception e = (Exception) args[0];
289 logger.warn("Error during communication: {} {}", e.getClass().getSimpleName(),
292 logger.warn("Error during communication: {}", args[0]);
295 logger.warn("Error during communication");
298 // We are not connected currently, manual reconnection is needed to keep trying to
302 // Socket.IO 1.x java client: 'error' event is emitted from Socket on connection errors that
304 // retried, but also with error that are automatically retried. If we
306 // Note how this is different in Socket.IO 2.x java client, Socket emits 'connect_error'
308 // OBS: Don't get confused with Socket IO 2.x docs online, in 1.x connect_error is emitted
310 // errors that are retried by the library automatically!
311 long delay = reconnectBackoff.duration();
312 // Try reconnecting on connection errors
313 if (args.length > 0) {
314 if (args[0] instanceof Exception) {
315 Exception e = (Exception) args[0];
317 "Error connecting to the openHAB Cloud instance: {} {}. Reconnecting after {} ms.",
318 e.getClass().getSimpleName(), e.getMessage(), delay);
321 "Error connecting to the openHAB Cloud instance: {}. Reconnecting after {} ms.",
325 logger.warn("Error connecting to the openHAB Cloud instance. Reconnecting.");
328 scheduler.schedule(new Runnable() {
333 }, delay, TimeUnit.MILLISECONDS);
337 .on(Socket.EVENT_PING, args -> logger.debug("Socket.IO ping"))//
338 .on(Socket.EVENT_PONG, args -> logger.debug("Socket.IO pong: {} ms", args[0]))//
339 .on("request", args -> onEvent("request", (JSONObject) args[0]))//
340 .on("cancel", args -> onEvent("cancel", (JSONObject) args[0]))//
341 .on("command", args -> onEvent("command", (JSONObject) args[0]))//
347 * Callback method for socket.io client which is called when connection is established
350 public void onConnect() {
351 logger.info("Connected to the openHAB Cloud service (UUID = {}, base URL = {})", censored(this.uuid),
353 reconnectBackoff.reset();
358 * Callback method for socket.io client which is called when disconnect occurs
361 public void onDisconnect() {
362 logger.info("Disconnected from the openHAB Cloud service (UUID = {}, base URL = {})", censored(this.uuid),
365 // And clean up the list of running requests
366 runningRequests.clear();
370 * Callback method for socket.io client which is called when a message is received
373 public void onEvent(String event, JSONObject data) {
374 logger.debug("on(): {}", event);
375 if ("command".equals(event)) {
376 handleCommandEvent(data);
379 if (remoteAccessEnabled) {
380 if ("request".equals(event)) {
381 handleRequestEvent(data);
382 } else if ("cancel".equals(event)) {
383 handleCancelEvent(data);
385 logger.warn("Unsupported event from openHAB Cloud: {}", event);
390 private void handleRequestEvent(JSONObject data) {
392 // Get unique request Id
393 int requestId = data.getInt("id");
394 logger.debug("Got request {}", requestId);
396 String requestPath = data.getString("path");
397 logger.debug("Path {}", requestPath);
398 // Get request method
399 String requestMethod = data.getString("method");
400 logger.debug("Method {}", requestMethod);
401 // Get JSONObject for request headers
402 JSONObject requestHeadersJson = data.getJSONObject("headers");
403 logger.debug("Headers: {}", requestHeadersJson.toString());
405 String requestBody = data.getString("body");
406 logger.trace("Body {}", requestBody);
407 // Get JSONObject for request query parameters
408 JSONObject requestQueryJson = data.getJSONObject("query");
409 logger.debug("Query {}", requestQueryJson.toString());
410 // Create URI builder with base request URI of openHAB and path from request
411 String newPath = URIUtil.addPaths(localBaseUrl, requestPath);
412 Iterator<String> queryIterator = requestQueryJson.keys();
413 // Add query parameters to URI builder, if any
415 while (queryIterator.hasNext()) {
416 String queryName = queryIterator.next();
417 newPath += queryName;
419 newPath += URLEncoder.encode(requestQueryJson.getString(queryName), "UTF-8");
420 if (queryIterator.hasNext()) {
424 // Finally get the future request URI
425 URI requestUri = new URI(newPath);
426 // All preparations which are common for different methods are done
427 // Now perform the request to openHAB
429 logger.debug("Request method is {}", requestMethod);
430 Request request = jettyClient.newRequest(requestUri);
431 setRequestHeaders(request, requestHeadersJson);
432 String proto = protocol;
433 if (data.has("protocol")) {
434 proto = data.getString("protocol");
436 request.header("X-Forwarded-Proto", proto);
437 HttpMethod method = HttpMethod.fromString(requestMethod);
438 if (method == null) {
439 logger.debug("Unsupported request method {}", requestMethod);
442 request.method(method);
443 if (!requestBody.isEmpty()) {
444 request.content(new BytesContentProvider(requestBody.getBytes()));
447 request.onResponseHeaders(response -> {
448 logger.debug("onHeaders {}", requestId);
449 JSONObject responseJson = new JSONObject();
451 responseJson.put("id", requestId);
452 responseJson.put("headers", getJSONHeaders(response.getHeaders()));
453 responseJson.put("responseStatusCode", response.getStatus());
454 responseJson.put("responseStatusText", "OK");
455 socket.emit("responseHeader", responseJson);
456 logger.trace("Sent headers to request {}", requestId);
457 logger.trace("{}", responseJson.toString());
458 } catch (JSONException e) {
459 logger.debug("{}", e.getMessage());
461 }).onResponseContent((theResponse, content) -> {
462 logger.debug("onResponseContent: {}, content size {}", requestId, String.valueOf(content.remaining()));
463 JSONObject responseJson = new JSONObject();
465 responseJson.put("id", requestId);
466 responseJson.put("body", BufferUtil.toArray(content));
467 if (logger.isTraceEnabled()) {
468 logger.trace("{}", StandardCharsets.UTF_8.decode(content).toString());
470 socket.emit("responseContentBinary", responseJson);
471 logger.trace("Sent content to request {}", requestId);
472 } catch (JSONException e) {
473 logger.debug("{}", e.getMessage());
475 }).onRequestFailure((origRequest, failure) -> {
476 logger.debug("onRequestFailure: {}, {}", requestId, failure.getMessage());
477 JSONObject responseJson = new JSONObject();
479 responseJson.put("id", requestId);
480 responseJson.put("responseStatusText", "openHAB connection error: " + failure.getMessage());
481 socket.emit("responseError", responseJson);
482 } catch (JSONException e) {
483 logger.debug("{}", e.getMessage());
486 logger.debug("onComplete: {}", requestId);
487 // Remove this request from list of running requests
488 runningRequests.remove(requestId);
489 if ((result != null && result.isFailed())
490 && (result.getResponse() != null && result.getResponse().getStatus() != HttpStatus.OK_200)) {
491 if (result.getFailure() != null) {
492 logger.debug("Jetty request {} failed: {}", requestId, result.getFailure().getMessage());
494 if (result.getRequestFailure() != null) {
495 logger.debug("Request Failure: {}", result.getRequestFailure().getMessage());
497 if (result.getResponseFailure() != null) {
498 logger.debug("Response Failure: {}", result.getResponseFailure().getMessage());
501 JSONObject responseJson = new JSONObject();
503 responseJson.put("id", requestId);
504 socket.emit("responseFinished", responseJson);
505 logger.debug("Finished responding to request {}", requestId);
506 } catch (JSONException e) {
507 logger.debug("{}", e.getMessage());
511 // If successfully submitted request to http client, add it to the list of currently
512 // running requests to be able to cancel it if needed
513 runningRequests.put(requestId, request);
514 } catch (JSONException | IOException | URISyntaxException e) {
515 logger.debug("{}", e.getMessage());
519 private void setRequestHeaders(Request request, JSONObject requestHeadersJson) {
520 Iterator<String> headersIterator = requestHeadersJson.keys();
521 // Convert JSONObject of headers into Header ArrayList
522 while (headersIterator.hasNext()) {
523 String headerName = headersIterator.next();
526 headerValue = requestHeadersJson.getString(headerName);
527 logger.debug("Jetty set header {} = {}", headerName, headerValue);
528 if (!headerName.equalsIgnoreCase("Content-Length")) {
529 request.header(headerName, headerValue);
531 } catch (JSONException e) {
532 logger.warn("Error processing request headers: {}", e.getMessage());
537 private void handleCancelEvent(JSONObject data) {
539 int requestId = data.getInt("id");
540 logger.debug("Received cancel for request {}", requestId);
541 // Find and abort running request
542 Request request = runningRequests.get(requestId);
543 if (request != null) {
544 request.abort(new InterruptedException());
545 runningRequests.remove(requestId);
547 } catch (JSONException e) {
548 logger.debug("{}", e.getMessage());
552 private void handleCommandEvent(JSONObject data) {
553 String itemName = data.getString("item");
554 if (exposedItems.contains(itemName)) {
556 logger.debug("Received command {} for item {}.", data.getString("command"), itemName);
557 if (this.listener != null) {
558 this.listener.sendCommand(itemName, data.getString("command"));
560 } catch (JSONException e) {
561 logger.debug("{}", e.getMessage());
564 logger.warn("Received command from openHAB Cloud for item '{}', which is not exposed.", itemName);
569 * This method sends notification to the openHAB Cloud
571 * @param userId openHAB Cloud user id
572 * @param message notification message text
573 * @param icon name of the icon for this notification
574 * @param severity severity name for this notification
576 public void sendNotification(String userId, String message, @Nullable String icon, @Nullable String severity) {
578 JSONObject notificationMessage = new JSONObject();
580 notificationMessage.put("userId", userId);
581 notificationMessage.put("message", message);
582 notificationMessage.put("icon", icon);
583 notificationMessage.put("severity", severity);
584 socket.emit("notification", notificationMessage);
585 } catch (JSONException e) {
586 logger.debug("{}", e.getMessage());
589 logger.debug("No connection, notification is not sent");
594 * This method sends log notification to the openHAB Cloud
596 * @param message notification message text
597 * @param icon name of the icon for this notification
598 * @param severity severity name for this notification
600 public void sendLogNotification(String message, @Nullable String icon, @Nullable String severity) {
602 JSONObject notificationMessage = new JSONObject();
604 notificationMessage.put("message", message);
605 notificationMessage.put("icon", icon);
606 notificationMessage.put("severity", severity);
607 socket.emit("lognotification", notificationMessage);
608 } catch (JSONException e) {
609 logger.debug("{}", e.getMessage());
612 logger.debug("No connection, notification is not sent");
617 * This method sends broadcast notification to the openHAB Cloud
619 * @param message notification message text
620 * @param icon name of the icon for this notification
621 * @param severity severity name for this notification
623 public void sendBroadcastNotification(String message, @Nullable String icon, @Nullable String severity) {
625 JSONObject notificationMessage = new JSONObject();
627 notificationMessage.put("message", message);
628 notificationMessage.put("icon", icon);
629 notificationMessage.put("severity", severity);
630 socket.emit("broadcastnotification", notificationMessage);
631 } catch (JSONException e) {
632 logger.debug("{}", e.getMessage());
635 logger.debug("No connection, notification is not sent");
640 * Send item update to openHAB Cloud
642 * @param itemName the name of the item
643 * @param itemState updated item state
646 public void sendItemUpdate(String itemName, String itemState) {
648 logger.debug("Sending update '{}' for item '{}'", itemState, itemName);
649 JSONObject itemUpdateMessage = new JSONObject();
651 itemUpdateMessage.put("itemName", itemName);
652 itemUpdateMessage.put("itemStatus", itemState);
653 socket.emit("itemupdate", itemUpdateMessage);
654 } catch (JSONException e) {
655 logger.debug("{}", e.getMessage());
658 logger.debug("No connection, Item update is not sent");
663 * Returns true if openHAB Cloud connection is active
665 public boolean isConnected() {
670 * Disconnect from openHAB Cloud
672 public void shutdown() {
673 logger.info("Shutting down openHAB Cloud service connection");
677 public String getOpenHABVersion() {
678 return openHABVersion;
681 public void setOpenHABVersion(String openHABVersion) {
682 this.openHABVersion = openHABVersion;
685 public void setListener(CloudClientListener listener) {
686 this.listener = listener;
689 private JSONObject getJSONHeaders(HttpFields httpFields) {
690 JSONObject headersJSON = new JSONObject();
692 for (HttpField field : httpFields) {
693 headersJSON.put(field.getName(), field.getValue());
695 } catch (JSONException e) {
696 logger.warn("Error forming response headers: {}", e.getMessage());
701 private static String censored(String secret) {
702 if (secret.length() < 4) {
705 return secret.substring(0, 2) + "..." + secret.substring(secret.length() - 2, secret.length());