2 * Copyright (c) 2010-2023 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.mielecloud.internal.webservice.sse;
15 import java.io.BufferedReader;
16 import java.io.IOException;
17 import java.io.InputStream;
18 import java.io.InputStreamReader;
19 import java.nio.charset.StandardCharsets;
20 import java.util.function.Consumer;
22 import org.eclipse.jdt.annotation.NonNullByDefault;
23 import org.eclipse.jdt.annotation.Nullable;
24 import org.openhab.binding.mielecloud.internal.webservice.exception.MieleWebserviceDisconnectSseException;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
29 * Parses events from the SSE event stream and emits them via the given dispatcher.
31 * @author Björn Lange - Initial Contribution
34 class SseStreamParser {
35 private static final String SSE_KEY_EVENT = "event:";
36 private static final String SSE_KEY_DATA = "data:";
38 private final Logger logger = LoggerFactory.getLogger(SseStreamParser.class);
40 private final BufferedReader reader;
41 private final Consumer<ServerSentEvent> onServerSentEventCallback;
42 private final Consumer<@Nullable Throwable> onStreamClosedCallback;
44 private @Nullable String event;
46 SseStreamParser(InputStream inputStream, Consumer<ServerSentEvent> onServerSentEventCallback,
47 Consumer<@Nullable Throwable> onStreamClosedCallback) {
48 this.reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
49 this.onServerSentEventCallback = onServerSentEventCallback;
50 this.onStreamClosedCallback = onStreamClosedCallback;
53 void parseAndDispatchEvents() {
56 while ((line = reader.readLine()) != null) {
60 silentlyCloseReader();
61 logger.debug("SSE stream ended. Closing stream.");
62 onStreamClosedCallback.accept(null);
63 } catch (IOException exception) {
64 silentlyCloseReader();
66 if (!(exception.getCause() instanceof MieleWebserviceDisconnectSseException)) {
67 logger.warn("SSE connection failed unexpectedly: {}", exception.getMessage());
68 onStreamClosedCallback.accept(exception.getCause());
71 logger.debug("SSE stream closed.");
74 private void silentlyCloseReader() {
77 } catch (IOException e) {
78 logger.warn("Failed to clean up SSE connection resources!", e);
82 private void onLineReceived(String line) {
87 if (line.startsWith(SSE_KEY_EVENT)) {
88 event = line.substring(SSE_KEY_EVENT.length()).trim();
89 } else if (line.startsWith(SSE_KEY_DATA)) {
90 String event = this.event;
91 String data = line.substring(SSE_KEY_DATA.length()).trim();
94 logger.warn("Received data payload without prior event payload.");
96 onServerSentEventCallback.accept(new ServerSentEvent(event, data));
99 logger.warn("Unable to parse line from SSE stream: {}", line);