]> git.basschouten.com Git - openhab-addons.git/blob
e491a9e8dcc2fe2866be0f997d00fd3926def15d
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2021 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.webthing.internal.client;
14
15 import java.io.IOException;
16 import java.nio.ByteBuffer;
17 import java.time.Duration;
18 import java.time.Instant;
19 import java.util.HashMap;
20 import java.util.Map;
21 import java.util.Optional;
22 import java.util.concurrent.ScheduledExecutorService;
23 import java.util.concurrent.ScheduledFuture;
24 import java.util.concurrent.TimeUnit;
25 import java.util.concurrent.atomic.AtomicReference;
26 import java.util.function.BiConsumer;
27 import java.util.function.Consumer;
28
29 import org.eclipse.jdt.annotation.NonNull;
30 import org.eclipse.jdt.annotation.NonNullByDefault;
31 import org.eclipse.jdt.annotation.Nullable;
32 import org.eclipse.jetty.websocket.api.Session;
33 import org.eclipse.jetty.websocket.api.WebSocketListener;
34 import org.eclipse.jetty.websocket.api.WebSocketPingPongListener;
35 import org.openhab.binding.webthing.internal.client.dto.PropertyStatusMessage;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38
39 import com.google.gson.Gson;
40 import com.google.gson.JsonSyntaxException;
41
42 /**
43  * The WebsocketConnection implementation
44  *
45  * @author Gregor Roth - Initial contribution
46  */
47 @NonNullByDefault
48 public class WebSocketConnectionImpl implements WebSocketConnection, WebSocketListener, WebSocketPingPongListener {
49     private static final BiConsumer<String, Object> EMPTY_PROPERTY_CHANGED_LISTENER = (String propertyName,
50             Object value) -> {
51     };
52     private final Logger logger = LoggerFactory.getLogger(WebSocketConnectionImpl.class);
53     private final Gson gson = new Gson();
54     private final Duration pingPeriod;
55     private final Consumer<String> errorHandler;
56     private final ScheduledFuture<?> watchDogHandle;
57     private final ScheduledFuture<?> pingHandle;
58     private final Map<String, BiConsumer<String, Object>> propertyChangedListeners = new HashMap<>();
59     private final AtomicReference<Instant> lastTimeReceived = new AtomicReference<>(Instant.now());
60     private final AtomicReference<Optional<Session>> sessionRef = new AtomicReference<>(Optional.empty());
61
62     /**
63      * constructor
64      *
65      * @param executor the executor to use
66      * @param errorHandler the errorHandler
67      * @param pingPeriod the period pings should be sent
68      */
69     WebSocketConnectionImpl(ScheduledExecutorService executor, Consumer<String> errorHandler, Duration pingPeriod) {
70         this.errorHandler = errorHandler;
71         this.pingPeriod = pingPeriod;
72
73         // send a ping message are x seconds to validate if the connection is not broken
74         this.pingHandle = executor.scheduleWithFixedDelay(this::sendPing, pingPeriod.dividedBy(2).toMillis(),
75                 pingPeriod.toMillis(), TimeUnit.MILLISECONDS);
76
77         // checks if a message (regular message or pong message) has been received recently. If not, connection is
78         // seen as broken
79         this.watchDogHandle = executor.scheduleWithFixedDelay(this::checkConnection, pingPeriod.toMillis(),
80                 pingPeriod.toMillis(), TimeUnit.MILLISECONDS);
81     }
82
83     @Override
84     public void close() {
85         sessionRef.getAndSet(Optional.empty()).ifPresent(Session::close);
86         watchDogHandle.cancel(true);
87         pingHandle.cancel(true);
88     }
89
90     @Override
91     public void observeProperty(@NonNull String propertyName, @NonNull BiConsumer<String, Object> listener) {
92         propertyChangedListeners.put(propertyName, listener);
93     }
94
95     @Override
96     public void onWebSocketConnect(@Nullable Session session) {
97         sessionRef.set(Optional.ofNullable(session)); // save websocket session to be able to send ping
98     }
99
100     @Override
101     public void onWebSocketPing(@Nullable ByteBuffer payload) {
102     }
103
104     @Override
105     public void onWebSocketPong(@Nullable ByteBuffer payload) {
106         lastTimeReceived.set(Instant.now());
107     }
108
109     @Override
110     public void onWebSocketBinary(byte @Nullable [] payload, int offset, int len) {
111     }
112
113     @Override
114     public void onWebSocketText(@Nullable String message) {
115         try {
116             if (message != null) {
117                 var propertyStatus = gson.fromJson(message, PropertyStatusMessage.class);
118                 if ((propertyStatus != null) && (propertyStatus.messageType != null)
119                         && (propertyStatus.messageType.equals("propertyStatus"))) {
120                     for (var propertyEntry : propertyStatus.data.entrySet()) {
121                         var listener = propertyChangedListeners.getOrDefault(propertyEntry.getKey(),
122                                 EMPTY_PROPERTY_CHANGED_LISTENER);
123                         try {
124                             listener.accept(propertyEntry.getKey(), propertyEntry.getValue());
125                         } catch (RuntimeException re) {
126                             logger.warn("calling property change listener {} failed. {}", listener, re.getMessage());
127                         }
128                     }
129                 } else {
130                     logger.debug("Ignoring received message of unknown type: {}", message);
131                 }
132             }
133         } catch (JsonSyntaxException se) {
134             logger.warn("received invalid message: {}", message);
135         }
136     }
137
138     @Override
139     public void onWebSocketClose(int statusCode, @Nullable String reason) {
140         onWebSocketError(new IOException("websocket closed by peer. " + Optional.ofNullable(reason).orElse("")));
141     }
142
143     @Override
144     public void onWebSocketError(@Nullable Throwable cause) {
145         var reason = "";
146         if (cause != null) {
147             reason = cause.getMessage();
148         }
149         onError(reason);
150     }
151
152     private void onError(@Nullable String message) {
153         if (message == null) {
154             message = "";
155         }
156         errorHandler.accept(message);
157     }
158
159     private void sendPing() {
160         var optionalSession = sessionRef.get();
161         if (optionalSession.isPresent()) {
162             try {
163                 optionalSession.get().getRemote().sendPing(ByteBuffer.wrap(Instant.now().toString().getBytes()));
164             } catch (IOException e) {
165                 onError("could not send ping " + e.getMessage());
166             }
167         }
168     }
169
170     @Override
171     public boolean isAlive() {
172         var elapsedSinceLastReceived = Duration.between(lastTimeReceived.get(), Instant.now());
173         var thresholdOverdued = pingPeriod.multipliedBy(3);
174         var isOverdued = elapsedSinceLastReceived.toMillis() > thresholdOverdued.toMillis();
175         return sessionRef.get().isPresent() && !isOverdued;
176     }
177
178     private void checkConnection() {
179         // check if connection is alive (message has been received recently)
180         if (!isAlive()) {
181             onError("connection seems to be broken (last message received at " + lastTimeReceived.get() + ", "
182                     + Duration.between(lastTimeReceived.get(), Instant.now()).getSeconds() + " sec ago)");
183         }
184     }
185 }