private static final String HOST_REACHABLE = "lan_host_l3addr_reachable";
private static final String VM_CHANGED = "vm_state_changed";
private static final Register REGISTRATION = new Register(VM_CHANGED, HOST_REACHABLE, HOST_UNREACHABLE);
+ private static final Register REGISTRATION_WITHOUT_VM = new Register(HOST_REACHABLE, HOST_UNREACHABLE);
private static final String WS_PATH = "ws/event";
private final Logger logger = LoggerFactory.getLogger(WebSocketManager.class);
private final WebSocketClient client;
private Optional<ScheduledFuture<?>> reconnectJob = Optional.empty();
private volatile @Nullable Session wsSession;
+ @Nullable
+ private String sessionToken;
+ private int reconnectInterval;
+ private boolean vmSupported = true;
+ private boolean retryConnectWithoutVm = false;
private record Register(String action, List<String> events) {
Register(String... events) {
UNKNOWN
}
- private static record WebSocketResponse(boolean success, Action action, String event, String source,
- @Nullable JsonElement result) {
+ private static record WebSocketResponse(boolean success, @Nullable String msg, Action action, String event,
+ String source, @Nullable JsonElement result) {
public String getEvent() {
return source + "_" + event;
}
}
public void openSession(@Nullable String sessionToken, int reconnectInterval) {
+ this.sessionToken = sessionToken;
+ this.reconnectInterval = reconnectInterval;
if (reconnectInterval > 0) {
- URI uri = getUriBuilder().scheme(getUriBuilder().build().getScheme().contains("s") ? "wss" : "ws").build();
- ClientUpgradeRequest request = new ClientUpgradeRequest();
- request.setHeader(ApiHandler.AUTH_HEADER, sessionToken);
try {
client.start();
- stopReconnect();
- reconnectJob = Optional.of(scheduler.scheduleWithFixedDelay(() -> {
- try {
- closeSession();
- client.connect(this, uri, request);
- // Update listeners in case we would have lost data while disconnecting / reconnecting
- listeners.values()
- .forEach(host -> host.handleCommand(new ChannelUID(host.getThing().getUID(), REACHABLE),
- RefreshType.REFRESH));
- logger.debug("Websocket manager connected to {}", uri);
- } catch (IOException e) {
- logger.warn("Error connecting websocket client: {}", e.getMessage());
- }
- }, 0, reconnectInterval, TimeUnit.MINUTES));
+ startReconnect();
} catch (Exception e) {
logger.warn("Error starting websocket client: {}", e.getMessage());
}
}
}
+ private void startReconnect() {
+ URI uri = getUriBuilder().scheme(getUriBuilder().build().getScheme().contains("s") ? "wss" : "ws").build();
+ ClientUpgradeRequest request = new ClientUpgradeRequest();
+ request.setHeader(ApiHandler.AUTH_HEADER, sessionToken);
+ stopReconnect();
+ reconnectJob = Optional.of(scheduler.scheduleWithFixedDelay(() -> {
+ try {
+ closeSession();
+ client.connect(this, uri, request);
+ // Update listeners in case we would have lost data while disconnecting / reconnecting
+ listeners.values().forEach(host -> host
+ .handleCommand(new ChannelUID(host.getThing().getUID(), REACHABLE), RefreshType.REFRESH));
+ logger.debug("Websocket manager connected to {}", uri);
+ } catch (IOException e) {
+ logger.warn("Error connecting websocket client: {}", e.getMessage());
+ }
+ }, 0, reconnectInterval, TimeUnit.MINUTES));
+ }
+
private void stopReconnect() {
reconnectJob.ifPresent(job -> job.cancel(true));
reconnectJob = Optional.empty();
this.wsSession = wsSession;
logger.debug("Websocket connection establisehd");
try {
- wsSession.getRemote().sendString(apiHandler.serialize(REGISTRATION));
+ wsSession.getRemote()
+ .sendString(apiHandler.serialize(vmSupported ? REGISTRATION : REGISTRATION_WITHOUT_VM));
} catch (IOException e) {
logger.warn("Error registering to websocket: {}", e.getMessage());
}
@Override
public void onWebSocketText(@NonNullByDefault({}) String message) {
+ logger.debug("Websocket: received message: {}", message);
Session localSession = wsSession;
if (message.toLowerCase(Locale.US).contains("bye") && localSession != null) {
localSession.close(StatusCode.NORMAL, "Thanks");
default:
logger.warn("Unhandled notification received: {}", result.action);
}
+ } else if (result.action == Action.REGISTER) {
+ logger.debug("Event registration failed!");
+ if (vmSupported && "unsupported event vm_state_changed".equals(result.msg)) {
+ vmSupported = false;
+ retryConnectWithoutVm = true;
+ }
}
}
LanHost host = apiHandler.deserialize(LanHost.class, json.toString());
ApiConsumerHandler handler2 = listeners.get(host.getMac());
if (handler2 instanceof HostHandler hostHandler) {
+ logger.debug("Received notification for mac {} : thing {} is {}reachable",
+ host.getMac().toColonDelimitedString(), hostHandler.getThing().getUID(),
+ host.reachable() ? "" : "not ");
hostHandler.updateConnectivityChannels(host);
}
break;
public void onWebSocketClose(int statusCode, @NonNullByDefault({}) String reason) {
logger.debug("Socket Closed: [{}] - reason {}", statusCode, reason);
this.wsSession = null;
+ if (retryConnectWithoutVm) {
+ logger.debug("Retry connecting websocket client without VM support");
+ retryConnectWithoutVm = false;
+ startReconnect();
+ }
}
@Override