2 * Copyright (c) 2010-2024 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.mielecloud.internal.webservice.sse;
15 import static org.junit.jupiter.api.Assertions.*;
16 import static org.mockito.ArgumentMatchers.*;
17 import static org.mockito.Mockito.*;
18 import static org.openhab.binding.mielecloud.internal.util.ReflectionUtil.*;
20 import java.util.Objects;
21 import java.util.concurrent.ScheduledExecutorService;
22 import java.util.concurrent.TimeUnit;
23 import java.util.concurrent.TimeoutException;
25 import org.eclipse.jdt.annotation.NonNullByDefault;
26 import org.eclipse.jdt.annotation.Nullable;
27 import org.eclipse.jetty.client.api.Request;
28 import org.eclipse.jetty.client.api.Response;
29 import org.eclipse.jetty.client.api.Response.CompleteListener;
30 import org.eclipse.jetty.client.api.Response.HeadersListener;
31 import org.eclipse.jetty.client.api.Result;
32 import org.eclipse.jetty.http.HttpFields;
33 import org.junit.jupiter.api.Test;
34 import org.mockito.ArgumentMatchers;
35 import org.openhab.binding.mielecloud.internal.webservice.ConnectionError;
36 import org.openhab.binding.mielecloud.internal.webservice.exception.MieleWebserviceDisconnectSseException;
37 import org.openhab.binding.mielecloud.internal.webservice.retry.AuthorizationFailedRetryStrategy;
40 * @author Björn Lange - Initial contribution
43 public class SseConnectionTest {
44 private final String URL = "https://openhab.org/";
47 private Request request;
50 private SseRequestFactory sseRequestFactory;
53 private ScheduledExecutorService scheduler;
56 private BackoffStrategy backoffStrategy;
59 private SseListener sseListener;
62 private SseConnection sseConnection;
65 private HeadersListener registeredHeadersListener;
68 private CompleteListener registeredCompleteListener;
70 private SseRequestFactory mockSseRequestFactory(@Nullable Request request) {
71 SseRequestFactory factory = mock(SseRequestFactory.class);
72 when(factory.createSseRequest(URL)).thenReturn(request);
76 private ScheduledExecutorService mockScheduler() {
77 return mock(ScheduledExecutorService.class);
80 private Request mockRequest() {
81 Request request = mock(Request.class);
82 when(request.onResponseHeaders(any())).thenAnswer(invocation -> {
83 registeredHeadersListener = invocation.getArgument(0);
86 when(request.onComplete(any())).thenAnswer(invocation -> {
87 registeredCompleteListener = invocation.getArgument(0);
90 when(request.idleTimeout(anyLong(), any())).thenReturn(request);
91 when(request.timeout(anyLong(), any())).thenReturn(request);
95 private BackoffStrategy mockBackoffStrategy() {
96 BackoffStrategy backoffStrategy = mock(BackoffStrategy.class);
97 when(backoffStrategy.getSecondsUntilRetry(anyInt())).thenReturn(10L);
98 when(backoffStrategy.getMinimumSecondsUntilRetry()).thenReturn(5L);
99 when(backoffStrategy.getMaximumSecondsUntilRetry()).thenReturn(3600L);
100 return backoffStrategy;
103 private void setUpRunningConnection() {
104 request = mockRequest();
105 sseRequestFactory = mockSseRequestFactory(request);
106 scheduler = mockScheduler();
107 backoffStrategy = mockBackoffStrategy();
108 sseConnection = new SseConnection(URL, getMockedSseRequestFactory(), getMockedScheduler(),
109 getMockedBackoffStrategy());
111 sseListener = mock(SseListener.class);
112 getSseConnection().addSseListener(getMockedSseListener());
113 getSseConnection().connect();
115 getRegisteredHeadersListener().onHeaders(null);
118 private Request getMockedRequest() {
119 Request request = this.request;
120 assertNotNull(request);
121 return Objects.requireNonNull(request);
124 private SseRequestFactory getMockedSseRequestFactory() {
125 SseRequestFactory sseRequestFactory = this.sseRequestFactory;
126 assertNotNull(sseRequestFactory);
127 return Objects.requireNonNull(sseRequestFactory);
130 private ScheduledExecutorService getMockedScheduler() {
131 ScheduledExecutorService scheduler = this.scheduler;
132 assertNotNull(scheduler);
133 return Objects.requireNonNull(scheduler);
136 private BackoffStrategy getMockedBackoffStrategy() {
137 BackoffStrategy backoffStrategy = this.backoffStrategy;
138 assertNotNull(backoffStrategy);
139 return Objects.requireNonNull(backoffStrategy);
142 private SseListener getMockedSseListener() {
143 SseListener sseListener = this.sseListener;
144 assertNotNull(sseListener);
145 return Objects.requireNonNull(sseListener);
148 private SseConnection getSseConnection() {
149 SseConnection sseConnection = this.sseConnection;
150 assertNotNull(sseConnection);
151 return Objects.requireNonNull(sseConnection);
154 private HeadersListener getRegisteredHeadersListener() {
155 HeadersListener headersListener = registeredHeadersListener;
156 assertNotNull(headersListener);
157 return Objects.requireNonNull(headersListener);
160 private CompleteListener getRegisteredCompleteListener() {
161 CompleteListener completeListener = registeredCompleteListener;
162 assertNotNull(completeListener);
163 return Objects.requireNonNull(completeListener);
167 public void whenSseConnectionIsConnectedThenTheConnectionRequestIsMade() throws Exception {
169 Request request = mockRequest();
170 SseRequestFactory sseRequestFactory = mockSseRequestFactory(request);
171 ScheduledExecutorService scheduler = mockScheduler();
172 SseConnection sseConnection = new SseConnection(URL, sseRequestFactory, scheduler);
175 sseConnection.connect();
178 verify(request).send(any());
182 public void whenSseConnectionIsConnectedButNoRequestIsCreatedThenOnlyTheDesiredConnectionStateChanges()
185 SseRequestFactory sseRequestFactory = mockSseRequestFactory(null);
186 ScheduledExecutorService scheduler = mockScheduler();
187 SseConnection sseConnection = new SseConnection(URL, sseRequestFactory, scheduler);
190 sseConnection.connect();
193 assertTrue(((Boolean) getPrivate(sseConnection, "active")).booleanValue());
197 public void whenHeadersAreReceivedAfterTheSseConnectionWasConnectedThenTheEventStreamParserIsScheduled()
200 Request request = mockRequest();
201 SseRequestFactory sseRequestFactory = mockSseRequestFactory(request);
202 ScheduledExecutorService scheduler = mockScheduler();
203 SseConnection sseConnection = new SseConnection(URL, sseRequestFactory, scheduler);
204 sseConnection.connect();
205 HeadersListener headersListener = registeredHeadersListener;
206 assertNotNull(headersListener);
209 headersListener.onHeaders(null);
212 verify(scheduler).schedule(ArgumentMatchers.<Runnable> any(), anyLong(), any());
216 public void whenTheSseStreamIsClosedWithATimeoutThenAReconnectIsScheduledAndTheListenersAreNotified()
219 setUpRunningConnection();
222 invokePrivate(getSseConnection(), "onSseStreamClosed", new Class[] { Throwable.class }, new TimeoutException());
225 verify(getMockedScheduler(), times(2)).schedule(ArgumentMatchers.<Runnable> any(), anyLong(), any());
226 verify(getMockedSseListener()).onConnectionError(ConnectionError.TIMEOUT, 0);
227 verify(getMockedBackoffStrategy()).getSecondsUntilRetry(anyInt());
231 public void whenTheSseStreamIsClosedDueToAJetty401ErrorThenNoReconnectIsScheduledAndATokenRefreshIsRequested()
234 setUpRunningConnection();
237 invokePrivate(getSseConnection(), "onSseStreamClosed", new Class[] { Throwable.class }, new RuntimeException(
238 AuthorizationFailedRetryStrategy.JETTY_401_HEADER_BODY_MISMATCH_EXCEPTION_MESSAGE));
241 verify(getMockedScheduler()).schedule(ArgumentMatchers.<Runnable> any(), anyLong(), any());
242 verifyNoMoreInteractions(getMockedScheduler());
243 verify(getMockedSseListener()).onConnectionError(ConnectionError.AUTHORIZATION_FAILED, 0);
247 public void whenTheSseStreamIsClosedWithADifferentExceptionThanATimeoutThenAReconnectIsScheduledAndTheListenersAreNotified()
250 setUpRunningConnection();
253 invokePrivate(getSseConnection(), "onSseStreamClosed", new Class[] { Throwable.class },
254 new IllegalStateException());
257 verify(getMockedScheduler(), times(2)).schedule(ArgumentMatchers.<Runnable> any(), anyLong(), any());
258 verify(getMockedSseListener()).onConnectionError(ConnectionError.SSE_STREAM_ENDED, 0);
259 verify(getMockedBackoffStrategy()).getSecondsUntilRetry(anyInt());
263 public void whenTheSseRequestCompletesWithoutResultThenAReconnectIsScheduledAndTheListenersAreNotified()
266 setUpRunningConnection();
269 getRegisteredCompleteListener().onComplete(null);
272 verify(getMockedScheduler(), times(2)).schedule(ArgumentMatchers.<Runnable> any(), anyLong(), any());
273 verify(getMockedSseListener()).onConnectionError(ConnectionError.SSE_STREAM_ENDED, 0);
274 verify(getMockedBackoffStrategy()).getSecondsUntilRetry(anyInt());
278 public void whenTheSseRequestCompletesWithoutResponseThenAReconnectIsScheduledAndTheListenersAreNotified()
281 setUpRunningConnection();
283 Result result = mock(Result.class);
286 getRegisteredCompleteListener().onComplete(result);
289 verify(getMockedScheduler(), times(2)).schedule(ArgumentMatchers.<Runnable> any(), anyLong(), any());
290 verify(getMockedSseListener()).onConnectionError(ConnectionError.SSE_STREAM_ENDED, 0);
291 verify(getMockedBackoffStrategy()).getSecondsUntilRetry(anyInt());
295 public void whenTheSseRequestCompletesWithASuccessfulResponseThenAReconnectIsScheduledAndTheListenersAreNotified()
298 setUpRunningConnection();
300 Response response = mock(Response.class);
301 when(response.getStatus()).thenReturn(200);
303 Result result = mock(Result.class);
304 when(result.getResponse()).thenReturn(response);
307 getRegisteredCompleteListener().onComplete(result);
310 verify(getMockedScheduler(), times(2)).schedule(ArgumentMatchers.<Runnable> any(), anyLong(), any());
311 verify(getMockedSseListener()).onConnectionError(ConnectionError.SSE_STREAM_ENDED, 0);
312 verify(getMockedBackoffStrategy()).getSecondsUntilRetry(anyInt());
316 public void whenTheSseRequestCompletesWithAnAuthorizationFailedResponseThenTheListenersAreNotified()
319 setUpRunningConnection();
321 Response response = mock(Response.class);
322 when(response.getStatus()).thenReturn(401);
324 Result result = mock(Result.class);
325 when(result.getResponse()).thenReturn(response);
328 getRegisteredCompleteListener().onComplete(result);
331 verify(getMockedScheduler()).schedule(ArgumentMatchers.<Runnable> any(), anyLong(), any());
332 verify(getMockedSseListener()).onConnectionError(ConnectionError.AUTHORIZATION_FAILED, 0);
336 public void whenTheSseRequestCompletesWithATooManyRequestsResponseWithoutRetryAfterHeaderThenAReconnectIsScheduledAccordingToTheBackoffStrategyAndTheListenersAreNotified()
339 setUpRunningConnection();
341 Response response = mock(Response.class);
342 when(response.getStatus()).thenReturn(429);
343 when(response.getHeaders()).thenReturn(new HttpFields());
345 Result result = mock(Result.class);
346 when(result.getResponse()).thenReturn(response);
349 getRegisteredCompleteListener().onComplete(result);
352 verify(getMockedScheduler(), times(2)).schedule(ArgumentMatchers.<Runnable> any(), anyLong(), any());
353 verify(getMockedScheduler()).schedule(ArgumentMatchers.<Runnable> any(), eq(10L), eq(TimeUnit.SECONDS));
354 verify(getMockedSseListener()).onConnectionError(ConnectionError.TOO_MANY_RERQUESTS, 0);
355 verify(getMockedBackoffStrategy()).getSecondsUntilRetry(anyInt());
359 public void whenTheSseRequestCompletesWithATooManyRequestsResponseWithRetryAfterHeaderThenAReconnectIsScheduledAndTheListenersAreNotified()
362 setUpRunningConnection();
364 Response response = mock(Response.class);
365 when(response.getStatus()).thenReturn(429);
366 HttpFields httpFields = new HttpFields();
367 httpFields.add("Retry-After", "3600");
368 when(response.getHeaders()).thenReturn(httpFields);
370 Result result = mock(Result.class);
371 when(result.getResponse()).thenReturn(response);
374 getRegisteredCompleteListener().onComplete(result);
377 verify(getMockedScheduler()).schedule(ArgumentMatchers.<Runnable> any(), eq(3600L), eq(TimeUnit.SECONDS));
378 verify(getMockedSseListener()).onConnectionError(ConnectionError.TOO_MANY_RERQUESTS, 0);
382 public void whenTheSseRequestCompletesWithATooManyRequestsResponseWithRetryAfterHeaderWithTooLowValueThenAReconnectIsScheduledWithTheMinimumWaitTime()
385 setUpRunningConnection();
387 Response response = mock(Response.class);
388 when(response.getStatus()).thenReturn(429);
389 HttpFields httpFields = new HttpFields();
390 httpFields.add("Retry-After", "1");
391 when(response.getHeaders()).thenReturn(httpFields);
393 Result result = mock(Result.class);
394 when(result.getResponse()).thenReturn(response);
397 getRegisteredCompleteListener().onComplete(result);
400 verify(getMockedScheduler()).schedule(ArgumentMatchers.<Runnable> any(), eq(5L), eq(TimeUnit.SECONDS));
401 verify(getMockedSseListener()).onConnectionError(ConnectionError.TOO_MANY_RERQUESTS, 0);
405 public void whenTheSseRequestCompletesWithATooManyRequestsResponseWithRetryAfterHeaderWithTooHighValueThenAReconnectIsScheduledWithTheMaximumWaitTime()
408 setUpRunningConnection();
410 Response response = mock(Response.class);
411 when(response.getStatus()).thenReturn(429);
412 HttpFields httpFields = new HttpFields();
413 httpFields.add("Retry-After", "3601");
414 when(response.getHeaders()).thenReturn(httpFields);
416 Result result = mock(Result.class);
417 when(result.getResponse()).thenReturn(response);
420 getRegisteredCompleteListener().onComplete(result);
423 verify(getMockedScheduler()).schedule(ArgumentMatchers.<Runnable> any(), eq(3600L), eq(TimeUnit.SECONDS));
424 verify(getMockedSseListener()).onConnectionError(ConnectionError.TOO_MANY_RERQUESTS, 0);
428 public void whenTheSseRequestCompletesWithAnInternalServerErrorResponseThenAReconnectIsScheduledAndTheListenersAreNotified()
431 setUpRunningConnection();
433 Response response = mock(Response.class);
434 when(response.getStatus()).thenReturn(500);
436 Result result = mock(Result.class);
437 when(result.getResponse()).thenReturn(response);
440 getRegisteredCompleteListener().onComplete(result);
443 verify(getMockedScheduler(), times(2)).schedule(ArgumentMatchers.<Runnable> any(), anyLong(), any());
444 verify(getMockedSseListener()).onConnectionError(ConnectionError.SERVER_ERROR, 0);
448 public void whenTheSseRequestCompletesWithAnInternalServerErrorResponseMultipleTimesThenTheConnectionFailedCounterIsIncrementedEachTime()
451 setUpRunningConnection();
453 Response response = mock(Response.class);
454 when(response.getStatus()).thenReturn(500);
456 Result result = mock(Result.class);
457 when(result.getResponse()).thenReturn(response);
460 getRegisteredCompleteListener().onComplete(result);
461 getRegisteredCompleteListener().onComplete(result);
464 verify(getMockedSseListener()).onConnectionError(ConnectionError.SERVER_ERROR, 0);
465 verify(getMockedSseListener()).onConnectionError(ConnectionError.SERVER_ERROR, 1);
469 public void whenTheSseRequestCompletesWithAnUnknownErrorResponseThenAReconnectIsScheduledAndTheListenersAreNotified()
472 setUpRunningConnection();
474 Response response = mock(Response.class);
475 when(response.getStatus()).thenReturn(600);
477 Result result = mock(Result.class);
478 when(result.getResponse()).thenReturn(response);
481 getRegisteredCompleteListener().onComplete(result);
484 verify(getMockedScheduler(), times(2)).schedule(ArgumentMatchers.<Runnable> any(), anyLong(), any());
485 verify(getMockedSseListener()).onConnectionError(ConnectionError.OTHER_HTTP_ERROR, 0);
486 verify(getMockedBackoffStrategy()).getSecondsUntilRetry(anyInt());
490 public void whenAServerSentEventIsReceivedThenItIsForwardedToTheListenersAndTheFailedConnectionCounterIsReset()
493 Request request = mockRequest();
494 SseRequestFactory sseRequestFactory = mockSseRequestFactory(request);
495 ScheduledExecutorService scheduler = mockScheduler();
497 BackoffStrategy backoffStrategy = mock(BackoffStrategy.class);
498 when(backoffStrategy.getSecondsUntilRetry(anyInt())).thenReturn(10L);
500 SseConnection sseConnection = new SseConnection(URL, sseRequestFactory, scheduler, backoffStrategy);
501 SseListener sseListener = mock(SseListener.class);
502 sseConnection.addSseListener(sseListener);
503 setPrivate(sseConnection, "failedConnectionAttempts", 10);
504 sseConnection.connect();
506 HeadersListener headersListener = registeredHeadersListener;
507 assertNotNull(headersListener);
508 headersListener.onHeaders(null);
510 ServerSentEvent serverSentEvent = new ServerSentEvent("ping", "ping");
513 invokePrivate(sseConnection, "onServerSentEvent", serverSentEvent);
516 verify(sseListener).onServerSentEvent(serverSentEvent);
517 assertEquals(0, (int) getPrivate(sseConnection, "failedConnectionAttempts"));
521 public void whenTheSseStreamIsDisconnectedThenTheRunningRequestIsAborted() throws Exception {
523 setUpRunningConnection();
526 getSseConnection().disconnect();
529 verify(getMockedRequest()).abort(any());
530 assertNull(getPrivate(getSseConnection(), "sseRequest"));
534 public void whenTheSseStreamIsDisconnectedThenTheConnectionIsClosedAndNoReconnectIsScheduledAndTheListenersAreNotNotified()
537 setUpRunningConnection();
540 getSseConnection().disconnect();
541 invokePrivate(getSseConnection(), "onSseStreamClosed", new Class[] { Throwable.class },
542 new MieleWebserviceDisconnectSseException());
545 verify(getMockedScheduler()).schedule(ArgumentMatchers.<Runnable> any(), anyLong(), any());
546 verifyNoMoreInteractions(getMockedScheduler());
547 verifyNoInteractions(getMockedSseListener());
551 public void whenAPendingReconnectAttemptIsPerformedAfterTheSseConnectionWasDisconnectedThenTheConnectionIsNotRestored()
554 setUpRunningConnection();
555 getSseConnection().disconnect();
558 invokePrivate(getSseConnection(), "connectInternal");
561 verify(getMockedScheduler()).schedule(ArgumentMatchers.<Runnable> any(), anyLong(), any());
562 verifyNoMoreInteractions(getMockedScheduler());
563 verifyNoInteractions(getMockedSseListener());
567 public void whenTheSseConnectionIsConnectedMultipleTimesWithoutDisconnectingThenOnlyTheFirstConnectResultsInAnConnectionAttempt()
570 Request request = mockRequest();
571 SseRequestFactory sseRequestFactory = mockSseRequestFactory(request);
572 ScheduledExecutorService scheduler = mockScheduler();
573 SseConnection sseConnection = new SseConnection(URL, sseRequestFactory, scheduler);
574 sseConnection.connect();
577 sseConnection.connect();
580 verify(request, times(1)).onResponseHeaders(any());
584 public void whenTheSseConnectionIsDisconnectedMultipleTimesWithoutConnectingAgainThenOnlyTheFirstDisconnectIsPerformed()
587 Request request = mockRequest();
588 SseRequestFactory sseRequestFactory = mockSseRequestFactory(request);
589 ScheduledExecutorService scheduler = mockScheduler();
590 SseConnection sseConnection = new SseConnection(URL, sseRequestFactory, scheduler);
591 sseConnection.connect();
592 sseConnection.disconnect();
595 sseConnection.disconnect();
598 verify(request, times(1)).abort(any());