]> git.basschouten.com Git - openhab-addons.git/blob
24416440c353fc1f3394c8bb9838dd6857316396
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.mielecloud.internal.webservice.sse;
14
15 import java.util.ArrayList;
16 import java.util.List;
17 import java.util.concurrent.ScheduledExecutorService;
18 import java.util.concurrent.TimeUnit;
19 import java.util.concurrent.TimeoutException;
20
21 import org.eclipse.jdt.annotation.NonNullByDefault;
22 import org.eclipse.jdt.annotation.Nullable;
23 import org.eclipse.jetty.client.api.Request;
24 import org.eclipse.jetty.client.api.Response;
25 import org.eclipse.jetty.client.api.Result;
26 import org.eclipse.jetty.client.util.InputStreamResponseListener;
27 import org.openhab.binding.mielecloud.internal.webservice.ConnectionError;
28 import org.openhab.binding.mielecloud.internal.webservice.HttpUtil;
29 import org.openhab.binding.mielecloud.internal.webservice.exception.AuthorizationFailedException;
30 import org.openhab.binding.mielecloud.internal.webservice.exception.MieleWebserviceDisconnectSseException;
31 import org.openhab.binding.mielecloud.internal.webservice.exception.MieleWebserviceException;
32 import org.openhab.binding.mielecloud.internal.webservice.exception.MieleWebserviceTransientException;
33 import org.openhab.binding.mielecloud.internal.webservice.exception.TooManyRequestsException;
34 import org.openhab.binding.mielecloud.internal.webservice.retry.AuthorizationFailedRetryStrategy;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37
38 /**
39  * An active or inactive SSE connection emitting a stream of events.
40  *
41  * @author Björn Lange - Initial Contribution
42  */
43 @NonNullByDefault
44 public final class SseConnection {
45     private static final long CONNECTION_TIMEOUT = 30;
46     private static final TimeUnit CONNECTION_TIMEOUT_UNIT = TimeUnit.SECONDS;
47
48     private final Logger logger = LoggerFactory.getLogger(SseConnection.class);
49
50     private final String endpoint;
51     private final SseRequestFactory requestFactory;
52     private final ScheduledExecutorService scheduler;
53     private final BackoffStrategy backoffStrategy;
54
55     private final List<SseListener> listeners = new ArrayList<>();
56
57     private boolean active = false;
58
59     private int failedConnectionAttempts = 0;
60
61     @Nullable
62     private Request sseRequest;
63
64     /**
65      * Creates a new {@link SseConnection} to the given endpoint.
66      *
67      * Note: It is required to call {@link #connect()} in order to open the connection and start receiving events.
68      *
69      * @param endpoint The endpoint to connect to.
70      * @param requestFactory Factory for creating requests.
71      * @param scheduler Scheduler to run scheduled and concurrent tasks on.
72      */
73     public SseConnection(String endpoint, SseRequestFactory requestFactory, ScheduledExecutorService scheduler) {
74         this(endpoint, requestFactory, scheduler, new ExponentialBackoffWithJitter());
75     }
76
77     /**
78      * Creates a new {@link SseConnection} to the given endpoint.
79      *
80      * Note: It is required to call {@link #connect()} in order to open the connection and start receiving events.
81      *
82      * @param endpoint The endpoint to connect to.
83      * @param requestFactory Factory for creating requests.
84      * @param scheduler Scheduler to run scheduled and concurrent tasks on.
85      * @param backoffStrategy Strategy for deriving the wait time between connection attempts.
86      */
87     SseConnection(String endpoint, SseRequestFactory requestFactory, ScheduledExecutorService scheduler,
88             BackoffStrategy backoffStrategy) {
89         this.endpoint = endpoint;
90         this.requestFactory = requestFactory;
91         this.scheduler = scheduler;
92         this.backoffStrategy = backoffStrategy;
93     }
94
95     public synchronized void connect() {
96         active = true;
97         connectInternal();
98     }
99
100     private synchronized void connectInternal() {
101         if (!active) {
102             return;
103         }
104
105         Request runningRequest = this.sseRequest;
106         if (runningRequest != null) {
107             return;
108         }
109
110         logger.debug("Opening SSE connection...");
111         Request sseRequest = createRequest();
112         if (sseRequest == null) {
113             logger.warn("Could not create SSE request, not opening SSE connection.");
114             return;
115         }
116
117         final InputStreamResponseListener stream = new InputStreamResponseListener();
118         SseStreamParser eventStreamParser = new SseStreamParser(stream.getInputStream(), this::onServerSentEvent,
119                 this::onSseStreamClosed);
120
121         sseRequest = sseRequest
122                 .onResponseHeaders(
123                         response -> scheduler.schedule(eventStreamParser::parseAndDispatchEvents, 0, TimeUnit.SECONDS))
124                 .onComplete(result -> onConnectionComplete(result));
125         sseRequest.send(stream);
126         this.sseRequest = sseRequest;
127     }
128
129     @Nullable
130     private Request createRequest() {
131         Request sseRequest = requestFactory.createSseRequest(endpoint);
132         if (sseRequest == null) {
133             return null;
134         }
135
136         return sseRequest.timeout(0, TimeUnit.SECONDS).idleTimeout(CONNECTION_TIMEOUT, CONNECTION_TIMEOUT_UNIT);
137     }
138
139     private synchronized void onSseStreamClosed(@Nullable Throwable exception) {
140         if (exception != null && AuthorizationFailedRetryStrategy.JETTY_401_HEADER_BODY_MISMATCH_EXCEPTION_MESSAGE
141                 .equals(exception.getMessage())) {
142             onConnectionError(ConnectionError.AUTHORIZATION_FAILED);
143         } else if (exception instanceof TimeoutException) {
144             onConnectionError(ConnectionError.TIMEOUT);
145         } else {
146             onConnectionError(ConnectionError.SSE_STREAM_ENDED);
147         }
148     }
149
150     private synchronized void onConnectionComplete(@Nullable Result result) {
151         sseRequest = null;
152
153         if (result == null) {
154             logger.warn("SSE stream was closed but there was no result delivered.");
155             onConnectionError(ConnectionError.SSE_STREAM_ENDED);
156             return;
157         }
158
159         Response response = result.getResponse();
160         if (response == null) {
161             logger.warn("SSE stream was closed without response.");
162             onConnectionError(ConnectionError.SSE_STREAM_ENDED);
163             return;
164         }
165
166         onConnectionClosed(response);
167     }
168
169     private void onConnectionClosed(Response response) {
170         try {
171             HttpUtil.checkHttpSuccess(response);
172             onConnectionError(ConnectionError.SSE_STREAM_ENDED);
173         } catch (AuthorizationFailedException e) {
174             onConnectionError(ConnectionError.AUTHORIZATION_FAILED);
175         } catch (TooManyRequestsException e) {
176             long secondsUntilRetry = e.getSecondsUntilRetry();
177             if (secondsUntilRetry < 0) {
178                 onConnectionError(ConnectionError.TOO_MANY_RERQUESTS);
179             } else {
180                 onConnectionError(ConnectionError.TOO_MANY_RERQUESTS, secondsUntilRetry);
181             }
182         } catch (MieleWebserviceTransientException e) {
183             onConnectionError(e.getConnectionError(), 0);
184         } catch (MieleWebserviceException e) {
185             onConnectionError(e.getConnectionError());
186         }
187     }
188
189     private void onConnectionError(ConnectionError connectionError) {
190         onConnectionError(connectionError, backoffStrategy.getSecondsUntilRetry(failedConnectionAttempts));
191     }
192
193     private synchronized void onConnectionError(ConnectionError connectionError, long secondsUntilRetry) {
194         if (!active) {
195             return;
196         }
197
198         if (connectionError != ConnectionError.AUTHORIZATION_FAILED) {
199             scheduleReconnect(secondsUntilRetry);
200         }
201
202         fireConnectionError(connectionError);
203         failedConnectionAttempts++;
204     }
205
206     private void scheduleReconnect(long secondsUntilRetry) {
207         long retryInSeconds = Math.max(backoffStrategy.getMinimumSecondsUntilRetry(),
208                 Math.min(secondsUntilRetry, backoffStrategy.getMaximumSecondsUntilRetry()));
209         scheduler.schedule(this::connectInternal, retryInSeconds, TimeUnit.SECONDS);
210         logger.debug("Scheduled reconnect attempt for Miele webservice to take place in {} seconds", retryInSeconds);
211     }
212
213     public synchronized void disconnect() {
214         active = false;
215
216         Request runningRequest = sseRequest;
217         if (runningRequest == null) {
218             logger.debug("SSE connection is not established, skipping SSE disconnect.");
219             return;
220         }
221
222         logger.debug("Disconnecting SSE");
223         runningRequest.abort(new MieleWebserviceDisconnectSseException());
224         sseRequest = null;
225         logger.debug("Disconnected");
226     }
227
228     private void onServerSentEvent(ServerSentEvent event) {
229         failedConnectionAttempts = 0;
230         listeners.forEach(l -> l.onServerSentEvent(event));
231     }
232
233     private void fireConnectionError(ConnectionError connectionError) {
234         listeners.forEach(l -> l.onConnectionError(connectionError, failedConnectionAttempts));
235     }
236
237     public void addSseListener(SseListener listener) {
238         listeners.add(listener);
239     }
240 }