]> git.basschouten.com Git - openhab-addons.git/blob
e10ff0730f26046ba17ead4644c3c02c27ff59aa
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.homeconnect.internal.client;
14
15 import static java.time.LocalDateTime.now;
16 import static org.openhab.binding.homeconnect.internal.client.model.EventType.*;
17
18 import java.time.Instant;
19 import java.time.LocalDateTime;
20 import java.time.ZonedDateTime;
21 import java.time.temporal.ChronoUnit;
22 import java.util.ArrayList;
23 import java.util.List;
24 import java.util.Optional;
25 import java.util.TimeZone;
26 import java.util.concurrent.ScheduledExecutorService;
27 import java.util.concurrent.ScheduledFuture;
28 import java.util.concurrent.TimeUnit;
29
30 import javax.ws.rs.InternalServerErrorException;
31 import javax.ws.rs.NotAuthorizedException;
32 import javax.ws.rs.sse.InboundSseEvent;
33
34 import org.eclipse.jdt.annotation.NonNullByDefault;
35 import org.eclipse.jdt.annotation.Nullable;
36 import org.eclipse.jetty.http.HttpStatus;
37 import org.openhab.binding.homeconnect.internal.client.listener.HomeConnectEventListener;
38 import org.openhab.binding.homeconnect.internal.client.model.Event;
39 import org.openhab.binding.homeconnect.internal.client.model.EventHandling;
40 import org.openhab.binding.homeconnect.internal.client.model.EventLevel;
41 import org.openhab.binding.homeconnect.internal.client.model.EventType;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44
45 import com.google.gson.JsonArray;
46 import com.google.gson.JsonObject;
47
48 /**
49  * Event source listener (Server-Sent-Events).
50  *
51  * @author Jonas BrĂ¼stel - Initial contribution
52  * @author Laurent Garnier - Replace okhttp SSE by JAX-RS SSE
53  *
54  */
55 @NonNullByDefault
56 public class HomeConnectEventSourceListener {
57     private static final String EMPTY_DATA = "\"\"";
58     private static final int SSE_MONITOR_INITIAL_DELAY_MIN = 1;
59     private static final int SSE_MONITOR_INTERVAL_MIN = 5;
60     private static final int SSE_MONITOR_BROKEN_CONNECTION_TIMEOUT_MIN = 3;
61
62     private final String haId;
63     private final HomeConnectEventListener eventListener;
64     private final HomeConnectEventSourceClient client;
65     private final Logger logger = LoggerFactory.getLogger(HomeConnectEventSourceListener.class);
66     private final ScheduledFuture<?> eventSourceMonitorFuture;
67     private final CircularQueue<Event> eventQueue;
68     private final ScheduledExecutorService scheduledExecutorService;
69
70     private @Nullable LocalDateTime lastEventReceived;
71
72     public HomeConnectEventSourceListener(String haId, final HomeConnectEventListener eventListener,
73             final HomeConnectEventSourceClient client, final ScheduledExecutorService scheduler,
74             CircularQueue<Event> eventQueue) {
75         this.haId = haId;
76         this.eventListener = eventListener;
77         this.client = client;
78         this.eventQueue = eventQueue;
79         this.scheduledExecutorService = scheduler;
80
81         eventSourceMonitorFuture = createMonitor(scheduler);
82     }
83
84     public void onEvent(InboundSseEvent inboundEvent) {
85         String id = inboundEvent.getId();
86         String type = inboundEvent.getName();
87         String data = inboundEvent.readData();
88
89         lastEventReceived = now();
90
91         EventType eventType = valueOfType(type);
92         if (eventType != null) {
93             mapEventSourceEventToEvent(haId, eventType, data).forEach(event -> {
94                 eventQueue.add(event);
95                 logger.debug("Received event ({}): {}", haId, event);
96                 try {
97                     eventListener.onEvent(event);
98                 } catch (Exception e) {
99                     logger.error("Could not publish event to Listener!", e);
100                 }
101             });
102         } else {
103             logger.warn("Received unknown event source type! haId={}, id={}, type={}, data={}", haId, id, type, data);
104         }
105     }
106
107     public void onComplete() {
108         logger.debug("Event source listener channel closed ({}).", haId);
109
110         client.unregisterEventListener(eventListener, true);
111
112         try {
113             eventListener.onClosed();
114         } catch (Exception e) {
115             logger.error("Could not publish closed event to listener ({})!", haId, e);
116         }
117         stopMonitor();
118     }
119
120     public void onError(Throwable error) {
121         String throwableMessage = error.getMessage();
122         String throwableClass = error.getClass().getName();
123
124         logger.debug("Event source listener connection failure occurred. haId={}, throwable={}, throwableMessage={}",
125                 haId, throwableClass, throwableMessage);
126
127         client.unregisterEventListener(eventListener);
128
129         try {
130             if (throwableMessage != null
131                     && throwableMessage.contains(String.valueOf(HttpStatus.TOO_MANY_REQUESTS_429))) {
132                 logger.warn(
133                         "More than 10 active event monitoring channels was reached. Further event monitoring requests are blocked. haId={}",
134                         haId);
135                 eventListener.onRateLimitReached();
136             } else {
137                 // The SSE connection is closed by the server every 24 hours.
138                 // When you try to reconnect, it often fails with a NotAuthorizedException (401) for the next few
139                 // seconds. So we wait few seconds before trying again.
140                 if (error instanceof NotAuthorizedException) {
141                     logger.debug(
142                             "Event source listener connection failure due to unauthorized exception : wait 20 seconds... haId={}",
143                             haId);
144                     scheduledExecutorService.schedule(() -> eventListener.onClosed(), 20, TimeUnit.SECONDS);
145                 } else if (error instanceof InternalServerErrorException) {
146                     logger.debug(
147                             "Event source listener connection failure due to internal server exception : wait 2 seconds... haId={}",
148                             haId);
149                     scheduledExecutorService.schedule(() -> eventListener.onClosed(), 2, TimeUnit.SECONDS);
150                 } else {
151                     eventListener.onClosed();
152                 }
153             }
154         } catch (Exception e) {
155             logger.error("Could not publish closed event to listener ({})!", haId, e);
156         }
157         stopMonitor();
158     }
159
160     private ScheduledFuture<?> createMonitor(ScheduledExecutorService scheduler) {
161         return scheduler.scheduleWithFixedDelay(() -> {
162             logger.trace("Check event source connection ({}). Last event package received at {}.", haId,
163                     lastEventReceived);
164             if (lastEventReceived != null && ChronoUnit.MINUTES.between(lastEventReceived,
165                     now()) > SSE_MONITOR_BROKEN_CONNECTION_TIMEOUT_MIN) {
166                 logger.warn("Dead event source connection detected ({}).", haId);
167
168                 client.unregisterEventListener(eventListener);
169
170                 try {
171                     eventListener.onClosed();
172                 } catch (Exception e) {
173                     logger.error("Could not publish closed event to listener ({})!", haId, e);
174                 }
175                 stopMonitor();
176             }
177         }, SSE_MONITOR_INITIAL_DELAY_MIN, SSE_MONITOR_INTERVAL_MIN, TimeUnit.MINUTES);
178     }
179
180     public void stopMonitor() {
181         if (!eventSourceMonitorFuture.isDone()) {
182             logger.debug("Dispose event source connection monitor of appliance ({}).", haId);
183             eventSourceMonitorFuture.cancel(true);
184         }
185     }
186
187     private List<Event> mapEventSourceEventToEvent(String haId, EventType type, @Nullable String data) {
188         List<Event> events = new ArrayList<>();
189
190         if ((STATUS.equals(type) || EVENT.equals(type) || NOTIFY.equals(type)) && data != null && !data.trim().isEmpty()
191                 && !EMPTY_DATA.equals(data)) {
192             try {
193                 JsonObject responseObject = HttpHelper.parseString(data).getAsJsonObject();
194                 JsonArray items = responseObject.getAsJsonArray("items");
195
196                 items.forEach(item -> {
197                     JsonObject obj = (JsonObject) item;
198                     String key = getJsonElementAsString(obj, "key").orElse(null);
199                     String value = getJsonElementAsString(obj, "value").orElse(null);
200                     String unit = getJsonElementAsString(obj, "unit").orElse(null);
201                     String name = getJsonElementAsString(obj, "name").orElse(null);
202                     String uri = getJsonElementAsString(obj, "uri").orElse(null);
203                     EventLevel level = getJsonElementAsString(obj, "level").map(EventLevel::valueOfLevel).orElse(null);
204                     EventHandling handling = getJsonElementAsString(obj, "handling").map(EventHandling::valueOfHandling)
205                             .orElse(null);
206                     ZonedDateTime creation = getJsonElementAsLong(obj, "timestamp").map(timestamp -> ZonedDateTime
207                             .ofInstant(Instant.ofEpochSecond(timestamp), TimeZone.getDefault().toZoneId()))
208                             .orElse(ZonedDateTime.now());
209
210                     events.add(new Event(haId, type, key, name, uri, creation, level, handling, value, unit));
211                 });
212             } catch (IllegalStateException e) {
213                 logger.error("Could not parse event! haId={}, error={}", haId, e.getMessage());
214             }
215         } else {
216             events.add(new Event(haId, type));
217         }
218
219         return events;
220     }
221
222     private Optional<Long> getJsonElementAsLong(JsonObject jsonObject, String elementName) {
223         var element = jsonObject.get(elementName);
224         return element == null || element.isJsonNull() ? Optional.empty() : Optional.of(element.getAsLong());
225     }
226
227     private Optional<String> getJsonElementAsString(JsonObject jsonObject, String elementName) {
228         var element = jsonObject.get(elementName);
229         return element == null || element.isJsonNull() ? Optional.empty() : Optional.of(element.getAsString());
230     }
231 }