]> git.basschouten.com Git - openhab-addons.git/blob
6e2fbc63d805511d4aee868f0512adeb56d768d4
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.io.openhabcloud.internal;
14
15 import java.io.IOException;
16 import java.net.MalformedURLException;
17 import java.net.URI;
18 import java.net.URISyntaxException;
19 import java.net.URL;
20 import java.net.URLEncoder;
21 import java.nio.charset.StandardCharsets;
22 import java.util.Iterator;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.Optional;
26 import java.util.Set;
27 import java.util.concurrent.ConcurrentHashMap;
28 import java.util.concurrent.ScheduledExecutorService;
29 import java.util.concurrent.ScheduledFuture;
30 import java.util.concurrent.TimeUnit;
31 import java.util.concurrent.atomic.AtomicReference;
32
33 import org.eclipse.jdt.annotation.Nullable;
34 import org.eclipse.jetty.client.HttpClient;
35 import org.eclipse.jetty.client.api.Request;
36 import org.eclipse.jetty.client.util.BytesContentProvider;
37 import org.eclipse.jetty.http.HttpField;
38 import org.eclipse.jetty.http.HttpFields;
39 import org.eclipse.jetty.http.HttpMethod;
40 import org.eclipse.jetty.http.HttpStatus;
41 import org.eclipse.jetty.util.BufferUtil;
42 import org.eclipse.jetty.util.URIUtil;
43 import org.json.JSONException;
44 import org.json.JSONObject;
45 import org.openhab.core.OpenHAB;
46 import org.openhab.core.common.ThreadPoolManager;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
49
50 import io.socket.backo.Backoff;
51 import io.socket.client.IO;
52 import io.socket.client.IO.Options;
53 import io.socket.client.Manager;
54 import io.socket.client.Socket;
55 import io.socket.emitter.Emitter;
56 import io.socket.engineio.client.Transport;
57 import io.socket.engineio.client.transports.WebSocket;
58 import io.socket.parser.Packet;
59 import io.socket.parser.Parser;
60 import okhttp3.OkHttpClient.Builder;
61 import okhttp3.logging.HttpLoggingInterceptor;
62 import okhttp3.logging.HttpLoggingInterceptor.Level;
63
64 /**
65  * This class provides communication between openHAB and the openHAB Cloud service.
66  * It also implements async http proxy for serving requests from user to
67  * openHAB through the openHAB Cloud. It uses Socket.IO connection to connect to
68  * the openHAB Cloud service and Jetty Http client to send local http requests to
69  * openHAB.
70  *
71  * @author Victor Belov - Initial contribution
72  * @author Kai Kreuzer - migrated code to new Jetty client and ESH APIs
73  */
74 public class CloudClient {
75
76     private static final long RECONNECT_MIN = 2_000;
77
78     private static final long RECONNECT_MAX = 60_000;
79
80     private static final double RECONNECT_JITTER = 0.75;
81
82     private static final long READ_TIMEOUT = 60_0000;
83
84     /*
85      * Logger for this class
86      */
87     private final Logger logger = LoggerFactory.getLogger(CloudClient.class);
88
89     /*
90      * This variable holds base URL for the openHAB Cloud connections
91      */
92     private final String baseURL;
93
94     /*
95      * This variable holds openHAB's UUID for authenticating and connecting to the openHAB Cloud
96      */
97     private final String uuid;
98
99     /*
100      * This variable holds openHAB's secret for authenticating and connecting to the openHAB Cloud
101      */
102     private final String secret;
103
104     /*
105      * This variable holds local openHAB's base URL for connecting to the local openHAB instance
106      */
107     private final String localBaseUrl;
108
109     /*
110      * This variable holds instance of Jetty HTTP client to make requests to local openHAB
111      */
112     private final HttpClient jettyClient;
113
114     /*
115      * This map holds HTTP requests to local openHAB which are currently running
116      */
117     private final Map<Integer, Request> runningRequests = new ConcurrentHashMap<>();
118
119     /*
120      * This variable indicates if connection to the openHAB Cloud is currently in an established state
121      */
122     private boolean isConnected;
123
124     /*
125      * This variable holds instance of Socket.IO client class which provides communication
126      * with the openHAB Cloud
127      */
128     private Socket socket;
129
130     /*
131      * The protocol of the openHAB-cloud URL.
132      */
133     private String protocol = "https";
134
135     /*
136      * This variable holds instance of CloudClientListener which provides callbacks to communicate
137      * certain events from the openHAB Cloud back to openHAB
138      */
139     private CloudClientListener listener;
140     private boolean remoteAccessEnabled;
141     private Set<String> exposedItems;
142
143     /**
144      * Back-off strategy for reconnecting when manual reconnection is needed
145      */
146     private final Backoff reconnectBackoff = new Backoff();
147
148     /*
149      * Delay reconnect scheduler pool
150      *
151      */
152     protected final ScheduledExecutorService scheduler = ThreadPoolManager
153             .getScheduledPool(ThreadPoolManager.THREAD_POOL_NAME_COMMON);
154
155     @SuppressWarnings("null")
156     private final AtomicReference<Optional<ScheduledFuture<?>>> reconnectFuture = new AtomicReference<>(
157             Optional.empty());
158
159     /**
160      * Constructor of CloudClient
161      *
162      * @param uuid openHAB's UUID to connect to the openHAB Cloud
163      * @param secret openHAB's Secret to connect to the openHAB Cloud
164      * @param remoteAccessEnabled Allow the openHAB Cloud to be used as a remote proxy
165      * @param exposedItems Items that are made available to apps connected to the openHAB Cloud
166      */
167     public CloudClient(HttpClient httpClient, String uuid, String secret, String baseURL, String localBaseUrl,
168             boolean remoteAccessEnabled, Set<String> exposedItems) {
169         this.uuid = uuid;
170         this.secret = secret;
171         this.baseURL = baseURL;
172         this.localBaseUrl = localBaseUrl;
173         this.remoteAccessEnabled = remoteAccessEnabled;
174         this.exposedItems = exposedItems;
175         this.jettyClient = httpClient;
176         reconnectBackoff.setMin(RECONNECT_MIN);
177         reconnectBackoff.setMax(RECONNECT_MAX);
178         reconnectBackoff.setJitter(RECONNECT_JITTER);
179     }
180
181     /**
182      * Connect to the openHAB Cloud
183      */
184
185     public void connect() {
186         try {
187             Options options = new Options();
188             options.transports = new String[] { WebSocket.NAME };
189             options.reconnection = true;
190             options.reconnectionAttempts = Integer.MAX_VALUE;
191             options.reconnectionDelay = RECONNECT_MIN;
192             options.reconnectionDelayMax = RECONNECT_MAX;
193             options.randomizationFactor = RECONNECT_JITTER;
194             options.timeout = READ_TIMEOUT;
195             Builder okHttpBuilder = new Builder();
196             okHttpBuilder.readTimeout(READ_TIMEOUT, TimeUnit.MILLISECONDS);
197             if (logger.isTraceEnabled()) {
198                 // When trace level logging is enabled, we activate further logging of HTTP calls
199                 // of the Socket.IO library
200                 HttpLoggingInterceptor loggingInterceptor = new HttpLoggingInterceptor();
201                 loggingInterceptor.setLevel(Level.BASIC);
202                 okHttpBuilder.addInterceptor(loggingInterceptor);
203                 okHttpBuilder.addNetworkInterceptor(loggingInterceptor);
204             }
205             options.callFactory = okHttpBuilder.build();
206             options.webSocketFactory = okHttpBuilder.build();
207             socket = IO.socket(baseURL, options);
208             URL parsed = new URL(baseURL);
209             protocol = parsed.getProtocol();
210         } catch (URISyntaxException e) {
211             logger.error("Error creating Socket.IO: {}", e.getMessage());
212             return;
213         } catch (MalformedURLException e) {
214             logger.error("Error parsing baseURL to get protocol, assuming https. Error: {}", e.getMessage());
215             return;
216         }
217         //
218         // socket manager events
219         //
220         socket.io()//
221                 .on(Manager.EVENT_TRANSPORT, args -> {
222                     logger.trace("Manager.EVENT_TRANSPORT");
223                     Transport transport = (Transport) args[0];
224                     transport.on(Transport.EVENT_REQUEST_HEADERS, new Emitter.Listener() {
225                         @Override
226                         public void call(Object... args) {
227                             logger.trace("Transport.EVENT_REQUEST_HEADERS");
228                             @SuppressWarnings("unchecked")
229                             Map<String, List<String>> headers = (Map<String, List<String>>) args[0];
230                             headers.put("uuid", List.of(uuid));
231                             headers.put("secret", List.of(secret));
232                             headers.put("openhabversion", List.of(OpenHAB.getVersion()));
233                             headers.put("clientversion", List.of(CloudService.clientVersion));
234                             headers.put("remoteaccess", List.of(((Boolean) remoteAccessEnabled).toString()));
235                         }
236                     });
237                 })//
238                 .on(Manager.EVENT_CONNECT_ERROR, args -> {
239                     if (args.length > 0) {
240                         if (args[0] instanceof Exception) {
241                             Exception e = (Exception) args[0];
242                             logger.debug(
243                                     "Error connecting to the openHAB Cloud instance: {} {}. Should reconnect automatically.",
244                                     e.getClass().getSimpleName(), e.getMessage());
245                         } else {
246                             logger.debug(
247                                     "Error connecting to the openHAB Cloud instance: {}. Should reconnect automatically.",
248                                     args[0]);
249                         }
250                     } else {
251                         logger.debug("Error connecting to the openHAB Cloud instance. Should reconnect automatically.");
252                     }
253                 })//
254                 .on(Manager.EVENT_OPEN, args -> logger.debug("Socket.IO OPEN"))//
255                 .on(Manager.EVENT_CLOSE, args -> logger.debug("Socket.IO CLOSE: {}", args[0]))//
256                 .on(Manager.EVENT_PACKET, args -> {
257                     int packetTypeIndex = -1;
258                     String type = "<unexpected packet type>";
259                     if (args.length == 1 && args[0] instanceof Packet<?>) {
260                         packetTypeIndex = ((Packet<?>) args[0]).type;
261
262                         if (packetTypeIndex < Parser.types.length) {
263                             type = Parser.types[packetTypeIndex];
264                         } else {
265                             type = "<unknown type>";
266                         }
267                     }
268                     logger.trace("Socket.IO Packet: {} ({})", type, packetTypeIndex);
269                 })//
270         ;
271
272         //
273         // socket events
274         //
275         socket.on(Socket.EVENT_CONNECT, args -> {
276             logger.debug("Socket.IO connected");
277             isConnected = true;
278             onConnect();
279         })//
280                 .on(Socket.EVENT_CONNECTING, args -> logger.debug("Socket.IO connecting"))//
281                 .on(Socket.EVENT_RECONNECTING, args -> logger.debug("Socket.IO re-connecting (attempt {})", args[0]))//
282                 .on(Socket.EVENT_RECONNECT,
283                         args -> logger.debug("Socket.IO re-connected successfully (attempt {})", args[0]))//
284                 .on(Socket.EVENT_RECONNECT_ERROR, args -> {
285                     if (args[0] instanceof Exception) {
286                         Exception e = (Exception) args[0];
287                         logger.debug("Socket.IO re-connect attempt error: {} {}", e.getClass().getSimpleName(),
288                                 e.getMessage());
289                     } else {
290                         logger.debug("Socket.IO re-connect attempt error: {}", args[0]);
291                     }
292                 })//
293                 .on(Socket.EVENT_RECONNECT_FAILED,
294                         args -> logger.debug("Socket.IO re-connect attempts failed. Stopping reconnection."))//
295                 .on(Socket.EVENT_DISCONNECT, args -> {
296                     String message = args.length > 0 ? args[0].toString() : "";
297                     logger.warn("Socket.IO disconnected: {}", message);
298                     isConnected = false;
299                     onDisconnect();
300                     // https://github.com/socketio/socket.io-client/commit/afb952d854e1d8728ce07b7c3a9f0dee2a61ef4e
301                     if ("io server disconnect".equals(message)) {
302                         socket.close();
303                         long delay = reconnectBackoff.duration();
304                         logger.warn("Reconnecting after {} ms.", delay);
305                         scheduleReconnect(delay);
306                     }
307                 })//
308                 .on(Socket.EVENT_ERROR, args -> {
309                     if (CloudClient.this.socket.connected()) {
310                         if (args.length > 0) {
311                             if (args[0] instanceof Exception) {
312                                 Exception e = (Exception) args[0];
313                                 logger.warn("Error during communication: {} {}", e.getClass().getSimpleName(),
314                                         e.getMessage());
315                             } else {
316                                 logger.warn("Error during communication: {}", args[0]);
317                             }
318                         } else {
319                             logger.warn("Error during communication");
320                         }
321                     } else {
322                         // We are not connected currently, manual reconnection is needed to keep trying to
323                         // (re-)establish
324                         // connection.
325                         //
326                         // Socket.IO 1.x java client: 'error' event is emitted from Socket on connection errors that
327                         // are not
328                         // retried, but also with error that are automatically retried. If we
329                         //
330                         // Note how this is different in Socket.IO 2.x java client, Socket emits 'connect_error'
331                         // event.
332                         // OBS: Don't get confused with Socket IO 2.x docs online, in 1.x connect_error is emitted
333                         // also on
334                         // errors that are retried by the library automatically!
335                         long delay = reconnectBackoff.duration();
336                         // Try reconnecting on connection errors
337                         if (args.length > 0) {
338                             if (args[0] instanceof Exception) {
339                                 Exception e = (Exception) args[0];
340                                 logger.warn(
341                                         "Error connecting to the openHAB Cloud instance: {} {}. Reconnecting after {} ms.",
342                                         e.getClass().getSimpleName(), e.getMessage(), delay);
343                             } else {
344                                 logger.warn(
345                                         "Error connecting to the openHAB Cloud instance: {}. Reconnecting after {} ms.",
346                                         args[0], delay);
347                             }
348                         } else {
349                             logger.warn("Error connecting to the openHAB Cloud instance. Reconnecting.");
350                         }
351                         socket.close();
352                         scheduleReconnect(delay);
353                     }
354                 })//
355
356                 .on(Socket.EVENT_PING, args -> logger.debug("Socket.IO ping"))//
357                 .on(Socket.EVENT_PONG, args -> logger.debug("Socket.IO pong: {} ms", args[0]))//
358                 .on("request", args -> onEvent("request", (JSONObject) args[0]))//
359                 .on("cancel", args -> onEvent("cancel", (JSONObject) args[0]))//
360                 .on("command", args -> onEvent("command", (JSONObject) args[0]))//
361         ;
362         socket.connect();
363     }
364
365     /**
366      * Callback method for socket.io client which is called when connection is established
367      */
368
369     public void onConnect() {
370         logger.info("Connected to the openHAB Cloud service (UUID = {}, base URL = {})", censored(this.uuid),
371                 this.localBaseUrl);
372         reconnectBackoff.reset();
373         isConnected = true;
374     }
375
376     /**
377      * Callback method for socket.io client which is called when disconnect occurs
378      */
379
380     public void onDisconnect() {
381         logger.info("Disconnected from the openHAB Cloud service (UUID = {}, base URL = {})", censored(this.uuid),
382                 this.localBaseUrl);
383         isConnected = false;
384         // And clean up the list of running requests
385         runningRequests.clear();
386     }
387
388     /**
389      * Callback method for socket.io client which is called when a message is received
390      */
391
392     public void onEvent(String event, JSONObject data) {
393         logger.debug("on(): {}", event);
394         if ("command".equals(event)) {
395             handleCommandEvent(data);
396             return;
397         }
398         if (remoteAccessEnabled) {
399             if ("request".equals(event)) {
400                 handleRequestEvent(data);
401             } else if ("cancel".equals(event)) {
402                 handleCancelEvent(data);
403             } else {
404                 logger.warn("Unsupported event from openHAB Cloud: {}", event);
405             }
406         }
407     }
408
409     private void handleRequestEvent(JSONObject data) {
410         try {
411             // Get unique request Id
412             int requestId = data.getInt("id");
413             logger.debug("Got request {}", requestId);
414             // Get request path
415             String requestPath = data.getString("path");
416             logger.debug("Path {}", requestPath);
417             // Get request method
418             String requestMethod = data.getString("method");
419             logger.debug("Method {}", requestMethod);
420             // Get JSONObject for request headers
421             JSONObject requestHeadersJson = data.getJSONObject("headers");
422             logger.debug("Headers: {}", requestHeadersJson.toString());
423             // Get request body
424             String requestBody = data.getString("body");
425             logger.trace("Body {}", requestBody);
426             // Get JSONObject for request query parameters
427             JSONObject requestQueryJson = data.getJSONObject("query");
428             logger.debug("Query {}", requestQueryJson.toString());
429             // Create URI builder with base request URI of openHAB and path from request
430             String newPath = URIUtil.addPaths(localBaseUrl, requestPath);
431             Iterator<String> queryIterator = requestQueryJson.keys();
432             // Add query parameters to URI builder, if any
433             newPath += "?";
434             while (queryIterator.hasNext()) {
435                 String queryName = queryIterator.next();
436                 newPath += queryName;
437                 newPath += "=";
438                 newPath += URLEncoder.encode(requestQueryJson.getString(queryName), "UTF-8");
439                 if (queryIterator.hasNext()) {
440                     newPath += "&";
441                 }
442             }
443             // Finally get the future request URI
444             URI requestUri = new URI(newPath);
445             // All preparations which are common for different methods are done
446             // Now perform the request to openHAB
447             // If method is GET
448             logger.debug("Request method is {}", requestMethod);
449             Request request = jettyClient.newRequest(requestUri);
450             setRequestHeaders(request, requestHeadersJson);
451             String proto = protocol;
452             if (data.has("protocol")) {
453                 proto = data.getString("protocol");
454             }
455             request.header("X-Forwarded-Proto", proto);
456             HttpMethod method = HttpMethod.fromString(requestMethod);
457             if (method == null) {
458                 logger.debug("Unsupported request method {}", requestMethod);
459                 return;
460             }
461             request.method(method);
462             if (!requestBody.isEmpty()) {
463                 request.content(new BytesContentProvider(requestBody.getBytes()));
464             }
465
466             request.onResponseHeaders(response -> {
467                 logger.debug("onHeaders {}", requestId);
468                 JSONObject responseJson = new JSONObject();
469                 try {
470                     responseJson.put("id", requestId);
471                     responseJson.put("headers", getJSONHeaders(response.getHeaders()));
472                     responseJson.put("responseStatusCode", response.getStatus());
473                     responseJson.put("responseStatusText", "OK");
474                     socket.emit("responseHeader", responseJson);
475                     logger.trace("Sent headers to request {}", requestId);
476                     logger.trace("{}", responseJson.toString());
477                 } catch (JSONException e) {
478                     logger.debug("{}", e.getMessage());
479                 }
480             }).onResponseContent((theResponse, content) -> {
481                 logger.debug("onResponseContent: {}, content size {}", requestId, String.valueOf(content.remaining()));
482                 JSONObject responseJson = new JSONObject();
483                 try {
484                     responseJson.put("id", requestId);
485                     responseJson.put("body", BufferUtil.toArray(content));
486                     if (logger.isTraceEnabled()) {
487                         logger.trace("{}", StandardCharsets.UTF_8.decode(content).toString());
488                     }
489                     socket.emit("responseContentBinary", responseJson);
490                     logger.trace("Sent content to request {}", requestId);
491                 } catch (JSONException e) {
492                     logger.debug("{}", e.getMessage());
493                 }
494             }).onRequestFailure((origRequest, failure) -> {
495                 logger.debug("onRequestFailure: {},  {}", requestId, failure.getMessage());
496                 JSONObject responseJson = new JSONObject();
497                 try {
498                     responseJson.put("id", requestId);
499                     responseJson.put("responseStatusText", "openHAB connection error: " + failure.getMessage());
500                     socket.emit("responseError", responseJson);
501                 } catch (JSONException e) {
502                     logger.debug("{}", e.getMessage());
503                 }
504             }).send(result -> {
505                 logger.debug("onComplete: {}", requestId);
506                 // Remove this request from list of running requests
507                 runningRequests.remove(requestId);
508                 if ((result != null && result.isFailed())
509                         && (result.getResponse() != null && result.getResponse().getStatus() != HttpStatus.OK_200)) {
510                     if (result.getFailure() != null) {
511                         logger.debug("Jetty request {} failed: {}", requestId, result.getFailure().getMessage());
512                     }
513                     if (result.getRequestFailure() != null) {
514                         logger.debug("Request Failure: {}", result.getRequestFailure().getMessage());
515                     }
516                     if (result.getResponseFailure() != null) {
517                         logger.debug("Response Failure: {}", result.getResponseFailure().getMessage());
518                     }
519                 }
520                 JSONObject responseJson = new JSONObject();
521                 try {
522                     responseJson.put("id", requestId);
523                     socket.emit("responseFinished", responseJson);
524                     logger.debug("Finished responding to request {}", requestId);
525                 } catch (JSONException e) {
526                     logger.debug("{}", e.getMessage());
527                 }
528             });
529
530             // If successfully submitted request to http client, add it to the list of currently
531             // running requests to be able to cancel it if needed
532             runningRequests.put(requestId, request);
533         } catch (JSONException | IOException | URISyntaxException e) {
534             logger.debug("{}", e.getMessage());
535         }
536     }
537
538     private void setRequestHeaders(Request request, JSONObject requestHeadersJson) {
539         Iterator<String> headersIterator = requestHeadersJson.keys();
540         // Convert JSONObject of headers into Header ArrayList
541         while (headersIterator.hasNext()) {
542             String headerName = headersIterator.next();
543             String headerValue;
544             try {
545                 headerValue = requestHeadersJson.getString(headerName);
546                 logger.debug("Jetty set header {} = {}", headerName, headerValue);
547                 if (!headerName.equalsIgnoreCase("Content-Length")) {
548                     request.header(headerName, headerValue);
549                 }
550             } catch (JSONException e) {
551                 logger.warn("Error processing request headers: {}", e.getMessage());
552             }
553         }
554     }
555
556     private void handleCancelEvent(JSONObject data) {
557         try {
558             int requestId = data.getInt("id");
559             logger.debug("Received cancel for request {}", requestId);
560             // Find and abort running request
561             Request request = runningRequests.get(requestId);
562             if (request != null) {
563                 request.abort(new InterruptedException());
564                 runningRequests.remove(requestId);
565             }
566         } catch (JSONException e) {
567             logger.debug("{}", e.getMessage());
568         }
569     }
570
571     private void handleCommandEvent(JSONObject data) {
572         String itemName = data.getString("item");
573         if (exposedItems.contains(itemName)) {
574             try {
575                 logger.debug("Received command {} for item {}.", data.getString("command"), itemName);
576                 if (this.listener != null) {
577                     this.listener.sendCommand(itemName, data.getString("command"));
578                 }
579             } catch (JSONException e) {
580                 logger.debug("{}", e.getMessage());
581             }
582         } else {
583             logger.warn("Received command from openHAB Cloud for item '{}', which is not exposed.", itemName);
584         }
585     }
586
587     /**
588      * This method sends notification to the openHAB Cloud
589      *
590      * @param userId openHAB Cloud user id
591      * @param message notification message text
592      * @param icon name of the icon for this notification
593      * @param severity severity name for this notification
594      */
595     public void sendNotification(String userId, String message, @Nullable String icon, @Nullable String severity) {
596         if (isConnected()) {
597             JSONObject notificationMessage = new JSONObject();
598             try {
599                 notificationMessage.put("userId", userId);
600                 notificationMessage.put("message", message);
601                 notificationMessage.put("icon", icon);
602                 notificationMessage.put("severity", severity);
603                 socket.emit("notification", notificationMessage);
604             } catch (JSONException e) {
605                 logger.debug("{}", e.getMessage());
606             }
607         } else {
608             logger.debug("No connection, notification is not sent");
609         }
610     }
611
612     /**
613      * This method sends log notification to the openHAB Cloud
614      *
615      * @param message notification message text
616      * @param icon name of the icon for this notification
617      * @param severity severity name for this notification
618      */
619     public void sendLogNotification(String message, @Nullable String icon, @Nullable String severity) {
620         if (isConnected()) {
621             JSONObject notificationMessage = new JSONObject();
622             try {
623                 notificationMessage.put("message", message);
624                 notificationMessage.put("icon", icon);
625                 notificationMessage.put("severity", severity);
626                 socket.emit("lognotification", notificationMessage);
627             } catch (JSONException e) {
628                 logger.debug("{}", e.getMessage());
629             }
630         } else {
631             logger.debug("No connection, notification is not sent");
632         }
633     }
634
635     /**
636      * This method sends broadcast notification to the openHAB Cloud
637      *
638      * @param message notification message text
639      * @param icon name of the icon for this notification
640      * @param severity severity name for this notification
641      */
642     public void sendBroadcastNotification(String message, @Nullable String icon, @Nullable String severity) {
643         if (isConnected()) {
644             JSONObject notificationMessage = new JSONObject();
645             try {
646                 notificationMessage.put("message", message);
647                 notificationMessage.put("icon", icon);
648                 notificationMessage.put("severity", severity);
649                 socket.emit("broadcastnotification", notificationMessage);
650             } catch (JSONException e) {
651                 logger.debug("{}", e.getMessage());
652             }
653         } else {
654             logger.debug("No connection, notification is not sent");
655         }
656     }
657
658     /**
659      * Send item update to openHAB Cloud
660      *
661      * @param itemName the name of the item
662      * @param itemState updated item state
663      *
664      */
665     public void sendItemUpdate(String itemName, String itemState) {
666         if (isConnected()) {
667             logger.debug("Sending update '{}' for item '{}'", itemState, itemName);
668             JSONObject itemUpdateMessage = new JSONObject();
669             try {
670                 itemUpdateMessage.put("itemName", itemName);
671                 itemUpdateMessage.put("itemStatus", itemState);
672                 socket.emit("itemupdate", itemUpdateMessage);
673             } catch (JSONException e) {
674                 logger.debug("{}", e.getMessage());
675             }
676         } else {
677             logger.debug("No connection, Item update is not sent");
678         }
679     }
680
681     /**
682      * Returns true if openHAB Cloud connection is active
683      */
684     public boolean isConnected() {
685         return isConnected;
686     }
687
688     /**
689      * Disconnect from openHAB Cloud
690      */
691     public void shutdown() {
692         logger.info("Shutting down openHAB Cloud service connection");
693         reconnectFuture.get().ifPresent(future -> future.cancel(true));
694         socket.disconnect();
695     }
696
697     public void setListener(CloudClientListener listener) {
698         this.listener = listener;
699     }
700
701     private void scheduleReconnect(long delay) {
702         reconnectFuture.getAndSet(Optional.of(scheduler.schedule(new Runnable() {
703             @Override
704             public void run() {
705                 socket.connect();
706             }
707         }, delay, TimeUnit.MILLISECONDS))).ifPresent(future -> future.cancel(true));
708     }
709
710     private JSONObject getJSONHeaders(HttpFields httpFields) {
711         JSONObject headersJSON = new JSONObject();
712         try {
713             for (HttpField field : httpFields) {
714                 headersJSON.put(field.getName(), field.getValue());
715             }
716         } catch (JSONException e) {
717             logger.warn("Error forming response headers: {}", e.getMessage());
718         }
719         return headersJSON;
720     }
721
722     private static String censored(String secret) {
723         if (secret.length() < 4) {
724             return "*******";
725         }
726         return secret.substring(0, 2) + "..." + secret.substring(secret.length() - 2, secret.length());
727     }
728 }