2 * Copyright (c) 2010-2023 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.mielecloud.internal.webservice.sse;
15 import static org.mockito.ArgumentMatchers.*;
16 import static org.mockito.Mockito.*;
18 import java.io.ByteArrayInputStream;
19 import java.io.IOException;
20 import java.io.InputStream;
21 import java.nio.charset.StandardCharsets;
22 import java.util.concurrent.TimeoutException;
23 import java.util.function.Consumer;
25 import org.eclipse.jdt.annotation.NonNullByDefault;
26 import org.eclipse.jdt.annotation.Nullable;
27 import org.junit.jupiter.api.Test;
28 import org.junit.jupiter.api.extension.ExtendWith;
29 import org.mockito.Mock;
30 import org.mockito.junit.jupiter.MockitoExtension;
31 import org.openhab.binding.mielecloud.internal.webservice.exception.MieleWebserviceDisconnectSseException;
34 * @author Björn Lange - Initial contribution
36 @ExtendWith(MockitoExtension.class)
38 public class SseStreamParserTest {
// Callback handed to SseStreamParser as the receiver of parsed ServerSentEvent instances.
// NOTE(review): presumably populated as a Mockito mock via MockitoExtension; the annotation lines are not visible here - confirm.
41 private Consumer<ServerSentEvent> serverSentEventCallback;
// Callback handed to SseStreamParser as the stream-closed listener; the tests expect it to receive either null
// or the Throwable that caused the stream to break.
45 private Consumer<@Nullable Throwable> streamClosedCallback;
// Test helper: returns an in-memory InputStream over the UTF-8 encoded bytes of the given text,
// used as the parser input in the tests that do not need a failing stream.
47 private InputStream getInputStreamReadingUtf8Data(String data) {
48 return new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
// Scenario: the input stream is completely empty.
// Expectation: the stream-closed callback is invoked exactly once with null and no event is dispatched.
52 public void whenNoEventIsProvidedThenTheStreamClosedCallbackIsInvoked() {
// Empty string => the stream contains zero bytes.
54 InputStream inputStream = getInputStreamReadingUtf8Data("");
56 SseStreamParser parser = new SseStreamParser(inputStream, serverSentEventCallback, streamClosedCallback);
59 parser.parseAndDispatchEvents();
// A null argument is what these tests expect when the stream ends without an error cause.
62 verify(streamClosedCallback).accept(null);
63 verifyNoMoreInteractions(streamClosedCallback);
64 verifyNoInteractions(serverSentEventCallback);
// Scenario: the input stream contains only a single CRLF line break and no event fields.
// Expectation: the stream-closed callback is invoked exactly once with null and no event is dispatched.
68 public void whenNoEventAndOnlyWhitespaceIsProvidedThenTheStreamClosedCallbackIsInvoked() {
70 InputStream inputStream = getInputStreamReadingUtf8Data("\r\n");
72 SseStreamParser parser = new SseStreamParser(inputStream, serverSentEventCallback, streamClosedCallback);
75 parser.parseAndDispatchEvents();
78 verify(streamClosedCallback).accept(null);
79 verifyNoMoreInteractions(streamClosedCallback);
80 verifyNoInteractions(serverSentEventCallback);
// Scenario: the stream carries an "event: ping" line followed by a "data: pong" line, each terminated by CRLF.
// Expectation: the event callback receives a ServerSentEvent equal to ServerSentEvent("ping", "pong"),
// and the stream-closed callback is invoked with null once the input is exhausted.
84 public void whenAnEventIsProvidedThenItIsPassedToTheCallback() {
86 InputStream inputStream = getInputStreamReadingUtf8Data("event: ping\r\ndata: pong\r\n");
88 SseStreamParser parser = new SseStreamParser(inputStream, serverSentEventCallback, streamClosedCallback);
91 parser.parseAndDispatchEvents();
94 verify(streamClosedCallback).accept(null);
// Argument matching is done against a freshly constructed expected event.
// NOTE(review): this relies on ServerSentEvent implementing equals - confirm.
95 verify(serverSentEventCallback).accept(new ServerSentEvent("ping", "pong"));
96 verifyNoMoreInteractions(streamClosedCallback, serverSentEventCallback);
// Scenario: the stream contains a single line whose key is "name" instead of "event" or "data".
// Expectation: no event is dispatched; only the stream-closed callback is invoked, with null.
100 public void whenALineWithInvalidKeyIsProvidedThenItIsIgnored() {
102 InputStream inputStream = getInputStreamReadingUtf8Data("name: ping\r\n");
104 SseStreamParser parser = new SseStreamParser(inputStream, serverSentEventCallback, streamClosedCallback);
107 parser.parseAndDispatchEvents();
110 verify(streamClosedCallback).accept(null);
111 verifyNoMoreInteractions(streamClosedCallback);
112 verifyNoInteractions(serverSentEventCallback);
// Scenario: the stream contains a "data:" line that is not preceded by an "event:" line.
// Expectation: no event is dispatched; only the stream-closed callback is invoked, with null.
116 public void whenDataWithoutEventIsProvidedThenItIsIgnored() {
118 InputStream inputStream = getInputStreamReadingUtf8Data("data: ping\r\n");
120 SseStreamParser parser = new SseStreamParser(inputStream, serverSentEventCallback, streamClosedCallback);
123 parser.parseAndDispatchEvents();
126 verify(streamClosedCallback).accept(null);
127 verifyNoMoreInteractions(streamClosedCallback);
128 verifyNoInteractions(serverSentEventCallback);
// Scenario: reading from the stream fails with an IOException whose cause is a TimeoutException.
// Expectation: the stream-closed callback receives the TimeoutException itself (the cause, not the wrapping
// IOException), exactly once, and no event is dispatched.
132 public void whenTheEventStreamBreaksThenTheStreamClosedCallbackIsNotifiedWithTheCause() throws IOException {
134 InputStream inputStream = mock(InputStream.class);
// Keep a reference to the cause so the verification below can match this exact instance.
135 TimeoutException timeoutException = new TimeoutException();
// Every bulk read (byte[], offset, length) on the mocked stream throws.
136 when(inputStream.read(any(), anyInt(), anyInt())).thenThrow(new IOException(timeoutException));
138 SseStreamParser parser = new SseStreamParser(inputStream, serverSentEventCallback, streamClosedCallback);
141 parser.parseAndDispatchEvents();
144 verify(streamClosedCallback).accept(timeoutException);
145 verifyNoMoreInteractions(streamClosedCallback);
146 verifyNoInteractions(serverSentEventCallback);
// Scenario: reading from the stream fails with an IOException whose cause is a
// MieleWebserviceDisconnectSseException.
// Expectation: neither callback is touched; per the test name, skipping the stream-closed notification is
// what prevents an SSE reconnect.
150 public void whenTheEventStreamBreaksBecauseOfAnSseDisconnectThenTheStreamCloseCallbackIsNotNotifiedToPreventSseReconnect()
153 InputStream inputStream = mock(InputStream.class);
// Every bulk read (byte[], offset, length) on the mocked stream throws the wrapped disconnect exception.
154 when(inputStream.read(any(), anyInt(), anyInt()))
155 .thenThrow(new IOException(new MieleWebserviceDisconnectSseException()));
157 SseStreamParser parser = new SseStreamParser(inputStream, serverSentEventCallback, streamClosedCallback);
160 parser.parseAndDispatchEvents();
163 verifyNoInteractions(streamClosedCallback, serverSentEventCallback);
// Scenario: reading fails with an IOException caused by a MieleWebserviceDisconnectSseException, and closing
// the stream throws an IOException as well.
// Expectation: parseAndDispatchEvents() completes without propagating either failure and neither callback
// is touched.
167 public void whenTheEventStreamBreaksAndTheResourceCleanupFailsThenItIsIgnored() throws IOException {
169 InputStream inputStream = mock(InputStream.class);
170 when(inputStream.read(any(), anyInt(), anyInt()))
171 .thenThrow(new IOException(new MieleWebserviceDisconnectSseException()));
// Make resource cleanup fail too.
// NOTE(review): presumably the parser calls close() on this stream during cleanup - not visible here, confirm.
172 doThrow(new IOException()).when(inputStream).close();
174 SseStreamParser parser = new SseStreamParser(inputStream, serverSentEventCallback, streamClosedCallback);
177 parser.parseAndDispatchEvents();
180 verifyNoInteractions(streamClosedCallback, serverSentEventCallback);