import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jdt.annotation.Nullable;
import org.eclipse.jetty.client.HttpClient;
import io.socket.client.Socket;
import io.socket.emitter.Emitter;
import io.socket.engineio.client.Transport;
+import io.socket.engineio.client.transports.WebSocket;
import io.socket.parser.Packet;
import io.socket.parser.Parser;
import okhttp3.OkHttpClient.Builder;
* @author Kai Kreuzer - migrated code to new Jetty client and ESH APIs
*/
public class CloudClient {
+
+ private static final long RECONNECT_MIN = 2_000;
+
+ private static final long RECONNECT_MAX = 60_000;
+
+ private static final double RECONNECT_JITTER = 0.75;
+
+ private static final long READ_TIMEOUT = 60_0000;
+
/*
* Logger for this class
*/
*/
private boolean isConnected;
- /*
- * This variable holds version of local openHAB
- */
- private String openHABVersion;
-
/*
* This variable holds instance of Socket.IO client class which provides communication
* with the openHAB Cloud
/*
* Delay reconnect scheduler pool
- *
+ *
*/
protected final ScheduledExecutorService scheduler = ThreadPoolManager
.getScheduledPool(ThreadPoolManager.THREAD_POOL_NAME_COMMON);
+ @SuppressWarnings("null")
+ private final AtomicReference<Optional<ScheduledFuture<?>>> reconnectFuture = new AtomicReference<>(
+ Optional.empty());
+
/**
* Constructor of CloudClient
*
this.remoteAccessEnabled = remoteAccessEnabled;
this.exposedItems = exposedItems;
this.jettyClient = httpClient;
- reconnectBackoff.setMin(1000);
- reconnectBackoff.setMax(30_000);
- reconnectBackoff.setJitter(0.5);
+ reconnectBackoff.setMin(RECONNECT_MIN);
+ reconnectBackoff.setMax(RECONNECT_MAX);
+ reconnectBackoff.setJitter(RECONNECT_JITTER);
}
/**
public void connect() {
try {
Options options = new Options();
+ options.transports = new String[] { WebSocket.NAME };
+ options.reconnection = true;
+ options.reconnectionAttempts = Integer.MAX_VALUE;
+ options.reconnectionDelay = RECONNECT_MIN;
+ options.reconnectionDelayMax = RECONNECT_MAX;
+ options.randomizationFactor = RECONNECT_JITTER;
+ options.timeout = READ_TIMEOUT;
+ Builder okHttpBuilder = new Builder();
+ okHttpBuilder.readTimeout(READ_TIMEOUT, TimeUnit.MILLISECONDS);
if (logger.isTraceEnabled()) {
// When trace level logging is enabled, we activate further logging of HTTP calls
// of the Socket.IO library
- Builder okHttpBuilder = new Builder();
HttpLoggingInterceptor loggingInterceptor = new HttpLoggingInterceptor();
loggingInterceptor.setLevel(Level.BASIC);
okHttpBuilder.addInterceptor(loggingInterceptor);
okHttpBuilder.addNetworkInterceptor(loggingInterceptor);
- options.callFactory = okHttpBuilder.build();
- options.webSocketFactory = okHttpBuilder.build();
}
+ options.callFactory = okHttpBuilder.build();
+ options.webSocketFactory = okHttpBuilder.build();
socket = IO.socket(baseURL, options);
URL parsed = new URL(baseURL);
protocol = parsed.getProtocol();
.on(Socket.EVENT_RECONNECT_FAILED,
args -> logger.debug("Socket.IO re-connect attempts failed. Stopping reconnection."))//
.on(Socket.EVENT_DISCONNECT, args -> {
- if (args.length > 0) {
- logger.warn("Socket.IO disconnected: {}", args[0]);
- } else {
- logger.warn("Socket.IO disconnected");
- }
+ String message = args.length > 0 ? args[0].toString() : "";
+ logger.warn("Socket.IO disconnected: {}", message);
isConnected = false;
onDisconnect();
+ // https://github.com/socketio/socket.io-client/commit/afb952d854e1d8728ce07b7c3a9f0dee2a61ef4e
+ if ("io server disconnect".equals(message)) {
+ socket.close();
+ long delay = reconnectBackoff.duration();
+ logger.warn("Reconnecting after {} ms.", delay);
+ scheduleReconnect(delay);
+ }
})//
.on(Socket.EVENT_ERROR, args -> {
if (CloudClient.this.socket.connected()) {
logger.warn("Error connecting to the openHAB Cloud instance. Reconnecting.");
}
socket.close();
- scheduler.schedule(new Runnable() {
- @Override
- public void run() {
- socket.connect();
- }
- }, delay, TimeUnit.MILLISECONDS);
+ scheduleReconnect(delay);
}
})//
*/
public void shutdown() {
logger.info("Shutting down openHAB Cloud service connection");
+ reconnectFuture.get().ifPresent(future -> future.cancel(true));
socket.disconnect();
}
- public String getOpenHABVersion() {
- return openHABVersion;
- }
-
- public void setOpenHABVersion(String openHABVersion) {
- this.openHABVersion = openHABVersion;
- }
-
public void setListener(CloudClientListener listener) {
this.listener = listener;
}
+ private void scheduleReconnect(long delay) {
+ reconnectFuture.getAndSet(Optional.of(scheduler.schedule(new Runnable() {
+ @Override
+ public void run() {
+ socket.connect();
+ }
+ }, delay, TimeUnit.MILLISECONDS))).ifPresent(future -> future.cancel(true));
+ }
+
private JSONObject getJSONHeaders(HttpFields httpFields) {
JSONObject headersJSON = new JSONObject();
try {