2 * Copyright (c) 2010-2023 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.webthing.internal.client;
15 import java.io.IOException;
16 import java.nio.ByteBuffer;
17 import java.time.Duration;
18 import java.time.Instant;
19 import java.util.HashMap;
21 import java.util.Optional;
22 import java.util.concurrent.ScheduledExecutorService;
23 import java.util.concurrent.ScheduledFuture;
24 import java.util.concurrent.TimeUnit;
25 import java.util.concurrent.atomic.AtomicReference;
26 import java.util.function.BiConsumer;
27 import java.util.function.Consumer;
29 import org.eclipse.jdt.annotation.NonNullByDefault;
30 import org.eclipse.jdt.annotation.Nullable;
31 import org.eclipse.jetty.websocket.api.Session;
32 import org.eclipse.jetty.websocket.api.WebSocketListener;
33 import org.eclipse.jetty.websocket.api.WebSocketPingPongListener;
34 import org.openhab.binding.webthing.internal.client.dto.PropertyStatusMessage;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
38 import com.google.gson.Gson;
39 import com.google.gson.JsonSyntaxException;
42 * The WebsocketConnection implementation
44 * @author Gregor Roth - Initial contribution
47 public class WebSocketConnectionImpl implements WebSocketConnection, WebSocketListener, WebSocketPingPongListener {
48 private static final BiConsumer<String, Object> EMPTY_PROPERTY_CHANGED_LISTENER = (String propertyName,
51 private final Logger logger = LoggerFactory.getLogger(WebSocketConnectionImpl.class);
52 private final Gson gson = new Gson();
53 private final Duration pingPeriod;
54 private final Consumer<String> errorHandler;
55 private final ScheduledFuture<?> watchDogHandle;
56 private final ScheduledFuture<?> pingHandle;
57 private final Map<String, BiConsumer<String, Object>> propertyChangedListeners = new HashMap<>();
58 private final AtomicReference<Instant> lastTimeReceived = new AtomicReference<>(Instant.now());
59 private final AtomicReference<Optional<Session>> sessionRef = new AtomicReference<>(Optional.empty());
64 * @param executor the executor to use
65 * @param errorHandler the errorHandler
66 * @param pingPeriod the period pings should be sent
68 WebSocketConnectionImpl(ScheduledExecutorService executor, Consumer<String> errorHandler, Duration pingPeriod) {
69 this.errorHandler = errorHandler;
70 this.pingPeriod = pingPeriod;
72 // send a ping message are x seconds to validate if the connection is not broken
73 this.pingHandle = executor.scheduleWithFixedDelay(this::sendPing, pingPeriod.dividedBy(2).toMillis(),
74 pingPeriod.toMillis(), TimeUnit.MILLISECONDS);
76 // checks if a message (regular message or pong message) has been received recently. If not, connection is
78 this.watchDogHandle = executor.scheduleWithFixedDelay(this::checkConnection, pingPeriod.toMillis(),
79 pingPeriod.toMillis(), TimeUnit.MILLISECONDS);
84 sessionRef.getAndSet(Optional.empty()).ifPresent(Session::close);
85 watchDogHandle.cancel(true);
86 pingHandle.cancel(true);
90 public void observeProperty(String propertyName, BiConsumer<String, Object> listener) {
91 propertyChangedListeners.put(propertyName, listener);
95 public void onWebSocketConnect(@Nullable Session session) {
96 sessionRef.set(Optional.ofNullable(session)); // save websocket session to be able to send ping
100 public void onWebSocketPing(@Nullable ByteBuffer payload) {
104 public void onWebSocketPong(@Nullable ByteBuffer payload) {
105 lastTimeReceived.set(Instant.now());
109 public void onWebSocketBinary(byte @Nullable [] payload, int offset, int len) {
113 public void onWebSocketText(@Nullable String message) {
115 if (message != null) {
116 var propertyStatus = gson.fromJson(message, PropertyStatusMessage.class);
117 if ((propertyStatus != null) && (propertyStatus.messageType != null)
118 && (propertyStatus.messageType.equals("propertyStatus"))) {
119 for (var propertyEntry : propertyStatus.data.entrySet()) {
120 var listener = propertyChangedListeners.getOrDefault(propertyEntry.getKey(),
121 EMPTY_PROPERTY_CHANGED_LISTENER);
123 listener.accept(propertyEntry.getKey(), propertyEntry.getValue());
124 } catch (RuntimeException re) {
125 logger.warn("calling property change listener {} failed. {}", listener, re.getMessage());
129 logger.debug("Ignoring received message of unknown type: {}", message);
132 } catch (JsonSyntaxException se) {
133 logger.warn("received invalid message: {}", message);
138 public void onWebSocketClose(int statusCode, @Nullable String reason) {
139 onWebSocketError(new IOException("websocket closed by peer. " + Optional.ofNullable(reason).orElse("")));
143 public void onWebSocketError(@Nullable Throwable cause) {
146 reason = cause.getMessage();
151 private void onError(@Nullable String message) {
152 if (message == null) {
155 errorHandler.accept(message);
158 private void sendPing() {
159 var optionalSession = sessionRef.get();
160 if (optionalSession.isPresent()) {
162 optionalSession.get().getRemote().sendPing(ByteBuffer.wrap(Instant.now().toString().getBytes()));
163 } catch (IOException e) {
164 onError("could not send ping " + e.getMessage());
170 public boolean isAlive() {
171 var elapsedSinceLastReceived = Duration.between(lastTimeReceived.get(), Instant.now());
172 var thresholdOverdued = pingPeriod.multipliedBy(3);
173 var isOverdued = elapsedSinceLastReceived.toMillis() > thresholdOverdued.toMillis();
174 return sessionRef.get().isPresent() && !isOverdued;
177 private void checkConnection() {
178 // check if connection is alive (message has been received recently)
180 onError("connection seems to be broken (last message received at " + lastTimeReceived.get() + ", "
181 + Duration.between(lastTimeReceived.get(), Instant.now()).getSeconds() + " sec ago)");