2 * Copyright (c) 2010-2023 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.homeconnect.internal.client;
15 import static java.time.LocalDateTime.now;
16 import static org.openhab.binding.homeconnect.internal.client.model.EventType.*;
18 import java.time.Instant;
19 import java.time.LocalDateTime;
20 import java.time.ZonedDateTime;
21 import java.time.temporal.ChronoUnit;
22 import java.util.ArrayList;
23 import java.util.List;
24 import java.util.Optional;
25 import java.util.TimeZone;
26 import java.util.concurrent.ScheduledExecutorService;
27 import java.util.concurrent.ScheduledFuture;
28 import java.util.concurrent.TimeUnit;
30 import javax.ws.rs.InternalServerErrorException;
31 import javax.ws.rs.NotAuthorizedException;
32 import javax.ws.rs.sse.InboundSseEvent;
34 import org.eclipse.jdt.annotation.NonNullByDefault;
35 import org.eclipse.jdt.annotation.Nullable;
36 import org.eclipse.jetty.http.HttpStatus;
37 import org.openhab.binding.homeconnect.internal.client.listener.HomeConnectEventListener;
38 import org.openhab.binding.homeconnect.internal.client.model.Event;
39 import org.openhab.binding.homeconnect.internal.client.model.EventHandling;
40 import org.openhab.binding.homeconnect.internal.client.model.EventLevel;
41 import org.openhab.binding.homeconnect.internal.client.model.EventType;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
45 import com.google.gson.JsonArray;
46 import com.google.gson.JsonObject;
49 * Event source listener (Server-Sent-Events).
51 * @author Jonas BrĂ¼stel - Initial contribution
52 * @author Laurent Garnier - Replace okhttp SSE by JAX-RS SSE
56 public class HomeConnectEventSourceListener {
57 private static final String EMPTY_DATA = "\"\"";
58 private static final int SSE_MONITOR_INITIAL_DELAY_MIN = 1;
59 private static final int SSE_MONITOR_INTERVAL_MIN = 5;
60 private static final int SSE_MONITOR_BROKEN_CONNECTION_TIMEOUT_MIN = 3;
62 private final String haId;
63 private final HomeConnectEventListener eventListener;
64 private final HomeConnectEventSourceClient client;
65 private final Logger logger = LoggerFactory.getLogger(HomeConnectEventSourceListener.class);
66 private final ScheduledFuture<?> eventSourceMonitorFuture;
67 private final CircularQueue<Event> eventQueue;
68 private final ScheduledExecutorService scheduledExecutorService;
70 private @Nullable LocalDateTime lastEventReceived;
72 public HomeConnectEventSourceListener(String haId, final HomeConnectEventListener eventListener,
73 final HomeConnectEventSourceClient client, final ScheduledExecutorService scheduler,
74 CircularQueue<Event> eventQueue) {
76 this.eventListener = eventListener;
78 this.eventQueue = eventQueue;
79 this.scheduledExecutorService = scheduler;
81 eventSourceMonitorFuture = createMonitor(scheduler);
84 public void onEvent(InboundSseEvent inboundEvent) {
85 String id = inboundEvent.getId();
86 String type = inboundEvent.getName();
87 String data = inboundEvent.readData();
89 lastEventReceived = now();
91 EventType eventType = valueOfType(type);
92 if (eventType != null) {
93 mapEventSourceEventToEvent(haId, eventType, data).forEach(event -> {
94 eventQueue.add(event);
95 logger.debug("Received event ({}): {}", haId, event);
97 eventListener.onEvent(event);
98 } catch (Exception e) {
99 logger.error("Could not publish event to Listener!", e);
103 logger.warn("Received unknown event source type! haId={}, id={}, type={}, data={}", haId, id, type, data);
107 public void onComplete() {
108 logger.debug("Event source listener channel closed ({}).", haId);
110 client.unregisterEventListener(eventListener, true);
113 eventListener.onClosed();
114 } catch (Exception e) {
115 logger.error("Could not publish closed event to listener ({})!", haId, e);
120 public void onError(Throwable error) {
121 String throwableMessage = error.getMessage();
122 String throwableClass = error.getClass().getName();
124 logger.debug("Event source listener connection failure occurred. haId={}, throwable={}, throwableMessage={}",
125 haId, throwableClass, throwableMessage);
127 client.unregisterEventListener(eventListener);
130 if (throwableMessage != null
131 && throwableMessage.contains(String.valueOf(HttpStatus.TOO_MANY_REQUESTS_429))) {
133 "More than 10 active event monitoring channels was reached. Further event monitoring requests are blocked. haId={}",
135 eventListener.onRateLimitReached();
137 // The SSE connection is closed by the server every 24 hours.
138 // When you try to reconnect, it often fails with a NotAuthorizedException (401) for the next few
139 // seconds. So we wait few seconds before trying again.
140 if (error instanceof NotAuthorizedException) {
142 "Event source listener connection failure due to unauthorized exception : wait 20 seconds... haId={}",
144 scheduledExecutorService.schedule(() -> eventListener.onClosed(), 20, TimeUnit.SECONDS);
145 } else if (error instanceof InternalServerErrorException) {
147 "Event source listener connection failure due to internal server exception : wait 2 seconds... haId={}",
149 scheduledExecutorService.schedule(() -> eventListener.onClosed(), 2, TimeUnit.SECONDS);
151 eventListener.onClosed();
154 } catch (Exception e) {
155 logger.error("Could not publish closed event to listener ({})!", haId, e);
160 private ScheduledFuture<?> createMonitor(ScheduledExecutorService scheduler) {
161 return scheduler.scheduleWithFixedDelay(() -> {
162 logger.trace("Check event source connection ({}). Last event package received at {}.", haId,
164 if (lastEventReceived != null && ChronoUnit.MINUTES.between(lastEventReceived,
165 now()) > SSE_MONITOR_BROKEN_CONNECTION_TIMEOUT_MIN) {
166 logger.warn("Dead event source connection detected ({}).", haId);
168 client.unregisterEventListener(eventListener);
171 eventListener.onClosed();
172 } catch (Exception e) {
173 logger.error("Could not publish closed event to listener ({})!", haId, e);
177 }, SSE_MONITOR_INITIAL_DELAY_MIN, SSE_MONITOR_INTERVAL_MIN, TimeUnit.MINUTES);
180 public void stopMonitor() {
181 if (!eventSourceMonitorFuture.isDone()) {
182 logger.debug("Dispose event source connection monitor of appliance ({}).", haId);
183 eventSourceMonitorFuture.cancel(true);
187 private List<Event> mapEventSourceEventToEvent(String haId, EventType type, @Nullable String data) {
188 List<Event> events = new ArrayList<>();
190 if ((STATUS.equals(type) || EVENT.equals(type) || NOTIFY.equals(type)) && data != null && !data.trim().isEmpty()
191 && !EMPTY_DATA.equals(data)) {
193 JsonObject responseObject = HttpHelper.parseString(data).getAsJsonObject();
194 JsonArray items = responseObject.getAsJsonArray("items");
196 items.forEach(item -> {
197 JsonObject obj = (JsonObject) item;
198 String key = getJsonElementAsString(obj, "key").orElse(null);
199 String value = getJsonElementAsString(obj, "value").orElse(null);
200 String unit = getJsonElementAsString(obj, "unit").orElse(null);
201 String name = getJsonElementAsString(obj, "name").orElse(null);
202 String uri = getJsonElementAsString(obj, "uri").orElse(null);
203 EventLevel level = getJsonElementAsString(obj, "level").map(EventLevel::valueOfLevel).orElse(null);
204 EventHandling handling = getJsonElementAsString(obj, "handling").map(EventHandling::valueOfHandling)
206 ZonedDateTime creation = getJsonElementAsLong(obj, "timestamp").map(timestamp -> ZonedDateTime
207 .ofInstant(Instant.ofEpochSecond(timestamp), TimeZone.getDefault().toZoneId()))
208 .orElse(ZonedDateTime.now());
210 events.add(new Event(haId, type, key, name, uri, creation, level, handling, value, unit));
212 } catch (IllegalStateException e) {
213 logger.error("Could not parse event! haId={}, error={}", haId, e.getMessage());
216 events.add(new Event(haId, type));
222 private Optional<Long> getJsonElementAsLong(JsonObject jsonObject, String elementName) {
223 var element = jsonObject.get(elementName);
224 return element == null || element.isJsonNull() ? Optional.empty() : Optional.of(element.getAsLong());
227 private Optional<String> getJsonElementAsString(JsonObject jsonObject, String elementName) {
228 var element = jsonObject.get(elementName);
229 return element == null || element.isJsonNull() ? Optional.empty() : Optional.of(element.getAsString());