]> git.basschouten.com Git - openhab-addons.git/blob
93cf20cd0128ac8c555998dceb0eca9da05198bd
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.webthing.internal.client;
14
15 import java.io.IOException;
16 import java.nio.ByteBuffer;
17 import java.time.Duration;
18 import java.time.Instant;
19 import java.util.HashMap;
20 import java.util.Map;
21 import java.util.Optional;
22 import java.util.concurrent.ScheduledExecutorService;
23 import java.util.concurrent.ScheduledFuture;
24 import java.util.concurrent.TimeUnit;
25 import java.util.concurrent.atomic.AtomicReference;
26 import java.util.function.BiConsumer;
27 import java.util.function.Consumer;
28
29 import org.eclipse.jdt.annotation.NonNullByDefault;
30 import org.eclipse.jdt.annotation.Nullable;
31 import org.eclipse.jetty.websocket.api.Session;
32 import org.eclipse.jetty.websocket.api.WebSocketListener;
33 import org.eclipse.jetty.websocket.api.WebSocketPingPongListener;
34 import org.openhab.binding.webthing.internal.client.dto.PropertyStatusMessage;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37
38 import com.google.gson.Gson;
39 import com.google.gson.JsonSyntaxException;
40
41 /**
42  * The WebsocketConnection implementation
43  *
44  * @author Gregor Roth - Initial contribution
45  */
46 @NonNullByDefault
47 public class WebSocketConnectionImpl implements WebSocketConnection, WebSocketListener, WebSocketPingPongListener {
48     private static final BiConsumer<String, Object> EMPTY_PROPERTY_CHANGED_LISTENER = (String propertyName,
49             Object value) -> {
50     };
51     private final Logger logger = LoggerFactory.getLogger(WebSocketConnectionImpl.class);
52     private final Gson gson = new Gson();
53     private final Duration pingPeriod;
54     private final Consumer<String> errorHandler;
55     private final ScheduledFuture<?> watchDogHandle;
56     private final ScheduledFuture<?> pingHandle;
57     private final Map<String, BiConsumer<String, Object>> propertyChangedListeners = new HashMap<>();
58     private final AtomicReference<Instant> lastTimeReceived = new AtomicReference<>(Instant.now());
59     private final AtomicReference<Optional<Session>> sessionRef = new AtomicReference<>(Optional.empty());
60
61     /**
62      * constructor
63      *
64      * @param executor the executor to use
65      * @param errorHandler the errorHandler
66      * @param pingPeriod the period pings should be sent
67      */
68     WebSocketConnectionImpl(ScheduledExecutorService executor, Consumer<String> errorHandler, Duration pingPeriod) {
69         this.errorHandler = errorHandler;
70         this.pingPeriod = pingPeriod;
71
72         // send a ping message are x seconds to validate if the connection is not broken
73         this.pingHandle = executor.scheduleWithFixedDelay(this::sendPing, pingPeriod.dividedBy(2).toMillis(),
74                 pingPeriod.toMillis(), TimeUnit.MILLISECONDS);
75
76         // checks if a message (regular message or pong message) has been received recently. If not, connection is
77         // seen as broken
78         this.watchDogHandle = executor.scheduleWithFixedDelay(this::checkConnection, pingPeriod.toMillis(),
79                 pingPeriod.toMillis(), TimeUnit.MILLISECONDS);
80     }
81
82     @Override
83     public void close() {
84         sessionRef.getAndSet(Optional.empty()).ifPresent(Session::close);
85         watchDogHandle.cancel(true);
86         pingHandle.cancel(true);
87     }
88
89     @Override
90     public void observeProperty(String propertyName, BiConsumer<String, Object> listener) {
91         propertyChangedListeners.put(propertyName, listener);
92     }
93
94     @Override
95     public void onWebSocketConnect(@Nullable Session session) {
96         sessionRef.set(Optional.ofNullable(session)); // save websocket session to be able to send ping
97     }
98
99     @Override
100     public void onWebSocketPing(@Nullable ByteBuffer payload) {
101     }
102
103     @Override
104     public void onWebSocketPong(@Nullable ByteBuffer payload) {
105         lastTimeReceived.set(Instant.now());
106     }
107
108     @Override
109     public void onWebSocketBinary(byte @Nullable [] payload, int offset, int len) {
110     }
111
112     @Override
113     public void onWebSocketText(@Nullable String message) {
114         try {
115             if (message != null) {
116                 var propertyStatus = gson.fromJson(message, PropertyStatusMessage.class);
117                 if ((propertyStatus != null) && (propertyStatus.messageType != null)
118                         && (propertyStatus.messageType.equals("propertyStatus"))) {
119                     for (var propertyEntry : propertyStatus.data.entrySet()) {
120                         var listener = propertyChangedListeners.getOrDefault(propertyEntry.getKey(),
121                                 EMPTY_PROPERTY_CHANGED_LISTENER);
122                         try {
123                             listener.accept(propertyEntry.getKey(), propertyEntry.getValue());
124                         } catch (RuntimeException re) {
125                             logger.warn("calling property change listener {} failed. {}", listener, re.getMessage());
126                         }
127                     }
128                 } else {
129                     logger.debug("Ignoring received message of unknown type: {}", message);
130                 }
131             }
132         } catch (JsonSyntaxException se) {
133             logger.warn("received invalid message: {}", message);
134         }
135     }
136
137     @Override
138     public void onWebSocketClose(int statusCode, @Nullable String reason) {
139         onWebSocketError(new IOException("websocket closed by peer. " + Optional.ofNullable(reason).orElse("")));
140     }
141
142     @Override
143     public void onWebSocketError(@Nullable Throwable cause) {
144         var reason = "";
145         if (cause != null) {
146             reason = cause.getMessage();
147         }
148         onError(reason);
149     }
150
151     private void onError(@Nullable String message) {
152         if (message == null) {
153             message = "";
154         }
155         errorHandler.accept(message);
156     }
157
158     private void sendPing() {
159         var optionalSession = sessionRef.get();
160         if (optionalSession.isPresent()) {
161             try {
162                 optionalSession.get().getRemote().sendPing(ByteBuffer.wrap(Instant.now().toString().getBytes()));
163             } catch (IOException e) {
164                 onError("could not send ping " + e.getMessage());
165             }
166         }
167     }
168
169     @Override
170     public boolean isAlive() {
171         var elapsedSinceLastReceived = Duration.between(lastTimeReceived.get(), Instant.now());
172         var thresholdOverdued = pingPeriod.multipliedBy(3);
173         var isOverdued = elapsedSinceLastReceived.toMillis() > thresholdOverdued.toMillis();
174         return sessionRef.get().isPresent() && !isOverdued;
175     }
176
177     private void checkConnection() {
178         // check if connection is alive (message has been received recently)
179         if (!isAlive()) {
180             onError("connection seems to be broken (last message received at " + lastTimeReceived.get() + ", "
181                     + Duration.between(lastTimeReceived.get(), Instant.now()).getSeconds() + " sec ago)");
182         }
183     }
184 }