2 * Copyright (c) 2010-2021 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.homeconnect.internal.client;
15 import static java.time.LocalDateTime.now;
16 import static org.openhab.binding.homeconnect.internal.client.model.EventType.*;
18 import java.time.Instant;
19 import java.time.LocalDateTime;
20 import java.time.ZonedDateTime;
21 import java.time.temporal.ChronoUnit;
22 import java.util.ArrayList;
23 import java.util.List;
24 import java.util.Optional;
25 import java.util.TimeZone;
26 import java.util.concurrent.ScheduledExecutorService;
27 import java.util.concurrent.ScheduledFuture;
28 import java.util.concurrent.TimeUnit;
30 import javax.ws.rs.NotAuthorizedException;
31 import javax.ws.rs.sse.InboundSseEvent;
33 import org.eclipse.jdt.annotation.NonNullByDefault;
34 import org.eclipse.jdt.annotation.Nullable;
35 import org.eclipse.jetty.http.HttpStatus;
36 import org.openhab.binding.homeconnect.internal.client.listener.HomeConnectEventListener;
37 import org.openhab.binding.homeconnect.internal.client.model.Event;
38 import org.openhab.binding.homeconnect.internal.client.model.EventHandling;
39 import org.openhab.binding.homeconnect.internal.client.model.EventLevel;
40 import org.openhab.binding.homeconnect.internal.client.model.EventType;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
44 import com.google.gson.JsonArray;
45 import com.google.gson.JsonObject;
48 * Event source listener (Server-Sent-Events).
50 * @author Jonas BrĂ¼stel - Initial contribution
51 * @author Laurent Garnier - Replace okhttp SSE by JAX-RS SSE
55 public class HomeConnectEventSourceListener {
56 private static final String EMPTY_DATA = "\"\"";
57 private static final int SSE_MONITOR_INITIAL_DELAY_MIN = 1;
58 private static final int SSE_MONITOR_INTERVAL_MIN = 5;
59 private static final int SSE_MONITOR_BROKEN_CONNECTION_TIMEOUT_MIN = 3;
61 private final String haId;
62 private final HomeConnectEventListener eventListener;
63 private final HomeConnectEventSourceClient client;
64 private final Logger logger = LoggerFactory.getLogger(HomeConnectEventSourceListener.class);
65 private final ScheduledFuture<?> eventSourceMonitorFuture;
66 private final CircularQueue<Event> eventQueue;
67 private final ScheduledExecutorService scheduledExecutorService;
69 private @Nullable LocalDateTime lastEventReceived;
71 public HomeConnectEventSourceListener(String haId, final HomeConnectEventListener eventListener,
72 final HomeConnectEventSourceClient client, final ScheduledExecutorService scheduler,
73 CircularQueue<Event> eventQueue) {
75 this.eventListener = eventListener;
77 this.eventQueue = eventQueue;
78 this.scheduledExecutorService = scheduler;
80 eventSourceMonitorFuture = createMonitor(scheduler);
83 public void onEvent(InboundSseEvent inboundEvent) {
84 String id = inboundEvent.getId();
85 String type = inboundEvent.getName();
86 String data = inboundEvent.readData();
88 lastEventReceived = now();
90 EventType eventType = valueOfType(type);
91 if (eventType != null) {
92 mapEventSourceEventToEvent(haId, eventType, data).forEach(event -> {
93 eventQueue.add(event);
94 logger.debug("Received event ({}): {}", haId, event);
96 eventListener.onEvent(event);
97 } catch (Exception e) {
98 logger.error("Could not publish event to Listener!", e);
102 logger.warn("Received unknown event source type! haId={}, id={}, type={}, data={}", haId, id, type, data);
106 public void onComplete() {
107 logger.debug("Event source listener channel closed ({}).", haId);
109 client.unregisterEventListener(eventListener, true);
112 eventListener.onClosed();
113 } catch (Exception e) {
114 logger.error("Could not publish closed event to listener ({})!", haId, e);
119 public void onError(Throwable error) {
120 String throwableMessage = error.getMessage();
121 String throwableClass = error.getClass().getName();
123 logger.debug("Event source listener connection failure occurred. haId={}, throwable={}, throwableMessage={}",
124 haId, throwableClass, throwableMessage);
126 client.unregisterEventListener(eventListener);
129 if (throwableMessage != null
130 && throwableMessage.contains(String.valueOf(HttpStatus.TOO_MANY_REQUESTS_429))) {
132 "More than 10 active event monitoring channels was reached. Further event monitoring requests are blocked. haId={}",
134 eventListener.onRateLimitReached();
136 // The SSE connection is closed by the server every 24 hours.
137 // When you try to reconnect, it often fails with a NotAuthorizedException (401) for the next few
138 // seconds. So we wait few seconds before trying again.
139 if (error instanceof NotAuthorizedException) {
141 "Event source listener connection failure due to unauthorized exception : wait 10 seconds... haId={}",
143 scheduledExecutorService.schedule(() -> eventListener.onClosed(), 10, TimeUnit.SECONDS);
145 eventListener.onClosed();
148 } catch (Exception e) {
149 logger.error("Could not publish closed event to listener ({})!", haId, e);
154 private ScheduledFuture<?> createMonitor(ScheduledExecutorService scheduler) {
155 return scheduler.scheduleWithFixedDelay(() -> {
156 logger.trace("Check event source connection ({}). Last event package received at {}.", haId,
158 if (lastEventReceived != null && ChronoUnit.MINUTES.between(lastEventReceived,
159 now()) > SSE_MONITOR_BROKEN_CONNECTION_TIMEOUT_MIN) {
160 logger.warn("Dead event source connection detected ({}).", haId);
162 client.unregisterEventListener(eventListener);
165 eventListener.onClosed();
166 } catch (Exception e) {
167 logger.error("Could not publish closed event to listener ({})!", haId, e);
171 }, SSE_MONITOR_INITIAL_DELAY_MIN, SSE_MONITOR_INTERVAL_MIN, TimeUnit.MINUTES);
174 public void stopMonitor() {
175 if (!eventSourceMonitorFuture.isDone()) {
176 logger.debug("Dispose event source connection monitor of appliance ({}).", haId);
177 eventSourceMonitorFuture.cancel(true);
181 private List<Event> mapEventSourceEventToEvent(String haId, EventType type, @Nullable String data) {
182 List<Event> events = new ArrayList<>();
184 if ((STATUS.equals(type) || EVENT.equals(type) || NOTIFY.equals(type)) && data != null && !data.trim().isEmpty()
185 && !EMPTY_DATA.equals(data)) {
187 JsonObject responseObject = HttpHelper.parseString(data).getAsJsonObject();
188 JsonArray items = responseObject.getAsJsonArray("items");
190 items.forEach(item -> {
191 JsonObject obj = (JsonObject) item;
192 String key = getJsonElementAsString(obj, "key").orElse(null);
193 String value = getJsonElementAsString(obj, "value").orElse(null);
194 String unit = getJsonElementAsString(obj, "unit").orElse(null);
195 String name = getJsonElementAsString(obj, "name").orElse(null);
196 String uri = getJsonElementAsString(obj, "uri").orElse(null);
197 EventLevel level = getJsonElementAsString(obj, "level").map(EventLevel::valueOfLevel).orElse(null);
198 EventHandling handling = getJsonElementAsString(obj, "handling").map(EventHandling::valueOfHandling)
200 ZonedDateTime creation = getJsonElementAsLong(obj, "timestamp").map(timestamp -> ZonedDateTime
201 .ofInstant(Instant.ofEpochSecond(timestamp), TimeZone.getDefault().toZoneId()))
202 .orElse(ZonedDateTime.now());
204 events.add(new Event(haId, type, key, name, uri, creation, level, handling, value, unit));
206 } catch (IllegalStateException e) {
207 logger.error("Could not parse event! haId={}, error={}", haId, e.getMessage());
210 events.add(new Event(haId, type));
216 private Optional<Long> getJsonElementAsLong(JsonObject jsonObject, String elementName) {
217 var element = jsonObject.get(elementName);
218 return element == null || element.isJsonNull() ? Optional.empty() : Optional.of(element.getAsLong());
221 private Optional<String> getJsonElementAsString(JsonObject jsonObject, String elementName) {
222 var element = jsonObject.get(elementName);
223 return element == null || element.isJsonNull() ? Optional.empty() : Optional.of(element.getAsString());