2 * Copyright (c) 2010-2021 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.webthing.internal.client;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WebSocketListener;
import org.eclipse.jetty.websocket.api.WebSocketPingPongListener;
import org.openhab.binding.webthing.internal.client.dto.PropertyStatusMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
43 * The WebsocketConnection implementation
45 * @author Gregor Roth - Initial contribution
48 public class WebSocketConnectionImpl implements WebSocketConnection, WebSocketListener, WebSocketPingPongListener {
49 private static final BiConsumer<String, Object> EMPTY_PROPERTY_CHANGED_LISTENER = (String propertyName,
52 private final Logger logger = LoggerFactory.getLogger(WebSocketConnectionImpl.class);
53 private final Gson gson = new Gson();
54 private final Duration pingPeriod;
55 private final Consumer<String> errorHandler;
56 private final ScheduledFuture<?> watchDogHandle;
57 private final ScheduledFuture<?> pingHandle;
58 private final Map<String, BiConsumer<String, Object>> propertyChangedListeners = new HashMap<>();
59 private final AtomicReference<Instant> lastTimeReceived = new AtomicReference<>(Instant.now());
60 private final AtomicReference<Optional<Session>> sessionRef = new AtomicReference<>(Optional.empty());
65 * @param executor the executor to use
66 * @param errorHandler the errorHandler
67 * @param pingPeriod the period pings should be sent
69 WebSocketConnectionImpl(ScheduledExecutorService executor, Consumer<String> errorHandler, Duration pingPeriod) {
70 this.errorHandler = errorHandler;
71 this.pingPeriod = pingPeriod;
73 // send a ping message are x seconds to validate if the connection is not broken
74 this.pingHandle = executor.scheduleWithFixedDelay(this::sendPing, pingPeriod.dividedBy(2).toMillis(),
75 pingPeriod.toMillis(), TimeUnit.MILLISECONDS);
77 // checks if a message (regular message or pong message) has been received recently. If not, connection is
79 this.watchDogHandle = executor.scheduleWithFixedDelay(this::checkConnection, pingPeriod.toMillis(),
80 pingPeriod.toMillis(), TimeUnit.MILLISECONDS);
// Atomically detach the current session (leaving the reference empty) and close it if one was present.
85 sessionRef.getAndSet(Optional.empty()).ifPresent(Session::close);
// Cancel the scheduled watchdog and ping tasks; 'true' allows interrupting a run that is in progress.
86 watchDogHandle.cancel(true);
87 pingHandle.cancel(true);
91 public void observeProperty(@NonNull String propertyName, @NonNull BiConsumer<String, Object> listener) {
92 propertyChangedListeners.put(propertyName, listener);
96 public void onWebSocketConnect(@Nullable Session session) {
97 sessionRef.set(Optional.ofNullable(session)); // save websocket session to be able to send ping
101 public void onWebSocketPing(@Nullable ByteBuffer payload) {
105 public void onWebSocketPong(@Nullable ByteBuffer payload) {
106 lastTimeReceived.set(Instant.now());
110 public void onWebSocketBinary(byte @Nullable [] payload, int offset, int len) {
114 public void onWebSocketText(@Nullable String message) {
116 if (message != null) {
117 var propertyStatus = gson.fromJson(message, PropertyStatusMessage.class);
118 if ((propertyStatus != null) && (propertyStatus.messageType != null)
119 && (propertyStatus.messageType.equals("propertyStatus"))) {
120 for (var propertyEntry : propertyStatus.data.entrySet()) {
121 var listener = propertyChangedListeners.getOrDefault(propertyEntry.getKey(),
122 EMPTY_PROPERTY_CHANGED_LISTENER);
124 listener.accept(propertyEntry.getKey(), propertyEntry.getValue());
125 } catch (RuntimeException re) {
126 logger.warn("calling property change listener {} failed. {}", listener, re.getMessage());
130 logger.debug("Ignoring received message of unknown type: {}", message);
133 } catch (JsonSyntaxException se) {
134 logger.warn("received invalid message: {}", message);
139 public void onWebSocketClose(int statusCode, @Nullable String reason) {
140 onWebSocketError(new IOException("websocket closed by peer. " + Optional.ofNullable(reason).orElse("")));
/**
 * Callback for an error on the websocket. It is also called by onWebSocketClose with an IOException
 * describing the close.
 *
 * @param cause the error that occurred, may be {@code null}
 */
144 public void onWebSocketError(@Nullable Throwable cause) {
// Throwable#getMessage() may return null, so the reason taken from the cause can be null as well.
147 reason = cause.getMessage();
/**
 * Reports an error to the error handler that was passed to the constructor.
 *
 * @param message the error description, may be {@code null}
 */
152 private void onError(@Nullable String message) {
// A missing message is special-cased before the handler is invoked.
153 if (message == null) {
156 errorHandler.accept(message);
159 private void sendPing() {
160 var optionalSession = sessionRef.get();
161 if (optionalSession.isPresent()) {
163 optionalSession.get().getRemote().sendPing(ByteBuffer.wrap(Instant.now().toString().getBytes()));
164 } catch (IOException e) {
165 onError("could not send ping " + e.getMessage());
171 public boolean isAlive() {
172 var elapsedSinceLastReceived = Duration.between(lastTimeReceived.get(), Instant.now());
173 var thresholdOverdued = pingPeriod.multipliedBy(3);
174 var isOverdued = elapsedSinceLastReceived.toMillis() > thresholdOverdued.toMillis();
175 return sessionRef.get().isPresent() && !isOverdued;
178 private void checkConnection() {
179 // check if connection is alive (message has been received recently)
181 onError("connection seems to be broken (last message received at " + lastTimeReceived.get() + ", "
182 + Duration.between(lastTimeReceived.get(), Instant.now()).getSeconds() + " sec ago)");