2 * Copyright (c) 2010-2023 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.mielecloud.internal.webservice.sse;
15 import java.util.ArrayList;
16 import java.util.List;
17 import java.util.concurrent.ScheduledExecutorService;
18 import java.util.concurrent.TimeUnit;
19 import java.util.concurrent.TimeoutException;
21 import org.eclipse.jdt.annotation.NonNullByDefault;
22 import org.eclipse.jdt.annotation.Nullable;
23 import org.eclipse.jetty.client.api.Request;
24 import org.eclipse.jetty.client.api.Response;
25 import org.eclipse.jetty.client.api.Result;
26 import org.eclipse.jetty.client.util.InputStreamResponseListener;
27 import org.openhab.binding.mielecloud.internal.webservice.ConnectionError;
28 import org.openhab.binding.mielecloud.internal.webservice.HttpUtil;
29 import org.openhab.binding.mielecloud.internal.webservice.exception.AuthorizationFailedException;
30 import org.openhab.binding.mielecloud.internal.webservice.exception.MieleWebserviceDisconnectSseException;
31 import org.openhab.binding.mielecloud.internal.webservice.exception.MieleWebserviceException;
32 import org.openhab.binding.mielecloud.internal.webservice.exception.MieleWebserviceTransientException;
33 import org.openhab.binding.mielecloud.internal.webservice.exception.TooManyRequestsException;
34 import org.openhab.binding.mielecloud.internal.webservice.retry.AuthorizationFailedRetryStrategy;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
39 * An active or inactive SSE connection emitting a stream of events.
41 * @author Björn Lange - Initial Contribution
// Represents one SSE connection to a single endpoint; declared final, so it cannot be subclassed.
44 public final class SseConnection {
// Idle timeout value applied to the SSE request in createRequest(); interpreted in CONNECTION_TIMEOUT_UNIT.
45 private static final long CONNECTION_TIMEOUT = 30;
// Unit for CONNECTION_TIMEOUT (seconds).
46 private static final TimeUnit CONNECTION_TIMEOUT_UNIT = TimeUnit.SECONDS;
// Per-instance SLF4J logger for this class.
48 private final Logger logger = LoggerFactory.getLogger(SseConnection.class);
// Endpoint handed to the request factory whenever a new SSE request is created.
50 private final String endpoint;
// Creates the SSE request for the endpoint; createRequest() null-checks its result.
51 private final SseRequestFactory requestFactory;
// Runs the event stream parser and the delayed reconnect attempts.
52 private final ScheduledExecutorService scheduler;
// Supplies the default wait time between attempts and the minimum/maximum bounds used by scheduleReconnect().
53 private final BackoffStrategy backoffStrategy;
// Listeners notified about server sent events and connection errors.
// NOTE(review): plain ArrayList; addSseListener() is not synchronized while notification happens from callbacks -
// confirm that listeners are only registered before connect() is called.
55 private final List<SseListener> listeners = new ArrayList<>();
// Starts out false. NOTE(review): no read or write of this flag is visible in this excerpt; presumably toggled by
// connect()/disconnect() - confirm against the full file.
57 private boolean active = false;
// Incremented after each reported connection error, reset to 0 when a server sent event arrives; fed into the
// backoff strategy and passed to listeners on errors.
59 private int failedConnectionAttempts = 0;
// The request most recently sent by connectInternal(); disconnect() aborts it.
62 private Request sseRequest;
/**
 * Constructs an {@link SseConnection} for the given endpoint which uses an {@link ExponentialBackoffWithJitter}
 * as its backoff strategy.
 *
 * Note: The connection is not opened by the constructor. {@link #connect()} must be called in order to open it and
 * start receiving events.
 *
 * @param endpoint The endpoint to connect to.
 * @param requestFactory Factory for creating requests.
 * @param scheduler Scheduler to run scheduled and concurrent tasks on.
 */
public SseConnection(String endpoint, SseRequestFactory requestFactory, ScheduledExecutorService scheduler) {
    this(endpoint, requestFactory, scheduler, new ExponentialBackoffWithJitter());
}
/**
 * Constructs an {@link SseConnection} for the given endpoint with an explicitly chosen backoff strategy.
 *
 * Note: The connection is not opened by the constructor. {@link #connect()} must be called in order to open it and
 * start receiving events.
 *
 * @param endpoint The endpoint to connect to.
 * @param requestFactory Factory for creating requests.
 * @param scheduler Scheduler to run scheduled and concurrent tasks on.
 * @param backoffStrategy Strategy for deriving the wait time between connection attempts.
 */
SseConnection(String endpoint, SseRequestFactory requestFactory, ScheduledExecutorService scheduler,
        BackoffStrategy backoffStrategy) {
    // Plain field initialisation; the assignments are independent of each other.
    this.backoffStrategy = backoffStrategy;
    this.scheduler = scheduler;
    this.requestFactory = requestFactory;
    this.endpoint = endpoint;
}
// Public entry point for opening the connection; synchronized on this instance. According to the constructor
// documentation it has to be called before any events are received.
// NOTE(review): the method body is not visible in this excerpt - presumably it marks the connection as active and
// delegates to connectInternal(); confirm against the full file.
95 public synchronized void connect() {
// Creates and sends a new SSE request and remembers it in the sseRequest field. Also used as the task that
// scheduleReconnect() submits to the scheduler.
// NOTE(review): several short lines of this method are not visible in this excerpt (original lines 101-104,
// 107-109, 114-116 and 122); the comments below only describe the visible statements.
100 private synchronized void connectInternal() {
// Look at the request that is currently tracked, if any. The body of the non-null branch is not visible here;
// presumably the method returns early so that only one request is open at a time - confirm.
105 Request runningRequest = this.sseRequest;
106 if (runningRequest != null) {
110 logger.debug("Opening SSE connection...");
// createRequest() may yield null, which is reported with a warning.
111 Request sseRequest = createRequest();
112 if (sseRequest == null) {
113 logger.warn("Could not create SSE request, not opening SSE connection.");
// The response content is exposed as an InputStream and handed to the parser together with the callbacks for
// parsed events and for the end of the stream.
117 final InputStreamResponseListener stream = new InputStreamResponseListener();
118 SseStreamParser eventStreamParser = new SseStreamParser(stream.getInputStream(), this::onServerSentEvent,
119 this::onSseStreamClosed);
// The line naming the callback that receives the lambda below is not visible (original line 122). The lambda
// submits the parser to the scheduler with zero delay - presumably to keep parsing off the HTTP client thread.
121 sseRequest = sseRequest
123 response -> scheduler.schedule(eventStreamParser::parseAndDispatchEvents, 0, TimeUnit.SECONDS))
// Completion of the request/response exchange is reported to onConnectionComplete().
124 .onComplete(result -> onConnectionComplete(result));
// Send the request with the stream as response listener, then remember it so that it can be aborted later.
125 sseRequest.send(stream);
126 this.sseRequest = sseRequest;
// Asks the request factory for an SSE request to the endpoint and configures its timeouts.
// Callers treat a null result as "request could not be created" (see connectInternal()).
130 private Request createRequest() {
131 Request sseRequest = requestFactory.createSseRequest(endpoint);
// The factory may return null. The body of this branch is not visible in this excerpt; presumably null is
// passed on to the caller - confirm.
132 if (sseRequest == null) {
// Total timeout is set to 0 (Jetty documents zero as "disabled" - TODO confirm for the Jetty version in use),
// while the idle timeout is set to CONNECTION_TIMEOUT.
136 return sseRequest.timeout(0, TimeUnit.SECONDS).idleTimeout(CONNECTION_TIMEOUT, CONNECTION_TIMEOUT_UNIT);
// Callback handed to the SseStreamParser for the end of the event stream. Translates the (optional) exception
// into a ConnectionError and reports it with the default backoff.
139 private synchronized void onSseStreamClosed(@Nullable Throwable exception) {
// An exception whose message equals the known Jetty 401 header/body mismatch message is reported as an
// authorization failure.
140 if (exception != null && AuthorizationFailedRetryStrategy.JETTY_401_HEADER_BODY_MISMATCH_EXCEPTION_MESSAGE
141 .equals(exception.getMessage())) {
142 onConnectionError(ConnectionError.AUTHORIZATION_FAILED);
// A TimeoutException is reported as TIMEOUT (instanceof is false for null, so a missing exception ends up below).
143 } else if (exception instanceof TimeoutException) {
144 onConnectionError(ConnectionError.TIMEOUT);
// NOTE(review): the line introducing this last case is not visible in this excerpt (original line 145);
// presumably it is the else branch covering every other cause - confirm.
146 onConnectionError(ConnectionError.SSE_STREAM_ENDED);
// Completion callback registered on the SSE request in connectInternal(). Reports a missing result or response
// as SSE_STREAM_ENDED, otherwise passes the response on to onConnectionClosed().
// NOTE(review): original lines 151-152 are not visible in this excerpt; presumably the tracked request is
// cleared there - confirm. The lines following each onConnectionError() call below are not visible either;
// presumably the method returns at those points.
150 private synchronized void onConnectionComplete(@Nullable Result result) {
153 if (result == null) {
154 logger.warn("SSE stream was closed but there was no result delivered.");
155 onConnectionError(ConnectionError.SSE_STREAM_ENDED);
159 Response response = result.getResponse();
160 if (response == null) {
161 logger.warn("SSE stream was closed without response.");
162 onConnectionError(ConnectionError.SSE_STREAM_ENDED);
// A response is available: derive the error to report from it.
166 onConnectionClosed(response);
// Maps the final HTTP response of the SSE request to a ConnectionError and reports it.
// NOTE(review): original lines 170, 179 and 181 are not visible in this excerpt; the catch clauses imply that
// the first statements are wrapped in a try block.
169 private void onConnectionClosed(Response response) {
// If the check does not throw, the stream simply ended; report that with the default backoff.
171 HttpUtil.checkHttpSuccess(response);
172 onConnectionError(ConnectionError.SSE_STREAM_ENDED);
173 } catch (AuthorizationFailedException e) {
174 onConnectionError(ConnectionError.AUTHORIZATION_FAILED);
175 } catch (TooManyRequestsException e) {
// A negative retry hint falls back to the backoff strategy's wait time.
176 long secondsUntilRetry = e.getSecondsUntilRetry();
177 if (secondsUntilRetry < 0) {
178 onConnectionError(ConnectionError.TOO_MANY_RERQUESTS);
// Presumably the else branch (its opening line is not visible): use the retry hint carried by the exception.
180 onConnectionError(ConnectionError.TOO_MANY_RERQUESTS, secondsUntilRetry);
182 } catch (MieleWebserviceTransientException e) {
// Requests a retry after 0 seconds; scheduleReconnect() still raises this to the strategy's minimum wait time.
183 onConnectionError(e.getConnectionError(), 0);
184 } catch (MieleWebserviceException e) {
// Any other webservice exception is reported with the default backoff.
185 onConnectionError(e.getConnectionError());
189 private void onConnectionError(ConnectionError connectionError) {
190 onConnectionError(connectionError, backoffStrategy.getSecondsUntilRetry(failedConnectionAttempts));
// Central error handling: schedules a reconnect where applicable, notifies the listeners and counts the failure.
// NOTE(review): original lines 194-197 are not visible in this excerpt; presumably a guard that skips the
// handling while the connection is inactive - confirm.
193 private synchronized void onConnectionError(ConnectionError connectionError, long secondsUntilRetry) {
// Every error except a failed authorization leads to a reconnect attempt.
198 if (connectionError != ConnectionError.AUTHORIZATION_FAILED) {
199 scheduleReconnect(secondsUntilRetry);
// Listeners are notified before the counter is incremented, so they receive the number of failures that
// preceded this one.
202 fireConnectionError(connectionError);
203 failedConnectionAttempts++;
206 private void scheduleReconnect(long secondsUntilRetry) {
207 long retryInSeconds = Math.max(backoffStrategy.getMinimumSecondsUntilRetry(),
208 Math.min(secondsUntilRetry, backoffStrategy.getMaximumSecondsUntilRetry()));
209 scheduler.schedule(this::connectInternal, retryInSeconds, TimeUnit.SECONDS);
210 logger.debug("Scheduled reconnect attempt for Miele webservice to take place in {} seconds", retryInSeconds);
// Closes the SSE connection by aborting the tracked request, if there is one.
// NOTE(review): original lines 214-215, 219-221 and 224 are not visible in this excerpt; presumably the
// connection is marked inactive, the method returns early when no request exists and the tracked request is
// cleared after the abort - confirm against the full file.
213 public synchronized void disconnect() {
216 Request runningRequest = sseRequest;
217 if (runningRequest == null) {
218 logger.debug("SSE connection is not established, skipping SSE disconnect.");
222 logger.debug("Disconnecting SSE");
// The abort cause is a dedicated exception type (MieleWebserviceDisconnectSseException).
223 runningRequest.abort(new MieleWebserviceDisconnectSseException());
225 logger.debug("Disconnected");
228 private void onServerSentEvent(ServerSentEvent event) {
229 failedConnectionAttempts = 0;
230 listeners.forEach(l -> l.onServerSentEvent(event));
233 private void fireConnectionError(ConnectionError connectionError) {
234 listeners.forEach(l -> l.onConnectionError(connectionError, failedConnectionAttempts));
237 public void addSseListener(SseListener listener) {
238 listeners.add(listener);