2 * Copyright (c) 2010-2024 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.io.openhabcloud.internal;
15 import java.io.IOException;
16 import java.net.MalformedURLException;
18 import java.net.URISyntaxException;
20 import java.net.URLEncoder;
21 import java.nio.charset.StandardCharsets;
22 import java.util.Iterator;
23 import java.util.List;
25 import java.util.Optional;
27 import java.util.concurrent.ConcurrentHashMap;
28 import java.util.concurrent.ScheduledExecutorService;
29 import java.util.concurrent.ScheduledFuture;
30 import java.util.concurrent.TimeUnit;
31 import java.util.concurrent.atomic.AtomicReference;
33 import org.eclipse.jdt.annotation.Nullable;
34 import org.eclipse.jetty.client.HttpClient;
35 import org.eclipse.jetty.client.api.Request;
36 import org.eclipse.jetty.client.util.BytesContentProvider;
37 import org.eclipse.jetty.http.HttpField;
38 import org.eclipse.jetty.http.HttpFields;
39 import org.eclipse.jetty.http.HttpMethod;
40 import org.eclipse.jetty.http.HttpStatus;
41 import org.eclipse.jetty.util.BufferUtil;
42 import org.eclipse.jetty.util.URIUtil;
43 import org.json.JSONArray;
44 import org.json.JSONException;
45 import org.json.JSONObject;
46 import org.openhab.core.OpenHAB;
47 import org.openhab.core.common.ThreadPoolManager;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
51 import io.socket.backo.Backoff;
52 import io.socket.client.IO;
53 import io.socket.client.IO.Options;
54 import io.socket.client.Manager;
55 import io.socket.client.Socket;
56 import io.socket.emitter.Emitter;
57 import io.socket.engineio.client.Transport;
58 import io.socket.engineio.client.transports.WebSocket;
59 import io.socket.parser.Packet;
60 import io.socket.parser.Parser;
61 import okhttp3.OkHttpClient.Builder;
62 import okhttp3.logging.HttpLoggingInterceptor;
63 import okhttp3.logging.HttpLoggingInterceptor.Level;
/**
 * This class provides communication between openHAB and the openHAB Cloud service.
 * It also implements an asynchronous HTTP proxy for serving requests from users to
 * openHAB through the openHAB Cloud. It uses a Socket.IO connection to connect to
 * the openHAB Cloud service and the Jetty HTTP client to send local HTTP requests to
 * openHAB.
 *
 * @author Victor Belov - Initial contribution
 * @author Kai Kreuzer - migrated code to new Jetty client and ESH APIs
 * @author Dan Cunningham - Extended notification enhancements
 */
76 public class CloudClient {
78 private static final long RECONNECT_MIN = 2_000;
80 private static final long RECONNECT_MAX = 60_000;
82 private static final double RECONNECT_JITTER = 0.75;
84 private static final long READ_TIMEOUT = 60_0000;
87 * Logger for this class
89 private final Logger logger = LoggerFactory.getLogger(CloudClient.class);
92 * This variable holds base URL for the openHAB Cloud connections
94 private final String baseURL;
97 * This variable holds openHAB's UUID for authenticating and connecting to the openHAB Cloud
99 private final String uuid;
102 * This variable holds openHAB's secret for authenticating and connecting to the openHAB Cloud
104 private final String secret;
107 * This variable holds local openHAB's base URL for connecting to the local openHAB instance
109 private final String localBaseUrl;
112 * This variable holds instance of Jetty HTTP client to make requests to local openHAB
114 private final HttpClient jettyClient;
117 * This map holds HTTP requests to local openHAB which are currently running
119 private final Map<Integer, Request> runningRequests = new ConcurrentHashMap<>();
122 * This variable indicates if connection to the openHAB Cloud is currently in an established state
124 private boolean isConnected;
127 * This variable holds instance of Socket.IO client class which provides communication
128 * with the openHAB Cloud
130 private Socket socket;
133 * The protocol of the openHAB-cloud URL.
135 private String protocol = "https";
138 * This variable holds instance of CloudClientListener which provides callbacks to communicate
139 * certain events from the openHAB Cloud back to openHAB
141 private CloudClientListener listener;
142 private boolean remoteAccessEnabled;
143 private Set<String> exposedItems;
146 * Back-off strategy for reconnecting when manual reconnection is needed
148 private final Backoff reconnectBackoff = new Backoff();
151 * Delay reconnect scheduler pool
154 protected final ScheduledExecutorService scheduler = ThreadPoolManager
155 .getScheduledPool(ThreadPoolManager.THREAD_POOL_NAME_COMMON);
157 @SuppressWarnings("null")
158 private final AtomicReference<Optional<ScheduledFuture<?>>> reconnectFuture = new AtomicReference<>(
/**
 * Constructor of CloudClient
 *
 * @param httpClient Jetty HTTP client used for requests to the local openHAB instance
 * @param uuid openHAB's UUID to connect to the openHAB Cloud
 * @param secret openHAB's Secret to connect to the openHAB Cloud
 * @param baseURL base URL of the openHAB Cloud service
 * @param localBaseUrl base URL of the local openHAB instance
 * @param remoteAccessEnabled Allow the openHAB Cloud to be used as a remote proxy
 * @param exposedItems Items that are made available to apps connected to the openHAB Cloud
 */
public CloudClient(HttpClient httpClient, String uuid, String secret, String baseURL, String localBaseUrl,
        boolean remoteAccessEnabled, Set<String> exposedItems) {
    // Local side
    this.jettyClient = httpClient;
    this.localBaseUrl = localBaseUrl;

    // Cloud side
    this.baseURL = baseURL;
    this.uuid = uuid;
    this.secret = secret;

    // Feature switches
    this.remoteAccessEnabled = remoteAccessEnabled;
    this.exposedItems = exposedItems;

    // The manual back-off uses the same limits as the socket.io built-in reconnection
    reconnectBackoff.setMin(RECONNECT_MIN).setMax(RECONNECT_MAX).setJitter(RECONNECT_JITTER);
}
184 * Connect to the openHAB Cloud
187 public void connect() {
189 Options options = new Options();
190 options.transports = new String[] { WebSocket.NAME };
191 options.reconnection = true;
192 options.reconnectionAttempts = Integer.MAX_VALUE;
193 options.reconnectionDelay = RECONNECT_MIN;
194 options.reconnectionDelayMax = RECONNECT_MAX;
195 options.randomizationFactor = RECONNECT_JITTER;
196 options.timeout = READ_TIMEOUT;
197 Builder okHttpBuilder = new Builder();
198 okHttpBuilder.readTimeout(READ_TIMEOUT, TimeUnit.MILLISECONDS);
199 if (logger.isTraceEnabled()) {
200 // When trace level logging is enabled, we activate further logging of HTTP calls
201 // of the Socket.IO library
202 HttpLoggingInterceptor loggingInterceptor = new HttpLoggingInterceptor();
203 loggingInterceptor.setLevel(Level.BASIC);
204 okHttpBuilder.addInterceptor(loggingInterceptor);
205 okHttpBuilder.addNetworkInterceptor(loggingInterceptor);
207 options.callFactory = okHttpBuilder.build();
208 options.webSocketFactory = okHttpBuilder.build();
209 socket = IO.socket(baseURL, options);
210 URL parsed = new URL(baseURL);
211 protocol = parsed.getProtocol();
212 } catch (URISyntaxException e) {
213 logger.error("Error creating Socket.IO: {}", e.getMessage());
215 } catch (MalformedURLException e) {
216 logger.error("Error parsing baseURL to get protocol, assuming https. Error: {}", e.getMessage());
220 // socket manager events
223 .on(Manager.EVENT_TRANSPORT, args -> {
224 logger.trace("Manager.EVENT_TRANSPORT");
225 Transport transport = (Transport) args[0];
226 transport.on(Transport.EVENT_REQUEST_HEADERS, new Emitter.Listener() {
228 public void call(Object... args) {
229 logger.trace("Transport.EVENT_REQUEST_HEADERS");
230 @SuppressWarnings("unchecked")
231 Map<String, List<String>> headers = (Map<String, List<String>>) args[0];
232 headers.put("uuid", List.of(uuid));
233 headers.put("secret", List.of(secret));
234 headers.put("openhabversion", List.of(OpenHAB.getVersion()));
235 headers.put("clientversion", List.of(CloudService.clientVersion));
236 headers.put("remoteaccess", List.of(((Boolean) remoteAccessEnabled).toString()));
240 .on(Manager.EVENT_CONNECT_ERROR, args -> {
241 if (args.length > 0) {
242 if (args[0] instanceof Exception e) {
244 "Error connecting to the openHAB Cloud instance: {} {}. Should reconnect automatically.",
245 e.getClass().getSimpleName(), e.getMessage());
248 "Error connecting to the openHAB Cloud instance: {}. Should reconnect automatically.",
252 logger.debug("Error connecting to the openHAB Cloud instance. Should reconnect automatically.");
255 .on(Manager.EVENT_OPEN, args -> logger.debug("Socket.IO OPEN"))//
256 .on(Manager.EVENT_CLOSE, args -> logger.debug("Socket.IO CLOSE: {}", args[0]))//
257 .on(Manager.EVENT_PACKET, args -> {
258 int packetTypeIndex = -1;
259 String type = "<unexpected packet type>";
260 if (args.length == 1 && args[0] instanceof Packet<?> packet) {
261 packetTypeIndex = packet.type;
263 if (packetTypeIndex < Parser.types.length) {
264 type = Parser.types[packetTypeIndex];
266 type = "<unknown type>";
269 logger.trace("Socket.IO Packet: {} ({})", type, packetTypeIndex);
276 socket.on(Socket.EVENT_CONNECT, args -> {
277 logger.debug("Socket.IO connected");
281 .on(Socket.EVENT_CONNECTING, args -> logger.debug("Socket.IO connecting"))//
282 .on(Socket.EVENT_RECONNECTING, args -> logger.debug("Socket.IO re-connecting (attempt {})", args[0]))//
283 .on(Socket.EVENT_RECONNECT,
284 args -> logger.debug("Socket.IO re-connected successfully (attempt {})", args[0]))//
285 .on(Socket.EVENT_RECONNECT_ERROR, args -> {
286 if (args[0] instanceof Exception e) {
287 logger.debug("Socket.IO re-connect attempt error: {} {}", e.getClass().getSimpleName(),
290 logger.debug("Socket.IO re-connect attempt error: {}", args[0]);
293 .on(Socket.EVENT_RECONNECT_FAILED,
294 args -> logger.debug("Socket.IO re-connect attempts failed. Stopping reconnection."))//
295 .on(Socket.EVENT_DISCONNECT, args -> {
296 String message = args.length > 0 ? args[0].toString() : "";
297 logger.warn("Socket.IO disconnected: {}", message);
300 // https://github.com/socketio/socket.io-client/commit/afb952d854e1d8728ce07b7c3a9f0dee2a61ef4e
301 if ("io server disconnect".equals(message)) {
303 long delay = reconnectBackoff.duration();
304 logger.warn("Reconnecting after {} ms.", delay);
305 scheduleReconnect(delay);
308 .on(Socket.EVENT_ERROR, args -> {
309 if (CloudClient.this.socket.connected()) {
310 if (args.length > 0) {
311 if (args[0] instanceof Exception e) {
312 logger.warn("Error during communication: {} {}", e.getClass().getSimpleName(),
315 logger.warn("Error during communication: {}", args[0]);
318 logger.warn("Error during communication");
321 // We are not connected currently, manual reconnection is needed to keep trying to
325 // Socket.IO 1.x java client: 'error' event is emitted from Socket on connection errors that
327 // retried, but also with error that are automatically retried. If we
329 // Note how this is different in Socket.IO 2.x java client, Socket emits 'connect_error'
331 // OBS: Don't get confused with Socket IO 2.x docs online, in 1.x connect_error is emitted
333 // errors that are retried by the library automatically!
334 long delay = reconnectBackoff.duration();
335 // Try reconnecting on connection errors
336 if (args.length > 0) {
337 if (args[0] instanceof Exception e) {
339 "Error connecting to the openHAB Cloud instance: {} {}. Reconnecting after {} ms.",
340 e.getClass().getSimpleName(), e.getMessage(), delay);
343 "Error connecting to the openHAB Cloud instance: {}. Reconnecting after {} ms.",
347 logger.warn("Error connecting to the openHAB Cloud instance. Reconnecting.");
350 scheduleReconnect(delay);
354 .on(Socket.EVENT_PING, args -> logger.debug("Socket.IO ping"))//
355 .on(Socket.EVENT_PONG, args -> logger.debug("Socket.IO pong: {} ms", args[0]))//
356 .on("request", args -> onEvent("request", (JSONObject) args[0]))//
357 .on("cancel", args -> onEvent("cancel", (JSONObject) args[0]))//
358 .on("command", args -> onEvent("command", (JSONObject) args[0]))//
364 * Callback method for socket.io client which is called when connection is established
367 public void onConnect() {
368 logger.info("Connected to the openHAB Cloud service (UUID = {}, base URL = {})", censored(this.uuid),
370 reconnectBackoff.reset();
375 * Callback method for socket.io client which is called when disconnect occurs
378 public void onDisconnect() {
379 logger.info("Disconnected from the openHAB Cloud service (UUID = {}, base URL = {})", censored(this.uuid),
382 // And clean up the list of running requests
383 runningRequests.clear();
387 * Callback method for socket.io client which is called when a message is received
390 public void onEvent(String event, JSONObject data) {
391 logger.debug("on(): {}", event);
392 if ("command".equals(event)) {
393 handleCommandEvent(data);
396 if (remoteAccessEnabled) {
397 if ("request".equals(event)) {
398 handleRequestEvent(data);
399 } else if ("cancel".equals(event)) {
400 handleCancelEvent(data);
402 logger.warn("Unsupported event from openHAB Cloud: {}", event);
407 private void handleRequestEvent(JSONObject data) {
409 // Get unique request Id
410 int requestId = data.getInt("id");
411 logger.debug("Got request {}", requestId);
413 String requestPath = data.getString("path");
414 logger.debug("Path {}", requestPath);
415 // Get request method
416 String requestMethod = data.getString("method");
417 logger.debug("Method {}", requestMethod);
418 // Get JSONObject for request headers
419 JSONObject requestHeadersJson = data.getJSONObject("headers");
420 logger.debug("Headers: {}", requestHeadersJson.toString());
422 String requestBody = data.getString("body");
423 logger.trace("Body {}", requestBody);
424 // Get JSONObject for request query parameters
425 JSONObject requestQueryJson = data.getJSONObject("query");
426 logger.debug("Query {}", requestQueryJson.toString());
427 // Create URI builder with base request URI of openHAB and path from request
428 String newPath = URIUtil.addPaths(localBaseUrl, requestPath);
429 Iterator<String> queryIterator = requestQueryJson.keys();
430 // Add query parameters to URI builder, if any
432 while (queryIterator.hasNext()) {
433 String queryName = queryIterator.next();
434 newPath += queryName;
436 newPath += URLEncoder.encode(requestQueryJson.getString(queryName), "UTF-8");
437 if (queryIterator.hasNext()) {
441 // Finally get the future request URI
442 URI requestUri = new URI(newPath);
443 // All preparations which are common for different methods are done
444 // Now perform the request to openHAB
446 logger.debug("Request method is {}", requestMethod);
447 Request request = jettyClient.newRequest(requestUri);
448 setRequestHeaders(request, requestHeadersJson);
449 String proto = protocol;
450 if (data.has("protocol")) {
451 proto = data.getString("protocol");
453 request.header("X-Forwarded-Proto", proto);
454 HttpMethod method = HttpMethod.fromString(requestMethod);
455 if (method == null) {
456 logger.debug("Unsupported request method {}", requestMethod);
459 request.method(method);
460 if (!requestBody.isEmpty()) {
461 request.content(new BytesContentProvider(requestBody.getBytes()));
464 request.onResponseHeaders(response -> {
465 logger.debug("onHeaders {}", requestId);
466 JSONObject responseJson = new JSONObject();
468 responseJson.put("id", requestId);
469 responseJson.put("headers", getJSONHeaders(response.getHeaders()));
470 responseJson.put("responseStatusCode", response.getStatus());
471 responseJson.put("responseStatusText", "OK");
472 socket.emit("responseHeader", responseJson);
473 logger.trace("Sent headers to request {}", requestId);
474 logger.trace("{}", responseJson.toString());
475 } catch (JSONException e) {
476 logger.debug("{}", e.getMessage());
478 }).onResponseContent((theResponse, content) -> {
479 logger.debug("onResponseContent: {}, content size {}", requestId, String.valueOf(content.remaining()));
480 JSONObject responseJson = new JSONObject();
482 responseJson.put("id", requestId);
483 responseJson.put("body", BufferUtil.toArray(content));
484 if (logger.isTraceEnabled()) {
485 logger.trace("{}", StandardCharsets.UTF_8.decode(content).toString());
487 socket.emit("responseContentBinary", responseJson);
488 logger.trace("Sent content to request {}", requestId);
489 } catch (JSONException e) {
490 logger.debug("{}", e.getMessage());
492 }).onRequestFailure((origRequest, failure) -> {
493 logger.debug("onRequestFailure: {}, {}", requestId, failure.getMessage());
494 JSONObject responseJson = new JSONObject();
496 responseJson.put("id", requestId);
497 responseJson.put("responseStatusText", "openHAB connection error: " + failure.getMessage());
498 socket.emit("responseError", responseJson);
499 } catch (JSONException e) {
500 logger.debug("{}", e.getMessage());
503 logger.debug("onComplete: {}", requestId);
504 // Remove this request from list of running requests
505 runningRequests.remove(requestId);
506 if ((result != null && result.isFailed())
507 && (result.getResponse() != null && result.getResponse().getStatus() != HttpStatus.OK_200)) {
508 if (result.getFailure() != null) {
509 logger.debug("Jetty request {} failed: {}", requestId, result.getFailure().getMessage());
511 if (result.getRequestFailure() != null) {
512 logger.debug("Request Failure: {}", result.getRequestFailure().getMessage());
514 if (result.getResponseFailure() != null) {
515 logger.debug("Response Failure: {}", result.getResponseFailure().getMessage());
518 JSONObject responseJson = new JSONObject();
520 responseJson.put("id", requestId);
521 socket.emit("responseFinished", responseJson);
522 logger.debug("Finished responding to request {}", requestId);
523 } catch (JSONException e) {
524 logger.debug("{}", e.getMessage());
528 // If successfully submitted request to http client, add it to the list of currently
529 // running requests to be able to cancel it if needed
530 runningRequests.put(requestId, request);
531 } catch (JSONException | IOException | URISyntaxException e) {
532 logger.debug("{}", e.getMessage());
536 private void setRequestHeaders(Request request, JSONObject requestHeadersJson) {
537 Iterator<String> headersIterator = requestHeadersJson.keys();
538 // Convert JSONObject of headers into Header ArrayList
539 while (headersIterator.hasNext()) {
540 String headerName = headersIterator.next();
543 headerValue = requestHeadersJson.getString(headerName);
544 logger.debug("Jetty set header {} = {}", headerName, headerValue);
545 if (!"Content-Length".equalsIgnoreCase(headerName)) {
546 request.header(headerName, headerValue);
548 } catch (JSONException e) {
549 logger.warn("Error processing request headers: {}", e.getMessage());
554 private void handleCancelEvent(JSONObject data) {
556 int requestId = data.getInt("id");
557 logger.debug("Received cancel for request {}", requestId);
558 // Find and abort running request
559 Request request = runningRequests.get(requestId);
560 if (request != null) {
561 request.abort(new InterruptedException());
562 runningRequests.remove(requestId);
564 } catch (JSONException e) {
565 logger.debug("{}", e.getMessage());
569 private void handleCommandEvent(JSONObject data) {
570 String itemName = data.getString("item");
571 if (exposedItems.contains(itemName)) {
573 logger.debug("Received command {} for item {}.", data.getString("command"), itemName);
574 if (this.listener != null) {
575 this.listener.sendCommand(itemName, data.getString("command"));
577 } catch (JSONException e) {
578 logger.debug("{}", e.getMessage());
581 logger.warn("Received command from openHAB Cloud for item '{}', which is not exposed.", itemName);
586 * This method sends notification to the openHAB Cloud
588 * @param userId openHAB Cloud user id
589 * @param message notification message text
590 * @param icon name of the icon for this notification
591 * @param severity severity name for this notification
592 * @param title for the notification
593 * @param onClickAction the action to perform when clicked
594 * @param mediaAttachmentUrl the media to attach to a notification
595 * @param actionButton1 an action button in the format "Title=Action"
596 * @param actionButton2 an action button in the format "Title=Action"
597 * @param actionButton3 an action button in the format "Title=Action"
599 public void sendNotification(String userId, String message, @Nullable String icon, @Nullable String severity,
600 @Nullable String title, @Nullable String onClickAction, @Nullable String mediaAttachmentUrl,
601 @Nullable String actionButton1, @Nullable String actionButton2, @Nullable String actionButton3) {
602 sendNotificationInternal(userId, message, icon, severity, title, onClickAction, mediaAttachmentUrl,
603 actionButton1, actionButton2, actionButton3);
607 * This method sends broadcast notification to the openHAB Cloud
609 * @param message notification message text
610 * @param icon name of the icon for this notification
611 * @param severity severity name for this notification
612 * @param title for this notification
613 * @param onClickAction the action to perform when clicked
614 * @param mediaAttachmentUrl the media to attach to a notification
615 * @param actionButton1 an action button in the format "Title=Action"
616 * @param actionButton2 an action button in the format "Title=Action"
617 * @param actionButton3 an action button in the format "Title=Action"
619 public void sendBroadcastNotification(String message, @Nullable String icon, @Nullable String severity,
620 @Nullable String title, @Nullable String onClickAction, @Nullable String mediaAttachmentUrl,
621 @Nullable String actionButton1, @Nullable String actionButton2, @Nullable String actionButton3) {
622 sendNotificationInternal(null, message, icon, severity, title, onClickAction, mediaAttachmentUrl, actionButton1,
623 actionButton2, actionButton3);
626 private void sendNotificationInternal(@Nullable String userId, String message, @Nullable String icon,
627 @Nullable String severity, @Nullable String title, @Nullable String onClickAction,
628 @Nullable String mediaAttachmentUrl, @Nullable String actionButton1, @Nullable String actionButton2,
629 @Nullable String actionButton3) {
631 JSONObject notificationMessage = new JSONObject();
633 if (userId != null) {
634 notificationMessage.put("userId", userId);
636 notificationMessage.put("message", message);
637 notificationMessage.put("icon", icon);
638 notificationMessage.put("severity", severity);
640 notificationMessage.put("title", title);
642 if (onClickAction != null) {
643 notificationMessage.put("on-click", onClickAction);
645 if (mediaAttachmentUrl != null) {
646 notificationMessage.put("media-attachment-url", mediaAttachmentUrl);
648 JSONArray actionArray = createActionArray(actionButton1, actionButton2, actionButton3);
649 if (!actionArray.isEmpty()) {
650 notificationMessage.put("actions", actionArray);
652 socket.emit(userId == null ? "broadcastnotification" : "notification", notificationMessage);
653 } catch (JSONException e) {
654 logger.debug("{}", e.getMessage());
657 logger.debug("No connection, notification is not sent");
662 * This method sends log notification to the openHAB Cloud
664 * @param message notification message text
665 * @param icon name of the icon for this notification
666 * @param severity severity name for this notification
668 public void sendLogNotification(String message, @Nullable String icon, @Nullable String severity) {
670 JSONObject notificationMessage = new JSONObject();
672 notificationMessage.put("message", message);
673 notificationMessage.put("icon", icon);
674 notificationMessage.put("severity", severity);
675 socket.emit("lognotification", notificationMessage);
676 } catch (JSONException e) {
677 logger.debug("{}", e.getMessage());
680 logger.debug("No connection, notification is not sent");
685 * Send item update to openHAB Cloud
687 * @param itemName the name of the item
688 * @param itemState updated item state
691 public void sendItemUpdate(String itemName, String itemState) {
693 logger.debug("Sending update '{}' for item '{}'", itemState, itemName);
694 JSONObject itemUpdateMessage = new JSONObject();
696 itemUpdateMessage.put("itemName", itemName);
697 itemUpdateMessage.put("itemStatus", itemState);
698 socket.emit("itemupdate", itemUpdateMessage);
699 } catch (JSONException e) {
700 logger.debug("{}", e.getMessage());
703 logger.debug("No connection, Item update is not sent");
708 * Returns true if openHAB Cloud connection is active
710 public boolean isConnected() {
715 * Disconnect from openHAB Cloud
717 public void shutdown() {
718 logger.info("Shutting down openHAB Cloud service connection");
719 reconnectFuture.get().ifPresent(future -> future.cancel(true));
723 public void setListener(CloudClientListener listener) {
724 this.listener = listener;
727 private void scheduleReconnect(long delay) {
728 reconnectFuture.getAndSet(Optional.of(scheduler.schedule(new Runnable() {
733 }, delay, TimeUnit.MILLISECONDS))).ifPresent(future -> future.cancel(true));
736 private JSONObject getJSONHeaders(HttpFields httpFields) {
737 JSONObject headersJSON = new JSONObject();
739 for (HttpField field : httpFields) {
740 headersJSON.put(field.getName(), field.getValue());
742 } catch (JSONException e) {
743 logger.warn("Error forming response headers: {}", e.getMessage());
748 private JSONArray createActionArray(@Nullable String... actionStrings) {
749 JSONArray actionArray = new JSONArray();
750 for (String actionString : actionStrings) {
751 if (actionString == null) {
754 String[] parts = actionString.split("=", 2);
755 if (parts.length == 2) {
756 JSONObject action = new JSONObject();
757 action.put("title", parts[0]);
758 action.put("action", parts[1]);
759 actionArray.put(action);
765 private static String censored(String secret) {
766 if (secret.length() < 4) {
769 return secret.substring(0, 2) + "..." + secret.substring(secret.length() - 2, secret.length());