]> git.basschouten.com Git - openhab-addons.git/blob
ff7e03e6969868fc31b08b013c09bab88bc4a30a
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2024 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.mielecloud.internal.webservice.sse;
14
15 import static org.junit.jupiter.api.Assertions.*;
16 import static org.mockito.ArgumentMatchers.*;
17 import static org.mockito.Mockito.*;
18 import static org.openhab.binding.mielecloud.internal.util.ReflectionUtil.*;
19
20 import java.util.Objects;
21 import java.util.concurrent.ScheduledExecutorService;
22 import java.util.concurrent.TimeUnit;
23 import java.util.concurrent.TimeoutException;
24
25 import org.eclipse.jdt.annotation.NonNullByDefault;
26 import org.eclipse.jdt.annotation.Nullable;
27 import org.eclipse.jetty.client.api.Request;
28 import org.eclipse.jetty.client.api.Response;
29 import org.eclipse.jetty.client.api.Response.CompleteListener;
30 import org.eclipse.jetty.client.api.Response.HeadersListener;
31 import org.eclipse.jetty.client.api.Result;
32 import org.eclipse.jetty.http.HttpFields;
33 import org.junit.jupiter.api.Test;
34 import org.mockito.ArgumentMatchers;
35 import org.openhab.binding.mielecloud.internal.webservice.ConnectionError;
36 import org.openhab.binding.mielecloud.internal.webservice.exception.MieleWebserviceDisconnectSseException;
37 import org.openhab.binding.mielecloud.internal.webservice.retry.AuthorizationFailedRetryStrategy;
38
39 /**
40  * @author Björn Lange - Initial contribution
41  */
42 @NonNullByDefault
43 public class SseConnectionTest {
44     private final String URL = "https://openhab.org/";
45
46     @Nullable
47     private Request request;
48
49     @Nullable
50     private SseRequestFactory sseRequestFactory;
51
52     @Nullable
53     private ScheduledExecutorService scheduler;
54
55     @Nullable
56     private BackoffStrategy backoffStrategy;
57
58     @Nullable
59     private SseListener sseListener;
60
61     @Nullable
62     private SseConnection sseConnection;
63
64     @Nullable
65     private HeadersListener registeredHeadersListener;
66
67     @Nullable
68     private CompleteListener registeredCompleteListener;
69
70     private SseRequestFactory mockSseRequestFactory(@Nullable Request request) {
71         SseRequestFactory factory = mock(SseRequestFactory.class);
72         when(factory.createSseRequest(URL)).thenReturn(request);
73         return factory;
74     }
75
76     private ScheduledExecutorService mockScheduler() {
77         return mock(ScheduledExecutorService.class);
78     }
79
80     private Request mockRequest() {
81         Request request = mock(Request.class);
82         when(request.onResponseHeaders(any())).thenAnswer(invocation -> {
83             registeredHeadersListener = invocation.getArgument(0);
84             return request;
85         });
86         when(request.onComplete(any())).thenAnswer(invocation -> {
87             registeredCompleteListener = invocation.getArgument(0);
88             return request;
89         });
90         when(request.idleTimeout(anyLong(), any())).thenReturn(request);
91         when(request.timeout(anyLong(), any())).thenReturn(request);
92         return request;
93     }
94
95     private BackoffStrategy mockBackoffStrategy() {
96         BackoffStrategy backoffStrategy = mock(BackoffStrategy.class);
97         when(backoffStrategy.getSecondsUntilRetry(anyInt())).thenReturn(10L);
98         when(backoffStrategy.getMinimumSecondsUntilRetry()).thenReturn(5L);
99         when(backoffStrategy.getMaximumSecondsUntilRetry()).thenReturn(3600L);
100         return backoffStrategy;
101     }
102
103     private void setUpRunningConnection() {
104         request = mockRequest();
105         sseRequestFactory = mockSseRequestFactory(request);
106         scheduler = mockScheduler();
107         backoffStrategy = mockBackoffStrategy();
108         sseConnection = new SseConnection(URL, getMockedSseRequestFactory(), getMockedScheduler(),
109                 getMockedBackoffStrategy());
110
111         sseListener = mock(SseListener.class);
112         getSseConnection().addSseListener(getMockedSseListener());
113         getSseConnection().connect();
114
115         getRegisteredHeadersListener().onHeaders(null);
116     }
117
118     private Request getMockedRequest() {
119         Request request = this.request;
120         assertNotNull(request);
121         return Objects.requireNonNull(request);
122     }
123
124     private SseRequestFactory getMockedSseRequestFactory() {
125         SseRequestFactory sseRequestFactory = this.sseRequestFactory;
126         assertNotNull(sseRequestFactory);
127         return Objects.requireNonNull(sseRequestFactory);
128     }
129
130     private ScheduledExecutorService getMockedScheduler() {
131         ScheduledExecutorService scheduler = this.scheduler;
132         assertNotNull(scheduler);
133         return Objects.requireNonNull(scheduler);
134     }
135
136     private BackoffStrategy getMockedBackoffStrategy() {
137         BackoffStrategy backoffStrategy = this.backoffStrategy;
138         assertNotNull(backoffStrategy);
139         return Objects.requireNonNull(backoffStrategy);
140     }
141
142     private SseListener getMockedSseListener() {
143         SseListener sseListener = this.sseListener;
144         assertNotNull(sseListener);
145         return Objects.requireNonNull(sseListener);
146     }
147
148     private SseConnection getSseConnection() {
149         SseConnection sseConnection = this.sseConnection;
150         assertNotNull(sseConnection);
151         return Objects.requireNonNull(sseConnection);
152     }
153
154     private HeadersListener getRegisteredHeadersListener() {
155         HeadersListener headersListener = registeredHeadersListener;
156         assertNotNull(headersListener);
157         return Objects.requireNonNull(headersListener);
158     }
159
160     private CompleteListener getRegisteredCompleteListener() {
161         CompleteListener completeListener = registeredCompleteListener;
162         assertNotNull(completeListener);
163         return Objects.requireNonNull(completeListener);
164     }
165
166     @Test
167     public void whenSseConnectionIsConnectedThenTheConnectionRequestIsMade() throws Exception {
168         // given:
169         Request request = mockRequest();
170         SseRequestFactory sseRequestFactory = mockSseRequestFactory(request);
171         ScheduledExecutorService scheduler = mockScheduler();
172         SseConnection sseConnection = new SseConnection(URL, sseRequestFactory, scheduler);
173
174         // when:
175         sseConnection.connect();
176
177         // then:
178         verify(request).send(any());
179     }
180
181     @Test
182     public void whenSseConnectionIsConnectedButNoRequestIsCreatedThenOnlyTheDesiredConnectionStateChanges()
183             throws Exception {
184         // given:
185         SseRequestFactory sseRequestFactory = mockSseRequestFactory(null);
186         ScheduledExecutorService scheduler = mockScheduler();
187         SseConnection sseConnection = new SseConnection(URL, sseRequestFactory, scheduler);
188
189         // when:
190         sseConnection.connect();
191
192         // then:
193         assertTrue(((Boolean) getPrivate(sseConnection, "active")).booleanValue());
194     }
195
196     @Test
197     public void whenHeadersAreReceivedAfterTheSseConnectionWasConnectedThenTheEventStreamParserIsScheduled()
198             throws Exception {
199         // given:
200         Request request = mockRequest();
201         SseRequestFactory sseRequestFactory = mockSseRequestFactory(request);
202         ScheduledExecutorService scheduler = mockScheduler();
203         SseConnection sseConnection = new SseConnection(URL, sseRequestFactory, scheduler);
204         sseConnection.connect();
205         HeadersListener headersListener = registeredHeadersListener;
206         assertNotNull(headersListener);
207
208         // when:
209         headersListener.onHeaders(null);
210
211         // then:
212         verify(scheduler).schedule(ArgumentMatchers.<Runnable> any(), anyLong(), any());
213     }
214
215     @Test
216     public void whenTheSseStreamIsClosedWithATimeoutThenAReconnectIsScheduledAndTheListenersAreNotified()
217             throws Exception {
218         // given:
219         setUpRunningConnection();
220
221         // when:
222         invokePrivate(getSseConnection(), "onSseStreamClosed", new Class[] { Throwable.class }, new TimeoutException());
223
224         // then:
225         verify(getMockedScheduler(), times(2)).schedule(ArgumentMatchers.<Runnable> any(), anyLong(), any());
226         verify(getMockedSseListener()).onConnectionError(ConnectionError.TIMEOUT, 0);
227         verify(getMockedBackoffStrategy()).getSecondsUntilRetry(anyInt());
228     }
229
230     @Test
231     public void whenTheSseStreamIsClosedDueToAJetty401ErrorThenNoReconnectIsScheduledAndATokenRefreshIsRequested()
232             throws Exception {
233         // given:
234         setUpRunningConnection();
235
236         // when:
237         invokePrivate(getSseConnection(), "onSseStreamClosed", new Class[] { Throwable.class }, new RuntimeException(
238                 AuthorizationFailedRetryStrategy.JETTY_401_HEADER_BODY_MISMATCH_EXCEPTION_MESSAGE));
239
240         // then:
241         verify(getMockedScheduler()).schedule(ArgumentMatchers.<Runnable> any(), anyLong(), any());
242         verifyNoMoreInteractions(getMockedScheduler());
243         verify(getMockedSseListener()).onConnectionError(ConnectionError.AUTHORIZATION_FAILED, 0);
244     }
245
246     @Test
247     public void whenTheSseStreamIsClosedWithADifferentExceptionThanATimeoutThenAReconnectIsScheduledAndTheListenersAreNotified()
248             throws Exception {
249         // given:
250         setUpRunningConnection();
251
252         // when:
253         invokePrivate(getSseConnection(), "onSseStreamClosed", new Class[] { Throwable.class },
254                 new IllegalStateException());
255
256         // then:
257         verify(getMockedScheduler(), times(2)).schedule(ArgumentMatchers.<Runnable> any(), anyLong(), any());
258         verify(getMockedSseListener()).onConnectionError(ConnectionError.SSE_STREAM_ENDED, 0);
259         verify(getMockedBackoffStrategy()).getSecondsUntilRetry(anyInt());
260     }
261
262     @Test
263     public void whenTheSseRequestCompletesWithoutResultThenAReconnectIsScheduledAndTheListenersAreNotified()
264             throws Exception {
265         // given:
266         setUpRunningConnection();
267
268         // when:
269         getRegisteredCompleteListener().onComplete(null);
270
271         // then:
272         verify(getMockedScheduler(), times(2)).schedule(ArgumentMatchers.<Runnable> any(), anyLong(), any());
273         verify(getMockedSseListener()).onConnectionError(ConnectionError.SSE_STREAM_ENDED, 0);
274         verify(getMockedBackoffStrategy()).getSecondsUntilRetry(anyInt());
275     }
276
277     @Test
278     public void whenTheSseRequestCompletesWithoutResponseThenAReconnectIsScheduledAndTheListenersAreNotified()
279             throws Exception {
280         // given:
281         setUpRunningConnection();
282
283         Result result = mock(Result.class);
284
285         // when:
286         getRegisteredCompleteListener().onComplete(result);
287
288         // then:
289         verify(getMockedScheduler(), times(2)).schedule(ArgumentMatchers.<Runnable> any(), anyLong(), any());
290         verify(getMockedSseListener()).onConnectionError(ConnectionError.SSE_STREAM_ENDED, 0);
291         verify(getMockedBackoffStrategy()).getSecondsUntilRetry(anyInt());
292     }
293
294     @Test
295     public void whenTheSseRequestCompletesWithASuccessfulResponseThenAReconnectIsScheduledAndTheListenersAreNotified()
296             throws Exception {
297         // given:
298         setUpRunningConnection();
299
300         Response response = mock(Response.class);
301         when(response.getStatus()).thenReturn(200);
302
303         Result result = mock(Result.class);
304         when(result.getResponse()).thenReturn(response);
305
306         // when:
307         getRegisteredCompleteListener().onComplete(result);
308
309         // then:
310         verify(getMockedScheduler(), times(2)).schedule(ArgumentMatchers.<Runnable> any(), anyLong(), any());
311         verify(getMockedSseListener()).onConnectionError(ConnectionError.SSE_STREAM_ENDED, 0);
312         verify(getMockedBackoffStrategy()).getSecondsUntilRetry(anyInt());
313     }
314
315     @Test
316     public void whenTheSseRequestCompletesWithAnAuthorizationFailedResponseThenTheListenersAreNotified()
317             throws Exception {
318         // given:
319         setUpRunningConnection();
320
321         Response response = mock(Response.class);
322         when(response.getStatus()).thenReturn(401);
323
324         Result result = mock(Result.class);
325         when(result.getResponse()).thenReturn(response);
326
327         // when:
328         getRegisteredCompleteListener().onComplete(result);
329
330         // then:
331         verify(getMockedScheduler()).schedule(ArgumentMatchers.<Runnable> any(), anyLong(), any());
332         verify(getMockedSseListener()).onConnectionError(ConnectionError.AUTHORIZATION_FAILED, 0);
333     }
334
335     @Test
336     public void whenTheSseRequestCompletesWithATooManyRequestsResponseWithoutRetryAfterHeaderThenAReconnectIsScheduledAccordingToTheBackoffStrategyAndTheListenersAreNotified()
337             throws Exception {
338         // given:
339         setUpRunningConnection();
340
341         Response response = mock(Response.class);
342         when(response.getStatus()).thenReturn(429);
343         when(response.getHeaders()).thenReturn(new HttpFields());
344
345         Result result = mock(Result.class);
346         when(result.getResponse()).thenReturn(response);
347
348         // when:
349         getRegisteredCompleteListener().onComplete(result);
350
351         // then:
352         verify(getMockedScheduler(), times(2)).schedule(ArgumentMatchers.<Runnable> any(), anyLong(), any());
353         verify(getMockedScheduler()).schedule(ArgumentMatchers.<Runnable> any(), eq(10L), eq(TimeUnit.SECONDS));
354         verify(getMockedSseListener()).onConnectionError(ConnectionError.TOO_MANY_RERQUESTS, 0);
355         verify(getMockedBackoffStrategy()).getSecondsUntilRetry(anyInt());
356     }
357
358     @Test
359     public void whenTheSseRequestCompletesWithATooManyRequestsResponseWithRetryAfterHeaderThenAReconnectIsScheduledAndTheListenersAreNotified()
360             throws Exception {
361         // given:
362         setUpRunningConnection();
363
364         Response response = mock(Response.class);
365         when(response.getStatus()).thenReturn(429);
366         HttpFields httpFields = new HttpFields();
367         httpFields.add("Retry-After", "3600");
368         when(response.getHeaders()).thenReturn(httpFields);
369
370         Result result = mock(Result.class);
371         when(result.getResponse()).thenReturn(response);
372
373         // when:
374         getRegisteredCompleteListener().onComplete(result);
375
376         // then:
377         verify(getMockedScheduler()).schedule(ArgumentMatchers.<Runnable> any(), eq(3600L), eq(TimeUnit.SECONDS));
378         verify(getMockedSseListener()).onConnectionError(ConnectionError.TOO_MANY_RERQUESTS, 0);
379     }
380
381     @Test
382     public void whenTheSseRequestCompletesWithATooManyRequestsResponseWithRetryAfterHeaderWithTooLowValueThenAReconnectIsScheduledWithTheMinimumWaitTime()
383             throws Exception {
384         // given:
385         setUpRunningConnection();
386
387         Response response = mock(Response.class);
388         when(response.getStatus()).thenReturn(429);
389         HttpFields httpFields = new HttpFields();
390         httpFields.add("Retry-After", "1");
391         when(response.getHeaders()).thenReturn(httpFields);
392
393         Result result = mock(Result.class);
394         when(result.getResponse()).thenReturn(response);
395
396         // when:
397         getRegisteredCompleteListener().onComplete(result);
398
399         // then:
400         verify(getMockedScheduler()).schedule(ArgumentMatchers.<Runnable> any(), eq(5L), eq(TimeUnit.SECONDS));
401         verify(getMockedSseListener()).onConnectionError(ConnectionError.TOO_MANY_RERQUESTS, 0);
402     }
403
404     @Test
405     public void whenTheSseRequestCompletesWithATooManyRequestsResponseWithRetryAfterHeaderWithTooHighValueThenAReconnectIsScheduledWithTheMaximumWaitTime()
406             throws Exception {
407         // given:
408         setUpRunningConnection();
409
410         Response response = mock(Response.class);
411         when(response.getStatus()).thenReturn(429);
412         HttpFields httpFields = new HttpFields();
413         httpFields.add("Retry-After", "3601");
414         when(response.getHeaders()).thenReturn(httpFields);
415
416         Result result = mock(Result.class);
417         when(result.getResponse()).thenReturn(response);
418
419         // when:
420         getRegisteredCompleteListener().onComplete(result);
421
422         // then:
423         verify(getMockedScheduler()).schedule(ArgumentMatchers.<Runnable> any(), eq(3600L), eq(TimeUnit.SECONDS));
424         verify(getMockedSseListener()).onConnectionError(ConnectionError.TOO_MANY_RERQUESTS, 0);
425     }
426
427     @Test
428     public void whenTheSseRequestCompletesWithAnInternalServerErrorResponseThenAReconnectIsScheduledAndTheListenersAreNotified()
429             throws Exception {
430         // given:
431         setUpRunningConnection();
432
433         Response response = mock(Response.class);
434         when(response.getStatus()).thenReturn(500);
435
436         Result result = mock(Result.class);
437         when(result.getResponse()).thenReturn(response);
438
439         // when:
440         getRegisteredCompleteListener().onComplete(result);
441
442         // then:
443         verify(getMockedScheduler(), times(2)).schedule(ArgumentMatchers.<Runnable> any(), anyLong(), any());
444         verify(getMockedSseListener()).onConnectionError(ConnectionError.SERVER_ERROR, 0);
445     }
446
447     @Test
448     public void whenTheSseRequestCompletesWithAnInternalServerErrorResponseMultipleTimesThenTheConnectionFailedCounterIsIncrementedEachTime()
449             throws Exception {
450         // given:
451         setUpRunningConnection();
452
453         Response response = mock(Response.class);
454         when(response.getStatus()).thenReturn(500);
455
456         Result result = mock(Result.class);
457         when(result.getResponse()).thenReturn(response);
458
459         // when:
460         getRegisteredCompleteListener().onComplete(result);
461         getRegisteredCompleteListener().onComplete(result);
462
463         // then:
464         verify(getMockedSseListener()).onConnectionError(ConnectionError.SERVER_ERROR, 0);
465         verify(getMockedSseListener()).onConnectionError(ConnectionError.SERVER_ERROR, 1);
466     }
467
468     @Test
469     public void whenTheSseRequestCompletesWithAnUnknownErrorResponseThenAReconnectIsScheduledAndTheListenersAreNotified()
470             throws Exception {
471         // given:
472         setUpRunningConnection();
473
474         Response response = mock(Response.class);
475         when(response.getStatus()).thenReturn(600);
476
477         Result result = mock(Result.class);
478         when(result.getResponse()).thenReturn(response);
479
480         // when:
481         getRegisteredCompleteListener().onComplete(result);
482
483         // then:
484         verify(getMockedScheduler(), times(2)).schedule(ArgumentMatchers.<Runnable> any(), anyLong(), any());
485         verify(getMockedSseListener()).onConnectionError(ConnectionError.OTHER_HTTP_ERROR, 0);
486         verify(getMockedBackoffStrategy()).getSecondsUntilRetry(anyInt());
487     }
488
489     @Test
490     public void whenAServerSentEventIsReceivedThenItIsForwardedToTheListenersAndTheFailedConnectionCounterIsReset()
491             throws Exception {
492         // given:
493         Request request = mockRequest();
494         SseRequestFactory sseRequestFactory = mockSseRequestFactory(request);
495         ScheduledExecutorService scheduler = mockScheduler();
496
497         BackoffStrategy backoffStrategy = mock(BackoffStrategy.class);
498         when(backoffStrategy.getSecondsUntilRetry(anyInt())).thenReturn(10L);
499
500         SseConnection sseConnection = new SseConnection(URL, sseRequestFactory, scheduler, backoffStrategy);
501         SseListener sseListener = mock(SseListener.class);
502         sseConnection.addSseListener(sseListener);
503         setPrivate(sseConnection, "failedConnectionAttempts", 10);
504         sseConnection.connect();
505
506         HeadersListener headersListener = registeredHeadersListener;
507         assertNotNull(headersListener);
508         headersListener.onHeaders(null);
509
510         ServerSentEvent serverSentEvent = new ServerSentEvent("ping", "ping");
511
512         // when:
513         invokePrivate(sseConnection, "onServerSentEvent", serverSentEvent);
514
515         // then:
516         verify(sseListener).onServerSentEvent(serverSentEvent);
517         assertEquals(0, (int) getPrivate(sseConnection, "failedConnectionAttempts"));
518     }
519
520     @Test
521     public void whenTheSseStreamIsDisconnectedThenTheRunningRequestIsAborted() throws Exception {
522         // given:
523         setUpRunningConnection();
524
525         // when:
526         getSseConnection().disconnect();
527
528         // then:
529         verify(getMockedRequest()).abort(any());
530         assertNull(getPrivate(getSseConnection(), "sseRequest"));
531     }
532
533     @Test
534     public void whenTheSseStreamIsDisconnectedThenTheConnectionIsClosedAndNoReconnectIsScheduledAndTheListenersAreNotNotified()
535             throws Exception {
536         // given:
537         setUpRunningConnection();
538
539         // when:
540         getSseConnection().disconnect();
541         invokePrivate(getSseConnection(), "onSseStreamClosed", new Class[] { Throwable.class },
542                 new MieleWebserviceDisconnectSseException());
543
544         // then:
545         verify(getMockedScheduler()).schedule(ArgumentMatchers.<Runnable> any(), anyLong(), any());
546         verifyNoMoreInteractions(getMockedScheduler());
547         verifyNoInteractions(getMockedSseListener());
548     }
549
550     @Test
551     public void whenAPendingReconnectAttemptIsPerformedAfterTheSseConnectionWasDisconnectedThenTheConnectionIsNotRestored()
552             throws Exception {
553         // given:
554         setUpRunningConnection();
555         getSseConnection().disconnect();
556
557         // when:
558         invokePrivate(getSseConnection(), "connectInternal");
559
560         // then:
561         verify(getMockedScheduler()).schedule(ArgumentMatchers.<Runnable> any(), anyLong(), any());
562         verifyNoMoreInteractions(getMockedScheduler());
563         verifyNoInteractions(getMockedSseListener());
564     }
565
566     @Test
567     public void whenTheSseConnectionIsConnectedMultipleTimesWithoutDisconnectingThenOnlyTheFirstConnectResultsInAnConnectionAttempt()
568             throws Exception {
569         // given:
570         Request request = mockRequest();
571         SseRequestFactory sseRequestFactory = mockSseRequestFactory(request);
572         ScheduledExecutorService scheduler = mockScheduler();
573         SseConnection sseConnection = new SseConnection(URL, sseRequestFactory, scheduler);
574         sseConnection.connect();
575
576         // when:
577         sseConnection.connect();
578
579         // then:
580         verify(request, times(1)).onResponseHeaders(any());
581     }
582
583     @Test
584     public void whenTheSseConnectionIsDisconnectedMultipleTimesWithoutConnectingAgainThenOnlyTheFirstDisconnectIsPerformed()
585             throws Exception {
586         // given:
587         Request request = mockRequest();
588         SseRequestFactory sseRequestFactory = mockSseRequestFactory(request);
589         ScheduledExecutorService scheduler = mockScheduler();
590         SseConnection sseConnection = new SseConnection(URL, sseRequestFactory, scheduler);
591         sseConnection.connect();
592         sseConnection.disconnect();
593
594         // when:
595         sseConnection.disconnect();
596
597         // then:
598         verify(request, times(1)).abort(any());
599     }
600 }