]> git.basschouten.com Git - openhab-addons.git/blob
08e61bdfe5346ac86769d4007b9d98547d9e8490
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.nest.internal.sdm.api;
14
15 import static org.eclipse.jetty.http.HttpHeader.*;
16 import static org.eclipse.jetty.http.HttpMethod.POST;
17 import static org.openhab.binding.nest.internal.sdm.dto.SDMGson.GSON;
18
19 import java.io.IOException;
20 import java.time.Duration;
21 import java.util.HashMap;
22 import java.util.HashSet;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.Set;
26 import java.util.concurrent.ExecutionException;
27 import java.util.concurrent.ScheduledThreadPoolExecutor;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.TimeoutException;
30 import java.util.function.Consumer;
31 import java.util.stream.Collectors;
32
33 import org.eclipse.jdt.annotation.NonNullByDefault;
34 import org.eclipse.jetty.client.HttpClient;
35 import org.eclipse.jetty.client.util.StringContentProvider;
36 import org.eclipse.jetty.http.HttpMethod;
37 import org.openhab.binding.nest.internal.sdm.dto.PubSubRequestsResponses.PubSubAcknowledgeRequest;
38 import org.openhab.binding.nest.internal.sdm.dto.PubSubRequestsResponses.PubSubCreateRequest;
39 import org.openhab.binding.nest.internal.sdm.dto.PubSubRequestsResponses.PubSubPullRequest;
40 import org.openhab.binding.nest.internal.sdm.dto.PubSubRequestsResponses.PubSubPullResponse;
41 import org.openhab.binding.nest.internal.sdm.exception.FailedSendingPubSubDataException;
42 import org.openhab.binding.nest.internal.sdm.exception.InvalidPubSubAccessTokenException;
43 import org.openhab.binding.nest.internal.sdm.exception.InvalidPubSubAuthorizationCodeException;
44 import org.openhab.binding.nest.internal.sdm.listener.PubSubSubscriptionListener;
45 import org.openhab.core.auth.client.oauth2.AccessTokenResponse;
46 import org.openhab.core.auth.client.oauth2.OAuthClientService;
47 import org.openhab.core.auth.client.oauth2.OAuthException;
48 import org.openhab.core.auth.client.oauth2.OAuthFactory;
49 import org.openhab.core.auth.client.oauth2.OAuthResponseException;
50 import org.openhab.core.common.NamedThreadFactory;
51 import org.openhab.core.io.net.http.HttpClientFactory;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
54
55 /**
56  * The {@link PubSubAPI} implements a subset of the Pub/Sub REST API which allows for subscribing to SDM events.
57  *
58  * @author Wouter Born - Initial contribution
59  *
60  * @see https://cloud.google.com/pubsub/docs/reference/rest
61  * @see https://developers.google.com/nest/device-access/api/events
62  */
63 @NonNullByDefault
64 public class PubSubAPI {
65
66     private class Subscriber implements Runnable {
67
68         private final String subscriptionId;
69
70         Subscriber(String subscriptionId) {
71             this.subscriptionId = subscriptionId;
72         }
73
74         @Override
75         public void run() {
76             if (!subscriptionListeners.containsKey(subscriptionId)) {
77                 logger.debug("Stop receiving subscription '{}' messages since there are no listeners", subscriptionId);
78                 return;
79             }
80
81             try {
82                 checkAccessTokenValidity();
83                 String messages = pullSubscriptionMessages(subscriptionId);
84
85                 PubSubPullResponse pullResponse = GSON.fromJson(messages, PubSubPullResponse.class);
86
87                 if (pullResponse != null && pullResponse.receivedMessages != null) {
88                     logger.debug("Subscription '{}' has {} new message(s)", subscriptionId,
89                             pullResponse.receivedMessages.size());
90                     forEachListener(listener -> pullResponse.receivedMessages
91                             .forEach(message -> listener.onMessage(message.message)));
92                     List<String> ackIds = pullResponse.receivedMessages.stream().map(message -> message.ackId)
93                             .collect(Collectors.toList());
94                     acknowledgeSubscriptionMessages(subscriptionId, ackIds);
95                 } else {
96                     forEachListener(PubSubSubscriptionListener::onNoNewMessages);
97                 }
98
99                 scheduler.submit(this);
100             } catch (FailedSendingPubSubDataException e) {
101                 logger.debug("Expected exception while pulling message for '{}' subscription", subscriptionId, e);
102                 Throwable cause = e.getCause();
103                 if (!(cause instanceof InterruptedException)) {
104                     forEachListener(listener -> listener.onError(e));
105                     scheduler.schedule(this, RETRY_TIMEOUT.toNanos(), TimeUnit.NANOSECONDS);
106                 }
107             } catch (InvalidPubSubAccessTokenException e) {
108                 logger.warn("Cannot pull messages for '{}' subscription (access or refresh token invalid)",
109                         subscriptionId, e);
110                 forEachListener(listener -> listener.onError(e));
111             } catch (Exception e) {
112                 logger.warn("Unexpected exception while pulling message for '{}' subscription", subscriptionId, e);
113                 forEachListener(listener -> listener.onError(e));
114                 scheduler.schedule(this, RETRY_TIMEOUT.toNanos(), TimeUnit.NANOSECONDS);
115             }
116         }
117
118         private void forEachListener(Consumer<PubSubSubscriptionListener> consumer) {
119             Set<PubSubSubscriptionListener> listeners = subscriptionListeners.get(subscriptionId);
120             if (listeners != null) {
121                 listeners.forEach(consumer::accept);
122             } else {
123                 logger.debug("Subscription '{}' has no listeners", subscriptionId);
124             }
125         }
126     }
127
128     private static final String AUTH_URL = "https://accounts.google.com/o/oauth2/auth";
129     private static final String TOKEN_URL = "https://accounts.google.com/o/oauth2/token";
130     private static final String REDIRECT_URI = "https://www.google.com";
131
132     private static final String PUBSUB_HANDLE_FORMAT = "%s.pubsub";
133     private static final String PUBSUB_SCOPE = "https://www.googleapis.com/auth/pubsub";
134
135     private static final String PUBSUB_URL_PREFIX = "https://pubsub.googleapis.com/v1/";
136     private static final int PUBSUB_PULL_MAX_MESSAGES = 10;
137
138     private static final String APPLICATION_JSON = "application/json";
139     private static final String BEARER = "Bearer ";
140
141     private static final Duration REQUEST_TIMEOUT = Duration.ofMinutes(1);
142     private static final Duration RETRY_TIMEOUT = Duration.ofSeconds(30);
143
144     private final Logger logger = LoggerFactory.getLogger(PubSubAPI.class);
145
146     private final HttpClient httpClient;
147     private final OAuthFactory oAuthFactory;
148     private final OAuthClientService oAuthService;
149     private final String oAuthServiceHandleId;
150     private final String projectId;
151     private final ScheduledThreadPoolExecutor scheduler;
152     private final Map<String, Set<PubSubSubscriptionListener>> subscriptionListeners = new HashMap<>();
153
154     public PubSubAPI(HttpClientFactory httpClientFactory, OAuthFactory oAuthFactory, String ownerId, String projectId,
155             String clientId, String clientSecret) {
156         this.httpClient = httpClientFactory.getCommonHttpClient();
157         this.projectId = projectId;
158         this.oAuthFactory = oAuthFactory;
159         this.oAuthServiceHandleId = String.format(PUBSUB_HANDLE_FORMAT, ownerId);
160         this.oAuthService = oAuthFactory.createOAuthClientService(oAuthServiceHandleId, TOKEN_URL, AUTH_URL, clientId,
161                 clientSecret, PUBSUB_SCOPE, false);
162         scheduler = new ScheduledThreadPoolExecutor(3, new NamedThreadFactory(ownerId, true));
163     }
164
165     public void dispose() {
166         subscriptionListeners.clear();
167         scheduler.shutdownNow();
168         oAuthFactory.ungetOAuthService(oAuthServiceHandleId);
169     }
170
171     public void deleteOAuthServiceAndAccessToken() {
172         oAuthFactory.deleteServiceAndAccessToken(oAuthServiceHandleId);
173     }
174
175     public void authorizeClient(String authorizationCode) throws InvalidPubSubAuthorizationCodeException, IOException {
176         try {
177             oAuthService.getAccessTokenResponseByAuthorizationCode(authorizationCode, REDIRECT_URI);
178         } catch (OAuthException | OAuthResponseException e) {
179             throw new InvalidPubSubAuthorizationCodeException(
180                     "Failed to authorize Pub/Sub client. Check the authorization code or generate a new one.", e);
181         }
182     }
183
184     public void checkAccessTokenValidity() throws InvalidPubSubAccessTokenException, IOException {
185         getAuthorizationHeader();
186     }
187
188     private String acknowledgeSubscriptionMessages(String subscriptionId, List<String> ackIds)
189             throws FailedSendingPubSubDataException, InvalidPubSubAccessTokenException {
190         logger.debug("Acknowleding {} message(s) for '{}' subscription", ackIds.size(), subscriptionId);
191         String url = getSubscriptionUrl(subscriptionId) + ":acknowledge";
192         String requestContent = GSON.toJson(new PubSubAcknowledgeRequest(ackIds));
193         return postJson(url, requestContent);
194     }
195
196     public void addSubscriptionListener(String subscriptionId, PubSubSubscriptionListener listener) {
197         synchronized (subscriptionListeners) {
198             Set<PubSubSubscriptionListener> listeners = subscriptionListeners.get(subscriptionId);
199             if (listeners == null) {
200                 listeners = new HashSet<>();
201                 subscriptionListeners.put(subscriptionId, listeners);
202             }
203             listeners.add(listener);
204             if (listeners.size() == 1) {
205                 scheduler.submit(new Subscriber(subscriptionId));
206             }
207         }
208     }
209
210     public void removeSubscriptionListener(String subscriptionId, PubSubSubscriptionListener listener) {
211         synchronized (subscriptionListeners) {
212             Set<PubSubSubscriptionListener> listeners = subscriptionListeners.get(subscriptionId);
213             if (listeners != null) {
214                 listeners.remove(listener);
215                 if (listeners.isEmpty()) {
216                     subscriptionListeners.remove(subscriptionId);
217                     scheduler.getQueue().removeIf(
218                             runnable -> runnable instanceof Subscriber s && s.subscriptionId.equals(subscriptionId));
219                 }
220             }
221         }
222     }
223
224     public void createSubscription(String subscriptionId, String topicName)
225             throws FailedSendingPubSubDataException, InvalidPubSubAccessTokenException {
226         logger.debug("Creating '{}' subscription", subscriptionId);
227         String url = getSubscriptionUrl(subscriptionId);
228         String requestContent = GSON.toJson(new PubSubCreateRequest(topicName, true));
229         putJson(url, requestContent);
230     }
231
232     private String getAuthorizationHeader() throws InvalidPubSubAccessTokenException, IOException {
233         try {
234             AccessTokenResponse response = oAuthService.getAccessTokenResponse();
235             if (response == null || response.getAccessToken() == null || response.getAccessToken().isEmpty()) {
236                 throw new InvalidPubSubAccessTokenException(
237                         "No Pub/Sub access token. Client may not have been authorized.");
238             }
239             if (response.getRefreshToken() == null || response.getRefreshToken().isEmpty()) {
240                 throw new InvalidPubSubAccessTokenException(
241                         "No Pub/Sub refresh token. Delete and readd credentials, then reauthorize.");
242             }
243             return BEARER + response.getAccessToken();
244         } catch (OAuthException | OAuthResponseException e) {
245             throw new InvalidPubSubAccessTokenException(
246                     "Error fetching Pub/Sub access token. Check the authorization code or generate a new one.", e);
247         }
248     }
249
250     private String getSubscriptionUrl(String subscriptionId) {
251         return PUBSUB_URL_PREFIX + "projects/" + projectId + "/subscriptions/" + subscriptionId;
252     }
253
254     private String postJson(String url, String requestContent)
255             throws FailedSendingPubSubDataException, InvalidPubSubAccessTokenException {
256         try {
257             logger.debug("Posting JSON to: {}", url);
258             String response = httpClient.newRequest(url) //
259                     .method(POST) //
260                     .header(ACCEPT, APPLICATION_JSON) //
261                     .header(AUTHORIZATION, getAuthorizationHeader()) //
262                     .content(new StringContentProvider(requestContent), APPLICATION_JSON) //
263                     .timeout(REQUEST_TIMEOUT.toNanos(), TimeUnit.NANOSECONDS) //
264                     .send() //
265                     .getContentAsString();
266             logger.debug("Response: {}", response);
267             return response;
268         } catch (ExecutionException | InterruptedException | IOException | TimeoutException e) {
269             throw new FailedSendingPubSubDataException("Failed to send JSON POST request", e);
270         }
271     }
272
273     private String pullSubscriptionMessages(String subscriptionId)
274             throws FailedSendingPubSubDataException, InvalidPubSubAccessTokenException {
275         logger.debug("Pulling messages for '{}' subscription", subscriptionId);
276         String url = getSubscriptionUrl(subscriptionId) + ":pull";
277         String requestContent = GSON.toJson(new PubSubPullRequest(PUBSUB_PULL_MAX_MESSAGES));
278         return postJson(url, requestContent);
279     }
280
281     private String putJson(String url, String requestContent)
282             throws FailedSendingPubSubDataException, InvalidPubSubAccessTokenException {
283         try {
284             logger.debug("Putting JSON to: {}", url);
285             String response = httpClient.newRequest(url) //
286                     .method(HttpMethod.PUT) //
287                     .header(ACCEPT, APPLICATION_JSON) //
288                     .header(AUTHORIZATION, getAuthorizationHeader()) //
289                     .content(new StringContentProvider(requestContent), APPLICATION_JSON) //
290                     .timeout(REQUEST_TIMEOUT.toNanos(), TimeUnit.NANOSECONDS) //
291                     .send() //
292                     .getContentAsString();
293             logger.debug("Response: {}", response);
294             return response;
295         } catch (ExecutionException | InterruptedException | IOException | TimeoutException e) {
296             throw new FailedSendingPubSubDataException("Failed to send JSON PUT request", e);
297         }
298     }
299 }