import static org.openhab.binding.neohub.internal.NeoHubBindingConstants.*;
import java.io.IOException;
+import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.HashMap;
private static final String SEE_README = "See documentation chapter \"Connection Refused Errors\"";
private static final int MAX_FAILED_SEND_ATTEMPTS = 2;
+ private static final Duration MIN_RESTART_DELAY = Duration.ofSeconds(10);
+ private static final Duration MAX_RESTART_DELAY = Duration.ofHours(1);
private final Logger logger = LoggerFactory.getLogger(NeoHubHandler.class);
private ApiVersion apiVersion = ApiVersion.LEGACY;
private boolean isApiOnline = false;
private int failedSendAttempts = 0;
+ private Duration restartDelay = Duration.from(MIN_RESTART_DELAY);
+ private @Nullable ScheduledFuture<?> restartTask;
public NeoHubHandler(Bridge bridge, WebSocketFactory webSocketFactory) {
super(bridge);
logger.debug("hub '{}' preferLegacyApi={}", getThing().getUID(), config.preferLegacyApi);
}
- // create a web or TCP socket based on the port number in the configuration
- NeoHubSocketBase socket;
- try {
- if (config.useWebSocket) {
- socket = new NeoHubWebSocket(config, webSocketFactory, thing.getUID());
- } else {
- socket = new NeoHubSocket(config, thing.getUID().getAsString());
- }
- } catch (IOException e) {
- logger.debug("\"hub '{}' error creating web/tcp socket: '{}'", getThing().getUID(), e.getMessage());
+ this.config = config;
+ NeoHubSocketBase socket = createSocket();
+ if (socket == null) {
return;
}
-
this.socket = socket;
- this.config = config;
/*
* Try to 'ping' the hub, and if there is a 'connection refused', it is probably due to the mobile App |
startFastPollingBurst();
}
+ /**
+ * Create a web or TCP socket based on the configuration setting
+ */
+ private @Nullable NeoHubSocketBase createSocket() {
+ NeoHubConfiguration config = this.config;
+ if (config == null) {
+ logger.debug("\"hub '{}' configuration is null", getThing().getUID());
+ updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_ERROR);
+ } else {
+ try {
+ if (config.useWebSocket) {
+ return new NeoHubWebSocket(config, webSocketFactory, thing.getUID());
+ } else {
+ return new NeoHubSocket(config, thing.getUID().getAsString());
+ }
+ } catch (IOException e) {
+ logger.debug("\"hub '{}' error creating web/tcp socket: '{}'", getThing().getUID(), e.getMessage());
+ updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR);
+ }
+ }
+ return null;
+ }
+
@Override
public void dispose() {
if (logger.isDebugEnabled()) {
- logger.debug("hub '{}' stop background polling..", getThing().getUID());
+ logger.debug("hub '{}' shutting down..", getThing().getUID());
+ }
+
+ closeSocket();
+ ScheduledFuture<?> restartTask = this.restartTask;
+ if (restartTask != null) {
+ restartTask.cancel(true);
}
// clean up the lazy polling scheduler
fast.cancel(true);
this.fastPollingScheduler = null;
}
+ }
+ private void closeSocket() {
NeoHubSocketBase socket = this.socket;
+ this.socket = null;
if (socket != null) {
try {
socket.close();
} catch (IOException e) {
}
- this.socket = null;
}
}
protected @Nullable NeoHubAbstractDeviceData fromNeoHubGetDeviceData() {
NeoHubSocketBase socket = this.socket;
- if (socket == null || config == null) {
- logger.warn(MSG_HUB_CONFIG, getThing().getUID());
+ if (socket == null) {
return null;
}
if (getThing().getStatus() != ThingStatus.ONLINE) {
updateStatus(ThingStatus.ONLINE, ThingStatusDetail.NONE);
+ restartDelay = Duration.from(MIN_RESTART_DELAY);
}
// check if we also need to discard and update systemData
} catch (IOException | NeoHubException e) {
logger.warn(MSG_FMT_DEVICE_POLL_ERR, getThing().getUID(), e.getMessage());
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR);
- return null;
+ scheduleRestart();
}
+ return null;
+ }
+
+ private synchronized void scheduleRestart() {
+ closeSocket();
+ restartTask = scheduler.schedule(() -> {
+ NeoHubSocketBase socket = createSocket();
+ this.socket = socket;
+ if (!Thread.interrupted() && socket == null) { // keep trying..
+ restartDelay = restartDelay.plus(restartDelay);
+ if (restartDelay.compareTo(MAX_RESTART_DELAY) > 0) {
+ restartDelay = Duration.from(MAX_RESTART_DELAY);
+ }
+ scheduleRestart();
+ }
+ }, restartDelay.toSeconds(), TimeUnit.SECONDS);
}
/**
import java.io.Closeable;
import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Optional;
import org.eclipse.jdt.annotation.NonNullByDefault;
protected final NeoHubConfiguration config;
protected final String hubId;
+ private static final int REQUEST_INTERVAL_MILLISECS = 1000;
+ private Optional<Instant> lastRequestTime = Optional.empty();
+
public NeoHubSocketBase(NeoHubConfiguration config, String hubId) {
this.config = config;
this.hubId = hubId;
* @throws NeoHubException if the communication returned a response but the response was not valid JSON
*/
public abstract String sendMessage(final String requestJson) throws IOException, NeoHubException;
+
+ /**
+ * Method for throttling requests to prevent overloading the hub.
+ * <p>
+ * The NeoHub can get confused if, while it is uploading data to the cloud, it also receives too many local
+ * requests, so this method throttles the requests to one per REQUEST_INTERVAL_MILLISECS maximum.
+ *
+ * @throws NeoHubException if the wait is interrupted
+ */
+ protected synchronized void throttle() throws NeoHubException {
+ try {
+ Instant now = Instant.now();
+ long delay = lastRequestTime
+ .map(t -> Math.max(0, Duration.between(now, t).toMillis() + REQUEST_INTERVAL_MILLISECS)).orElse(0L);
+ lastRequestTime = Optional.of(now.plusMillis(delay));
+ Thread.sleep(delay);
+ } catch (InterruptedException e) {
+ throw new NeoHubException("Throttle sleep interrupted", e);
+ }
+ }
}
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
+import java.time.Instant;
import java.util.concurrent.ExecutionException;
import org.eclipse.jdt.annotation.NonNullByDefault;
*/
private void closeSession() {
Session session = this.session;
+ this.session = null;
if (session != null) {
session.close();
- this.session = null;
}
}
responsePending = true;
IOException caughtException = null;
+ throttle();
try {
// send the request
logger.debug("hub '{}' sending characters:{}", hubId, requestOuter.length());
session.getRemote().sendString(requestOuter);
logger.trace("hub '{}' sent:{}", hubId, requestOuter);
- // sleep and loop until we get a response or the socket is closed
- int sleepRemainingMilliseconds = config.socketTimeout * 1000;
+ // sleep and loop until we get a response, the socket is closed, or it times out
+ Instant timeout = Instant.now().plusSeconds(config.socketTimeout);
while (responsePending) {
try {
Thread.sleep(SLEEP_MILLISECONDS);
- sleepRemainingMilliseconds = sleepRemainingMilliseconds - SLEEP_MILLISECONDS;
- if (sleepRemainingMilliseconds <= 0) {
+ if (Instant.now().isAfter(timeout)) {
throw new IOException("Read timed out");
}
} catch (InterruptedException e) {
caughtException = e;
}
+ caughtException = caughtException != null ? caughtException
+ : this.session == null ? new IOException("WebSocket session closed") : null;
+
logger.debug("hub '{}' received characters:{}", hubId, responseOuter.length());
logger.trace("hub '{}' received:{}", hubId, responseOuter);