2 * Copyright (c) 2010-2021 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.io.openhabcloud.internal;
15 import java.io.IOException;
16 import java.net.MalformedURLException;
18 import java.net.URISyntaxException;
20 import java.net.URLEncoder;
21 import java.nio.charset.StandardCharsets;
22 import java.util.Iterator;
23 import java.util.List;
26 import java.util.concurrent.ConcurrentHashMap;
28 import org.eclipse.jdt.annotation.Nullable;
29 import org.eclipse.jetty.client.HttpClient;
30 import org.eclipse.jetty.client.api.Request;
31 import org.eclipse.jetty.client.util.BytesContentProvider;
32 import org.eclipse.jetty.http.HttpField;
33 import org.eclipse.jetty.http.HttpFields;
34 import org.eclipse.jetty.http.HttpMethod;
35 import org.eclipse.jetty.http.HttpStatus;
36 import org.eclipse.jetty.util.BufferUtil;
37 import org.eclipse.jetty.util.URIUtil;
38 import org.json.JSONException;
39 import org.json.JSONObject;
40 import org.openhab.core.OpenHAB;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
44 import io.socket.backo.Backoff;
45 import io.socket.client.IO;
46 import io.socket.client.Manager;
47 import io.socket.client.Socket;
48 import io.socket.emitter.Emitter;
49 import io.socket.engineio.client.Transport;
50 import io.socket.thread.EventThread;
53 * This class provides communication between openHAB and the openHAB Cloud service.
54 * It also implements async http proxy for serving requests from user to
55 * openHAB through the openHAB Cloud. It uses Socket.IO connection to connect to
56 * the openHAB Cloud service and Jetty Http client to send local http requests to
59 * @author Victor Belov - Initial contribution
60 * @author Kai Kreuzer - migrated code to new Jetty client and ESH APIs
62 public class CloudClient {
64 * Logger for this class
66 private final Logger logger = LoggerFactory.getLogger(CloudClient.class);
69 * This variable holds base URL for the openHAB Cloud connections
71 private final String baseURL;
74 * This variable holds openHAB's UUID for authenticating and connecting to the openHAB Cloud
76 private final String uuid;
79 * This variable holds openHAB's secret for authenticating and connecting to the openHAB Cloud
81 private final String secret;
84 * This variable holds local openHAB's base URL for connecting to the local openHAB instance
86 private final String localBaseUrl;
89 * This variable holds instance of Jetty HTTP client to make requests to local openHAB
91 private final HttpClient jettyClient;
94 * This map holds HTTP requests to local openHAB which are currently running
96 private final Map<Integer, Request> runningRequests = new ConcurrentHashMap<>();
99 * This variable indicates if connection to the openHAB Cloud is currently in an established state
101 private boolean isConnected;
104 * This variable holds version of local openHAB
106 private String openHABVersion;
109 * This variable holds instance of Socket.IO client class which provides communication
110 * with the openHAB Cloud
112 private Socket socket;
115 * The protocol of the openHAB-cloud URL.
117 private String protocol = "https";
120 * This variable holds instance of CloudClientListener which provides callbacks to communicate
121 * certain events from the openHAB Cloud back to openHAB
123 private CloudClientListener listener;
124 private boolean remoteAccessEnabled;
125 private Set<String> exposedItems;
128 * Back-off strategy for reconnecting when manual reconnection is needed
130 private final Backoff reconnectBackoff = new Backoff();
133 * Constructor of CloudClient
135 * @param uuid openHAB's UUID to connect to the openHAB Cloud
136 * @param secret openHAB's Secret to connect to the openHAB Cloud
137 * @param remoteAccessEnabled Allow the openHAB Cloud to be used as a remote proxy
138 * @param exposedItems Items that are made available to apps connected to the openHAB Cloud
140 public CloudClient(HttpClient httpClient, String uuid, String secret, String baseURL, String localBaseUrl,
141 boolean remoteAccessEnabled, Set<String> exposedItems) {
143 this.secret = secret;
144 this.baseURL = baseURL;
145 this.localBaseUrl = localBaseUrl;
146 this.remoteAccessEnabled = remoteAccessEnabled;
147 this.exposedItems = exposedItems;
148 this.jettyClient = httpClient;
149 reconnectBackoff.setMin(1000);
150 reconnectBackoff.setMax(30_000);
151 reconnectBackoff.setJitter(0.5);
155 * Connect to the openHAB Cloud
158 public void connect() {
160 socket = IO.socket(baseURL);
161 URL parsed = new URL(baseURL);
162 protocol = parsed.getProtocol();
163 } catch (URISyntaxException e) {
164 logger.error("Error creating Socket.IO: {}", e.getMessage());
165 } catch (MalformedURLException e) {
166 logger.error("Error parsing baseURL to get protocol, assuming https. Error: {}", e.getMessage());
168 socket.io().on(Manager.EVENT_TRANSPORT, new Emitter.Listener() {
170 public void call(Object... args) {
171 logger.trace("Manager.EVENT_TRANSPORT");
172 Transport transport = (Transport) args[0];
173 transport.on(Transport.EVENT_REQUEST_HEADERS, new Emitter.Listener() {
175 public void call(Object... args) {
176 logger.trace("Transport.EVENT_REQUEST_HEADERS");
177 @SuppressWarnings("unchecked")
178 Map<String, List<String>> headers = (Map<String, List<String>>) args[0];
179 headers.put("uuid", List.of(uuid));
180 headers.put("secret", List.of(secret));
181 headers.put("openhabversion", List.of(OpenHAB.getVersion()));
182 headers.put("clientversion", List.of(CloudService.clientVersion));
183 headers.put("remoteaccess", List.of(((Boolean) remoteAccessEnabled).toString()));
187 }).on(Manager.EVENT_CONNECT_ERROR, new Emitter.Listener() {
190 public void call(Object... args) {
191 if (args.length > 0) {
192 if (args[0] instanceof Exception) {
193 Exception e = (Exception) args[0];
195 "Error connecting to the openHAB Cloud instance: {} {}. Should reconnect automatically.",
196 e.getClass().getSimpleName(), e.getMessage());
199 "Error connecting to the openHAB Cloud instance: {}. Should reconnect automatically.",
203 logger.debug("Error connecting to the openHAB Cloud instance. Should reconnect automatically.");
207 socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() {
209 public void call(Object... args) {
210 logger.debug("Socket.IO connected");
214 }).on(Socket.EVENT_CONNECTING, new Emitter.Listener() {
216 public void call(Object... args) {
217 logger.debug("Socket.IO connecting");
219 }).on(Socket.EVENT_RECONNECTING, new Emitter.Listener() {
221 public void call(Object... args) {
222 logger.debug("Socket.IO re-connecting (attempt {})", args[0]);
224 }).on(Socket.EVENT_RECONNECT, new Emitter.Listener() {
226 public void call(Object... args) {
227 logger.debug("Socket.IO re-connected successfully (attempt {})", args[0]);
229 }).on(Socket.EVENT_RECONNECT_ERROR, new Emitter.Listener() {
231 public void call(Object... args) {
232 if (args[0] instanceof Exception) {
233 Exception e = (Exception) args[0];
234 logger.debug("Socket.IO re-connect attempt error: {} {}", e.getClass().getSimpleName(),
237 logger.debug("Socket.IO re-connect attempt error: {}", args[0]);
240 }).on(Socket.EVENT_RECONNECT_FAILED, new Emitter.Listener() {
242 public void call(Object... args) {
243 logger.debug("Socket.IO re-connect attempts failed. Stopping reconnection.");
245 }).on(Socket.EVENT_DISCONNECT, new Emitter.Listener() {
247 public void call(Object... args) {
248 if (args.length > 0) {
249 logger.debug("Socket.IO disconnected: {}", args[0]);
251 logger.debug("Socket.IO disconnected");
256 }).on(Socket.EVENT_ERROR, new Emitter.Listener() {
258 public void call(Object... args) {
259 if (CloudClient.this.socket.connected()) {
260 if (logger.isDebugEnabled() && args.length > 0) {
261 logger.error("Error during communication: {}", args[0]);
263 logger.error("Error during communication");
266 // We are not connected currently, manual reconnection is needed to keep trying to (re-)establish
269 // Socket.IO 1.x java client: 'error' event is emitted from Socket on connection errors that are not
270 // retried, but also with error that are automatically retried. If we
272 // Note how this is different in Socket.IO 2.x java client, Socket emits 'connect_error' event.
273 // OBS: Don't get confused with Socket IO 2.x docs online, in 1.x connect_error is emitted also on
274 // errors that are retried by the library automatically!
275 long delay = reconnectBackoff.duration();
276 // Try reconnecting on connection errors
277 if (logger.isDebugEnabled() && args.length > 0) {
278 if (args[0] instanceof Exception) {
279 Exception e = (Exception) args[0];
281 "Error connecting to the openHAB Cloud instance: {} {}. Reconnecting after {} ms.",
282 e.getClass().getSimpleName(), e.getMessage(), delay);
285 "Error connecting to the openHAB Cloud instance: {}. Reconnecting after {} ms.",
289 logger.error("Error connecting to the openHAB Cloud instance. Reconnecting.");
292 sleepSocketIO(delay);
296 }).on("request", new Emitter.Listener() {
298 public void call(Object... args) {
299 onEvent("request", (JSONObject) args[0]);
301 }).on("cancel", new Emitter.Listener() {
303 public void call(Object... args) {
304 onEvent("cancel", (JSONObject) args[0]);
306 }).on("command", new Emitter.Listener() {
309 public void call(Object... args) {
310 onEvent("command", (JSONObject) args[0]);
317 * Callback method for socket.io client which is called when connection is established
320 public void onConnect() {
321 logger.info("Connected to the openHAB Cloud service (UUID = {}, base URL = {})", this.uuid, this.localBaseUrl);
322 reconnectBackoff.reset();
327 * Callback method for socket.io client which is called when disconnect occurs
330 public void onDisconnect() {
331 logger.info("Disconnected from the openHAB Cloud service (UUID = {}, base URL = {})", this.uuid,
334 // And clean up the list of running requests
335 runningRequests.clear();
339 * Callback method for socket.io client which is called when an error occurs
342 public void onError(IOException error) {
343 logger.debug("{}", error.getMessage());
347 * Callback method for socket.io client which is called when a message is received
350 public void onEvent(String event, JSONObject data) {
351 logger.debug("on(): {}", event);
352 if ("command".equals(event)) {
353 handleCommandEvent(data);
356 if (remoteAccessEnabled) {
357 if ("request".equals(event)) {
358 handleRequestEvent(data);
359 } else if ("cancel".equals(event)) {
360 handleCancelEvent(data);
362 logger.warn("Unsupported event from openHAB Cloud: {}", event);
367 private void handleRequestEvent(JSONObject data) {
369 // Get unique request Id
370 int requestId = data.getInt("id");
371 logger.debug("Got request {}", requestId);
373 String requestPath = data.getString("path");
374 logger.debug("Path {}", requestPath);
375 // Get request method
376 String requestMethod = data.getString("method");
377 logger.debug("Method {}", requestMethod);
378 // Get JSONObject for request headers
379 JSONObject requestHeadersJson = data.getJSONObject("headers");
380 logger.debug("Headers: {}", requestHeadersJson.toString());
382 String requestBody = data.getString("body");
383 logger.trace("Body {}", requestBody);
384 // Get JSONObject for request query parameters
385 JSONObject requestQueryJson = data.getJSONObject("query");
386 logger.debug("Query {}", requestQueryJson.toString());
387 // Create URI builder with base request URI of openHAB and path from request
388 String newPath = URIUtil.addPaths(localBaseUrl, requestPath);
389 Iterator<String> queryIterator = requestQueryJson.keys();
390 // Add query parameters to URI builder, if any
392 while (queryIterator.hasNext()) {
393 String queryName = queryIterator.next();
394 newPath += queryName;
396 newPath += URLEncoder.encode(requestQueryJson.getString(queryName), "UTF-8");
397 if (queryIterator.hasNext()) {
401 // Finally get the future request URI
402 URI requestUri = new URI(newPath);
403 // All preparations which are common for different methods are done
404 // Now perform the request to openHAB
406 logger.debug("Request method is {}", requestMethod);
407 Request request = jettyClient.newRequest(requestUri);
408 setRequestHeaders(request, requestHeadersJson);
409 String proto = protocol;
410 if (data.has("protocol")) {
411 proto = data.getString("protocol");
413 request.header("X-Forwarded-Proto", proto);
414 HttpMethod method = HttpMethod.fromString(requestMethod);
415 if (method == null) {
416 logger.debug("Unsupported request method {}", requestMethod);
419 request.method(method);
420 if (!requestBody.isEmpty()) {
421 request.content(new BytesContentProvider(requestBody.getBytes()));
424 request.onResponseHeaders(response -> {
425 logger.debug("onHeaders {}", requestId);
426 JSONObject responseJson = new JSONObject();
428 responseJson.put("id", requestId);
429 responseJson.put("headers", getJSONHeaders(response.getHeaders()));
430 responseJson.put("responseStatusCode", response.getStatus());
431 responseJson.put("responseStatusText", "OK");
432 socket.emit("responseHeader", responseJson);
433 logger.trace("Sent headers to request {}", requestId);
434 logger.trace("{}", responseJson.toString());
435 } catch (JSONException e) {
436 logger.debug("{}", e.getMessage());
438 }).onResponseContent((theResponse, content) -> {
439 logger.debug("onResponseContent: {}, content size {}", requestId, String.valueOf(content.remaining()));
440 JSONObject responseJson = new JSONObject();
442 responseJson.put("id", requestId);
443 responseJson.put("body", BufferUtil.toArray(content));
444 if (logger.isTraceEnabled()) {
445 logger.trace("{}", StandardCharsets.UTF_8.decode(content).toString());
447 socket.emit("responseContentBinary", responseJson);
448 logger.trace("Sent content to request {}", requestId);
449 } catch (JSONException e) {
450 logger.debug("{}", e.getMessage());
452 }).onRequestFailure((origRequest, failure) -> {
453 logger.debug("onRequestFailure: {}, {}", requestId, failure.getMessage());
454 JSONObject responseJson = new JSONObject();
456 responseJson.put("id", requestId);
457 responseJson.put("responseStatusText", "openHAB connection error: " + failure.getMessage());
458 socket.emit("responseError", responseJson);
459 } catch (JSONException e) {
460 logger.debug("{}", e.getMessage());
463 logger.debug("onComplete: {}", requestId);
464 // Remove this request from list of running requests
465 runningRequests.remove(requestId);
466 if ((result != null && result.isFailed())
467 && (result.getResponse() != null && result.getResponse().getStatus() != HttpStatus.OK_200)) {
468 if (result.getFailure() != null) {
469 logger.debug("Jetty request {} failed: {}", requestId, result.getFailure().getMessage());
471 if (result.getRequestFailure() != null) {
472 logger.debug("Request Failure: {}", result.getRequestFailure().getMessage());
474 if (result.getResponseFailure() != null) {
475 logger.debug("Response Failure: {}", result.getResponseFailure().getMessage());
478 JSONObject responseJson = new JSONObject();
480 responseJson.put("id", requestId);
481 socket.emit("responseFinished", responseJson);
482 logger.debug("Finished responding to request {}", requestId);
483 } catch (JSONException e) {
484 logger.debug("{}", e.getMessage());
488 // If successfully submitted request to http client, add it to the list of currently
489 // running requests to be able to cancel it if needed
490 runningRequests.put(requestId, request);
491 } catch (JSONException | IOException | URISyntaxException e) {
492 logger.debug("{}", e.getMessage());
496 private void setRequestHeaders(Request request, JSONObject requestHeadersJson) {
497 Iterator<String> headersIterator = requestHeadersJson.keys();
498 // Convert JSONObject of headers into Header ArrayList
499 while (headersIterator.hasNext()) {
500 String headerName = headersIterator.next();
503 headerValue = requestHeadersJson.getString(headerName);
504 logger.debug("Jetty set header {} = {}", headerName, headerValue);
505 if (!headerName.equalsIgnoreCase("Content-Length")) {
506 request.header(headerName, headerValue);
508 } catch (JSONException e) {
509 logger.warn("Error processing request headers: {}", e.getMessage());
514 private void handleCancelEvent(JSONObject data) {
516 int requestId = data.getInt("id");
517 logger.debug("Received cancel for request {}", requestId);
518 // Find and abort running request
519 Request request = runningRequests.get(requestId);
520 if (request != null) {
521 request.abort(new InterruptedException());
522 runningRequests.remove(requestId);
524 } catch (JSONException e) {
525 logger.debug("{}", e.getMessage());
529 private void handleCommandEvent(JSONObject data) {
530 String itemName = data.getString("item");
531 if (exposedItems.contains(itemName)) {
533 logger.debug("Received command {} for item {}.", data.getString("command"), itemName);
534 if (this.listener != null) {
535 this.listener.sendCommand(itemName, data.getString("command"));
537 } catch (JSONException e) {
538 logger.debug("{}", e.getMessage());
541 logger.warn("Received command from openHAB Cloud for item '{}', which is not exposed.", itemName);
546 * This method sends notification to the openHAB Cloud
548 * @param userId openHAB Cloud user id
549 * @param message notification message text
550 * @param icon name of the icon for this notification
551 * @param severity severity name for this notification
553 public void sendNotification(String userId, String message, @Nullable String icon, @Nullable String severity) {
555 JSONObject notificationMessage = new JSONObject();
557 notificationMessage.put("userId", userId);
558 notificationMessage.put("message", message);
559 notificationMessage.put("icon", icon);
560 notificationMessage.put("severity", severity);
561 socket.emit("notification", notificationMessage);
562 } catch (JSONException e) {
563 logger.debug("{}", e.getMessage());
566 logger.debug("No connection, notification is not sent");
571 * This method sends log notification to the openHAB Cloud
573 * @param message notification message text
574 * @param icon name of the icon for this notification
575 * @param severity severity name for this notification
577 public void sendLogNotification(String message, @Nullable String icon, @Nullable String severity) {
579 JSONObject notificationMessage = new JSONObject();
581 notificationMessage.put("message", message);
582 notificationMessage.put("icon", icon);
583 notificationMessage.put("severity", severity);
584 socket.emit("lognotification", notificationMessage);
585 } catch (JSONException e) {
586 logger.debug("{}", e.getMessage());
589 logger.debug("No connection, notification is not sent");
594 * This method sends broadcast notification to the openHAB Cloud
596 * @param message notification message text
597 * @param icon name of the icon for this notification
598 * @param severity severity name for this notification
600 public void sendBroadcastNotification(String message, @Nullable String icon, @Nullable String severity) {
602 JSONObject notificationMessage = new JSONObject();
604 notificationMessage.put("message", message);
605 notificationMessage.put("icon", icon);
606 notificationMessage.put("severity", severity);
607 socket.emit("broadcastnotification", notificationMessage);
608 } catch (JSONException e) {
609 logger.debug("{}", e.getMessage());
612 logger.debug("No connection, notification is not sent");
617 * Send item update to openHAB Cloud
619 * @param itemName the name of the item
620 * @param itemState updated item state
623 public void sendItemUpdate(String itemName, String itemState) {
625 logger.debug("Sending update '{}' for item '{}'", itemState, itemName);
626 JSONObject itemUpdateMessage = new JSONObject();
628 itemUpdateMessage.put("itemName", itemName);
629 itemUpdateMessage.put("itemStatus", itemState);
630 socket.emit("itemupdate", itemUpdateMessage);
631 } catch (JSONException e) {
632 logger.debug("{}", e.getMessage());
635 logger.debug("No connection, Item update is not sent");
640 * Returns true if openHAB Cloud connection is active
642 public boolean isConnected() {
647 * Disconnect from openHAB Cloud
649 public void shutdown() {
650 logger.info("Shutting down openHAB Cloud service connection");
654 public String getOpenHABVersion() {
655 return openHABVersion;
658 public void setOpenHABVersion(String openHABVersion) {
659 this.openHABVersion = openHABVersion;
662 public void setListener(CloudClientListener listener) {
663 this.listener = listener;
666 private JSONObject getJSONHeaders(HttpFields httpFields) {
667 JSONObject headersJSON = new JSONObject();
669 for (HttpField field : httpFields) {
670 headersJSON.put(field.getName(), field.getValue());
672 } catch (JSONException e) {
673 logger.warn("Error forming response headers: {}", e.getMessage());
678 private void sleepSocketIO(long delay) {
679 EventThread.exec(() -> {
682 } catch (InterruptedException e) {