]> git.basschouten.com Git - openhab-addons.git/blob
41915562c4690b5b2c8800dbb831ee11c5cc5cb2
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.mielecloud.internal.webservice.sse;
14
15 import java.io.BufferedReader;
16 import java.io.IOException;
17 import java.io.InputStream;
18 import java.io.InputStreamReader;
19 import java.nio.charset.StandardCharsets;
20 import java.util.function.Consumer;
21
22 import org.eclipse.jdt.annotation.NonNullByDefault;
23 import org.eclipse.jdt.annotation.Nullable;
24 import org.openhab.binding.mielecloud.internal.webservice.exception.MieleWebserviceDisconnectSseException;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27
28 /**
29  * Parses events from the SSE event stream and emits them via the given dispatcher.
30  *
31  * @author Björn Lange - Initial Contribution
32  */
33 @NonNullByDefault
34 class SseStreamParser {
35     private static final String SSE_KEY_EVENT = "event:";
36     private static final String SSE_KEY_DATA = "data:";
37
38     private final Logger logger = LoggerFactory.getLogger(SseStreamParser.class);
39
40     private final BufferedReader reader;
41     private final Consumer<ServerSentEvent> onServerSentEventCallback;
42     private final Consumer<@Nullable Throwable> onStreamClosedCallback;
43
44     private @Nullable String event;
45
46     SseStreamParser(InputStream inputStream, Consumer<ServerSentEvent> onServerSentEventCallback,
47             Consumer<@Nullable Throwable> onStreamClosedCallback) {
48         this.reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
49         this.onServerSentEventCallback = onServerSentEventCallback;
50         this.onStreamClosedCallback = onStreamClosedCallback;
51     }
52
53     void parseAndDispatchEvents() {
54         try {
55             String line = null;
56             while ((line = reader.readLine()) != null) {
57                 onLineReceived(line);
58             }
59
60             silentlyCloseReader();
61             logger.debug("SSE stream ended. Closing stream.");
62             onStreamClosedCallback.accept(null);
63         } catch (IOException exception) {
64             silentlyCloseReader();
65
66             if (!(exception.getCause() instanceof MieleWebserviceDisconnectSseException)) {
67                 logger.warn("SSE connection failed unexpectedly: {}", exception.getMessage());
68                 onStreamClosedCallback.accept(exception.getCause());
69             }
70         }
71         logger.debug("SSE stream closed.");
72     }
73
74     private void silentlyCloseReader() {
75         try {
76             reader.close();
77         } catch (IOException e) {
78             logger.warn("Failed to clean up SSE connection resources!", e);
79         }
80     }
81
82     private void onLineReceived(String line) {
83         if (line.isEmpty()) {
84             return;
85         }
86
87         if (line.startsWith(SSE_KEY_EVENT)) {
88             event = line.substring(SSE_KEY_EVENT.length()).trim();
89         } else if (line.startsWith(SSE_KEY_DATA)) {
90             String event = this.event;
91             String data = line.substring(SSE_KEY_DATA.length()).trim();
92
93             if (event == null) {
94                 logger.warn("Received data payload without prior event payload.");
95             } else {
96                 onServerSentEventCallback.accept(new ServerSentEvent(event, data));
97             }
98         } else {
99             logger.warn("Unable to parse line from SSE stream: {}", line);
100         }
101     }
102 }