]> git.basschouten.com Git - openhab-addons.git/blob
e33b0456835a17d84992377dec3f8634bab67b64
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2021 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.homeconnect.internal.client;
14
15 import static java.time.LocalDateTime.now;
16 import static org.openhab.binding.homeconnect.internal.client.model.EventType.*;
17
18 import java.time.Instant;
19 import java.time.LocalDateTime;
20 import java.time.ZonedDateTime;
21 import java.time.temporal.ChronoUnit;
22 import java.util.ArrayList;
23 import java.util.List;
24 import java.util.Optional;
25 import java.util.TimeZone;
26 import java.util.concurrent.ScheduledExecutorService;
27 import java.util.concurrent.ScheduledFuture;
28 import java.util.concurrent.TimeUnit;
29
30 import javax.ws.rs.NotAuthorizedException;
31 import javax.ws.rs.sse.InboundSseEvent;
32
33 import org.eclipse.jdt.annotation.NonNullByDefault;
34 import org.eclipse.jdt.annotation.Nullable;
35 import org.eclipse.jetty.http.HttpStatus;
36 import org.openhab.binding.homeconnect.internal.client.listener.HomeConnectEventListener;
37 import org.openhab.binding.homeconnect.internal.client.model.Event;
38 import org.openhab.binding.homeconnect.internal.client.model.EventHandling;
39 import org.openhab.binding.homeconnect.internal.client.model.EventLevel;
40 import org.openhab.binding.homeconnect.internal.client.model.EventType;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
43
44 import com.google.gson.JsonArray;
45 import com.google.gson.JsonObject;
46
47 /**
48  * Event source listener (Server-Sent-Events).
49  *
50  * @author Jonas BrĂ¼stel - Initial contribution
51  * @author Laurent Garnier - Replace okhttp SSE by JAX-RS SSE
52  *
53  */
54 @NonNullByDefault
55 public class HomeConnectEventSourceListener {
56     private static final String EMPTY_DATA = "\"\"";
57     private static final int SSE_MONITOR_INITIAL_DELAY_MIN = 1;
58     private static final int SSE_MONITOR_INTERVAL_MIN = 5;
59     private static final int SSE_MONITOR_BROKEN_CONNECTION_TIMEOUT_MIN = 3;
60
61     private final String haId;
62     private final HomeConnectEventListener eventListener;
63     private final HomeConnectEventSourceClient client;
64     private final Logger logger = LoggerFactory.getLogger(HomeConnectEventSourceListener.class);
65     private final ScheduledFuture<?> eventSourceMonitorFuture;
66     private final CircularQueue<Event> eventQueue;
67     private final ScheduledExecutorService scheduledExecutorService;
68
69     private @Nullable LocalDateTime lastEventReceived;
70
71     public HomeConnectEventSourceListener(String haId, final HomeConnectEventListener eventListener,
72             final HomeConnectEventSourceClient client, final ScheduledExecutorService scheduler,
73             CircularQueue<Event> eventQueue) {
74         this.haId = haId;
75         this.eventListener = eventListener;
76         this.client = client;
77         this.eventQueue = eventQueue;
78         this.scheduledExecutorService = scheduler;
79
80         eventSourceMonitorFuture = createMonitor(scheduler);
81     }
82
83     public void onEvent(InboundSseEvent inboundEvent) {
84         String id = inboundEvent.getId();
85         String type = inboundEvent.getName();
86         String data = inboundEvent.readData();
87
88         lastEventReceived = now();
89
90         EventType eventType = valueOfType(type);
91         if (eventType != null) {
92             mapEventSourceEventToEvent(haId, eventType, data).forEach(event -> {
93                 eventQueue.add(event);
94                 logger.debug("Received event ({}): {}", haId, event);
95                 try {
96                     eventListener.onEvent(event);
97                 } catch (Exception e) {
98                     logger.error("Could not publish event to Listener!", e);
99                 }
100             });
101         } else {
102             logger.warn("Received unknown event source type! haId={}, id={}, type={}, data={}", haId, id, type, data);
103         }
104     }
105
106     public void onComplete() {
107         logger.debug("Event source listener channel closed ({}).", haId);
108
109         client.unregisterEventListener(eventListener, true);
110
111         try {
112             eventListener.onClosed();
113         } catch (Exception e) {
114             logger.error("Could not publish closed event to listener ({})!", haId, e);
115         }
116         stopMonitor();
117     }
118
119     public void onError(Throwable error) {
120         String throwableMessage = error.getMessage();
121         String throwableClass = error.getClass().getName();
122
123         logger.debug("Event source listener connection failure occurred. haId={}, throwable={}, throwableMessage={}",
124                 haId, throwableClass, throwableMessage);
125
126         client.unregisterEventListener(eventListener);
127
128         try {
129             if (throwableMessage != null
130                     && throwableMessage.contains(String.valueOf(HttpStatus.TOO_MANY_REQUESTS_429))) {
131                 logger.warn(
132                         "More than 10 active event monitoring channels was reached. Further event monitoring requests are blocked. haId={}",
133                         haId);
134                 eventListener.onRateLimitReached();
135             } else {
136                 // The SSE connection is closed by the server every 24 hours.
137                 // When you try to reconnect, it often fails with a NotAuthorizedException (401) for the next few
138                 // seconds. So we wait few seconds before trying again.
139                 if (error instanceof NotAuthorizedException) {
140                     logger.debug(
141                             "Event source listener connection failure due to unauthorized exception : wait 10 seconds... haId={}",
142                             haId);
143                     scheduledExecutorService.schedule(() -> eventListener.onClosed(), 10, TimeUnit.SECONDS);
144                 } else {
145                     eventListener.onClosed();
146                 }
147             }
148         } catch (Exception e) {
149             logger.error("Could not publish closed event to listener ({})!", haId, e);
150         }
151         stopMonitor();
152     }
153
154     private ScheduledFuture<?> createMonitor(ScheduledExecutorService scheduler) {
155         return scheduler.scheduleWithFixedDelay(() -> {
156             logger.trace("Check event source connection ({}). Last event package received at {}.", haId,
157                     lastEventReceived);
158             if (lastEventReceived != null && ChronoUnit.MINUTES.between(lastEventReceived,
159                     now()) > SSE_MONITOR_BROKEN_CONNECTION_TIMEOUT_MIN) {
160                 logger.warn("Dead event source connection detected ({}).", haId);
161
162                 client.unregisterEventListener(eventListener);
163
164                 try {
165                     eventListener.onClosed();
166                 } catch (Exception e) {
167                     logger.error("Could not publish closed event to listener ({})!", haId, e);
168                 }
169                 stopMonitor();
170             }
171         }, SSE_MONITOR_INITIAL_DELAY_MIN, SSE_MONITOR_INTERVAL_MIN, TimeUnit.MINUTES);
172     }
173
174     public void stopMonitor() {
175         if (!eventSourceMonitorFuture.isDone()) {
176             logger.debug("Dispose event source connection monitor of appliance ({}).", haId);
177             eventSourceMonitorFuture.cancel(true);
178         }
179     }
180
181     private List<Event> mapEventSourceEventToEvent(String haId, EventType type, @Nullable String data) {
182         List<Event> events = new ArrayList<>();
183
184         if ((STATUS.equals(type) || EVENT.equals(type) || NOTIFY.equals(type)) && data != null && !data.trim().isEmpty()
185                 && !EMPTY_DATA.equals(data)) {
186             try {
187                 JsonObject responseObject = HttpHelper.parseString(data).getAsJsonObject();
188                 JsonArray items = responseObject.getAsJsonArray("items");
189
190                 items.forEach(item -> {
191                     JsonObject obj = (JsonObject) item;
192                     String key = getJsonElementAsString(obj, "key").orElse(null);
193                     String value = getJsonElementAsString(obj, "value").orElse(null);
194                     String unit = getJsonElementAsString(obj, "unit").orElse(null);
195                     String name = getJsonElementAsString(obj, "name").orElse(null);
196                     String uri = getJsonElementAsString(obj, "uri").orElse(null);
197                     EventLevel level = getJsonElementAsString(obj, "level").map(EventLevel::valueOfLevel).orElse(null);
198                     EventHandling handling = getJsonElementAsString(obj, "handling").map(EventHandling::valueOfHandling)
199                             .orElse(null);
200                     ZonedDateTime creation = getJsonElementAsLong(obj, "timestamp").map(timestamp -> ZonedDateTime
201                             .ofInstant(Instant.ofEpochSecond(timestamp), TimeZone.getDefault().toZoneId()))
202                             .orElse(ZonedDateTime.now());
203
204                     events.add(new Event(haId, type, key, name, uri, creation, level, handling, value, unit));
205                 });
206             } catch (IllegalStateException e) {
207                 logger.error("Could not parse event! haId={}, error={}", haId, e.getMessage());
208             }
209         } else {
210             events.add(new Event(haId, type));
211         }
212
213         return events;
214     }
215
216     private Optional<Long> getJsonElementAsLong(JsonObject jsonObject, String elementName) {
217         var element = jsonObject.get(elementName);
218         return element == null || element.isJsonNull() ? Optional.empty() : Optional.of(element.getAsLong());
219     }
220
221     private Optional<String> getJsonElementAsString(JsonObject jsonObject, String elementName) {
222         var element = jsonObject.get(elementName);
223         return element == null || element.isJsonNull() ? Optional.empty() : Optional.of(element.getAsString());
224     }
225 }