]> git.basschouten.com Git - openhab-addons.git/blob
05c1ab0e5a6f15290a1e1ed4d00bce05aeecc0e5
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.nest.internal.sdm.api;
14
15 import static org.eclipse.jetty.http.HttpHeader.*;
16 import static org.eclipse.jetty.http.HttpMethod.POST;
17 import static org.openhab.binding.nest.internal.sdm.dto.SDMGson.GSON;
18
19 import java.io.IOException;
20 import java.time.Duration;
21 import java.util.HashMap;
22 import java.util.HashSet;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.Set;
26 import java.util.concurrent.ExecutionException;
27 import java.util.concurrent.ScheduledThreadPoolExecutor;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.TimeoutException;
30 import java.util.function.Consumer;
31 import java.util.stream.Collectors;
32
33 import org.eclipse.jdt.annotation.NonNullByDefault;
34 import org.eclipse.jetty.client.HttpClient;
35 import org.eclipse.jetty.client.util.StringContentProvider;
36 import org.eclipse.jetty.http.HttpMethod;
37 import org.openhab.binding.nest.internal.sdm.dto.PubSubRequestsResponses.PubSubAcknowledgeRequest;
38 import org.openhab.binding.nest.internal.sdm.dto.PubSubRequestsResponses.PubSubCreateRequest;
39 import org.openhab.binding.nest.internal.sdm.dto.PubSubRequestsResponses.PubSubPullRequest;
40 import org.openhab.binding.nest.internal.sdm.dto.PubSubRequestsResponses.PubSubPullResponse;
41 import org.openhab.binding.nest.internal.sdm.exception.FailedSendingPubSubDataException;
42 import org.openhab.binding.nest.internal.sdm.exception.InvalidPubSubAccessTokenException;
43 import org.openhab.binding.nest.internal.sdm.exception.InvalidPubSubAuthorizationCodeException;
44 import org.openhab.binding.nest.internal.sdm.listener.PubSubSubscriptionListener;
45 import org.openhab.core.auth.client.oauth2.AccessTokenResponse;
46 import org.openhab.core.auth.client.oauth2.OAuthClientService;
47 import org.openhab.core.auth.client.oauth2.OAuthException;
48 import org.openhab.core.auth.client.oauth2.OAuthFactory;
49 import org.openhab.core.auth.client.oauth2.OAuthResponseException;
50 import org.openhab.core.common.NamedThreadFactory;
51 import org.openhab.core.io.net.http.HttpClientFactory;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
54
55 /**
56  * The {@link PubSubAPI} implements a subset of the Pub/Sub REST API which allows for subscribing to SDM events.
57  *
58  * @author Wouter Born - Initial contribution
59  *
60  * @see <a href="https://cloud.google.com/pubsub/docs/reference/rest">
61  *      https://cloud.google.com/pubsub/docs/reference/rest</a>
62  * @see <a href="https://developers.google.com/nest/device-access/api/events">
63  *      https://developers.google.com/nest/device-access/api/events</a>
64  */
65 @NonNullByDefault
66 public class PubSubAPI {
67
68     private class Subscriber implements Runnable {
69
70         private final String subscriptionId;
71
72         Subscriber(String subscriptionId) {
73             this.subscriptionId = subscriptionId;
74         }
75
76         @Override
77         public void run() {
78             if (!subscriptionListeners.containsKey(subscriptionId)) {
79                 logger.debug("Stop receiving subscription '{}' messages since there are no listeners", subscriptionId);
80                 return;
81             }
82
83             try {
84                 checkAccessTokenValidity();
85                 String messages = pullSubscriptionMessages(subscriptionId);
86
87                 PubSubPullResponse pullResponse = GSON.fromJson(messages, PubSubPullResponse.class);
88
89                 if (pullResponse != null && pullResponse.receivedMessages != null) {
90                     logger.debug("Subscription '{}' has {} new message(s)", subscriptionId,
91                             pullResponse.receivedMessages.size());
92                     forEachListener(listener -> pullResponse.receivedMessages
93                             .forEach(message -> listener.onMessage(message.message)));
94                     List<String> ackIds = pullResponse.receivedMessages.stream().map(message -> message.ackId)
95                             .collect(Collectors.toList());
96                     acknowledgeSubscriptionMessages(subscriptionId, ackIds);
97                 } else {
98                     forEachListener(PubSubSubscriptionListener::onNoNewMessages);
99                 }
100
101                 scheduler.submit(this);
102             } catch (FailedSendingPubSubDataException e) {
103                 logger.debug("Expected exception while pulling message for '{}' subscription", subscriptionId, e);
104                 Throwable cause = e.getCause();
105                 if (!(cause instanceof InterruptedException)) {
106                     forEachListener(listener -> listener.onError(e));
107                     scheduler.schedule(this, RETRY_TIMEOUT.toNanos(), TimeUnit.NANOSECONDS);
108                 }
109             } catch (InvalidPubSubAccessTokenException e) {
110                 logger.warn("Cannot pull messages for '{}' subscription (access or refresh token invalid)",
111                         subscriptionId, e);
112                 forEachListener(listener -> listener.onError(e));
113             } catch (Exception e) {
114                 logger.warn("Unexpected exception while pulling message for '{}' subscription", subscriptionId, e);
115                 forEachListener(listener -> listener.onError(e));
116                 scheduler.schedule(this, RETRY_TIMEOUT.toNanos(), TimeUnit.NANOSECONDS);
117             }
118         }
119
120         private void forEachListener(Consumer<PubSubSubscriptionListener> consumer) {
121             Set<PubSubSubscriptionListener> listeners = subscriptionListeners.get(subscriptionId);
122             if (listeners != null) {
123                 listeners.forEach(consumer::accept);
124             } else {
125                 logger.debug("Subscription '{}' has no listeners", subscriptionId);
126             }
127         }
128     }
129
130     private static final String AUTH_URL = "https://accounts.google.com/o/oauth2/auth";
131     private static final String TOKEN_URL = "https://accounts.google.com/o/oauth2/token";
132     private static final String REDIRECT_URI = "https://www.google.com";
133
134     private static final String PUBSUB_HANDLE_FORMAT = "%s.pubsub";
135     private static final String PUBSUB_SCOPE = "https://www.googleapis.com/auth/pubsub";
136
137     private static final String PUBSUB_URL_PREFIX = "https://pubsub.googleapis.com/v1/";
138     private static final int PUBSUB_PULL_MAX_MESSAGES = 10;
139
140     private static final String APPLICATION_JSON = "application/json";
141     private static final String BEARER = "Bearer ";
142
143     private static final Duration REQUEST_TIMEOUT = Duration.ofMinutes(1);
144     private static final Duration RETRY_TIMEOUT = Duration.ofSeconds(30);
145
146     private final Logger logger = LoggerFactory.getLogger(PubSubAPI.class);
147
148     private final HttpClient httpClient;
149     private final OAuthFactory oAuthFactory;
150     private final OAuthClientService oAuthService;
151     private final String oAuthServiceHandleId;
152     private final String projectId;
153     private final ScheduledThreadPoolExecutor scheduler;
154     private final Map<String, Set<PubSubSubscriptionListener>> subscriptionListeners = new HashMap<>();
155
156     public PubSubAPI(HttpClientFactory httpClientFactory, OAuthFactory oAuthFactory, String ownerId, String projectId,
157             String clientId, String clientSecret) {
158         this.httpClient = httpClientFactory.getCommonHttpClient();
159         this.projectId = projectId;
160         this.oAuthFactory = oAuthFactory;
161         this.oAuthServiceHandleId = String.format(PUBSUB_HANDLE_FORMAT, ownerId);
162         this.oAuthService = oAuthFactory.createOAuthClientService(oAuthServiceHandleId, TOKEN_URL, AUTH_URL, clientId,
163                 clientSecret, PUBSUB_SCOPE, false);
164         scheduler = new ScheduledThreadPoolExecutor(3, new NamedThreadFactory(ownerId, true));
165     }
166
167     public void dispose() {
168         subscriptionListeners.clear();
169         scheduler.shutdownNow();
170         oAuthFactory.ungetOAuthService(oAuthServiceHandleId);
171     }
172
173     public void deleteOAuthServiceAndAccessToken() {
174         oAuthFactory.deleteServiceAndAccessToken(oAuthServiceHandleId);
175     }
176
177     public void authorizeClient(String authorizationCode) throws InvalidPubSubAuthorizationCodeException, IOException {
178         try {
179             oAuthService.getAccessTokenResponseByAuthorizationCode(authorizationCode, REDIRECT_URI);
180         } catch (OAuthException | OAuthResponseException e) {
181             throw new InvalidPubSubAuthorizationCodeException(
182                     "Failed to authorize Pub/Sub client. Check the authorization code or generate a new one.", e);
183         }
184     }
185
186     public void checkAccessTokenValidity() throws InvalidPubSubAccessTokenException, IOException {
187         getAuthorizationHeader();
188     }
189
190     private String acknowledgeSubscriptionMessages(String subscriptionId, List<String> ackIds)
191             throws FailedSendingPubSubDataException, InvalidPubSubAccessTokenException {
192         logger.debug("Acknowleding {} message(s) for '{}' subscription", ackIds.size(), subscriptionId);
193         String url = getSubscriptionUrl(subscriptionId) + ":acknowledge";
194         String requestContent = GSON.toJson(new PubSubAcknowledgeRequest(ackIds));
195         return postJson(url, requestContent);
196     }
197
198     public void addSubscriptionListener(String subscriptionId, PubSubSubscriptionListener listener) {
199         synchronized (subscriptionListeners) {
200             Set<PubSubSubscriptionListener> listeners = subscriptionListeners.get(subscriptionId);
201             if (listeners == null) {
202                 listeners = new HashSet<>();
203                 subscriptionListeners.put(subscriptionId, listeners);
204             }
205             listeners.add(listener);
206             if (listeners.size() == 1) {
207                 scheduler.submit(new Subscriber(subscriptionId));
208             }
209         }
210     }
211
212     public void removeSubscriptionListener(String subscriptionId, PubSubSubscriptionListener listener) {
213         synchronized (subscriptionListeners) {
214             Set<PubSubSubscriptionListener> listeners = subscriptionListeners.get(subscriptionId);
215             if (listeners != null) {
216                 listeners.remove(listener);
217                 if (listeners.isEmpty()) {
218                     subscriptionListeners.remove(subscriptionId);
219                     scheduler.getQueue().removeIf(
220                             runnable -> runnable instanceof Subscriber s && s.subscriptionId.equals(subscriptionId));
221                 }
222             }
223         }
224     }
225
226     public void createSubscription(String subscriptionId, String topicName)
227             throws FailedSendingPubSubDataException, InvalidPubSubAccessTokenException {
228         logger.debug("Creating '{}' subscription", subscriptionId);
229         String url = getSubscriptionUrl(subscriptionId);
230         String requestContent = GSON.toJson(new PubSubCreateRequest(topicName, true));
231         putJson(url, requestContent);
232     }
233
234     private String getAuthorizationHeader() throws InvalidPubSubAccessTokenException, IOException {
235         try {
236             AccessTokenResponse response = oAuthService.getAccessTokenResponse();
237             if (response == null || response.getAccessToken() == null || response.getAccessToken().isEmpty()) {
238                 throw new InvalidPubSubAccessTokenException(
239                         "No Pub/Sub access token. Client may not have been authorized.");
240             }
241             if (response.getRefreshToken() == null || response.getRefreshToken().isEmpty()) {
242                 throw new InvalidPubSubAccessTokenException(
243                         "No Pub/Sub refresh token. Delete and readd credentials, then reauthorize.");
244             }
245             return BEARER + response.getAccessToken();
246         } catch (OAuthException | OAuthResponseException e) {
247             throw new InvalidPubSubAccessTokenException(
248                     "Error fetching Pub/Sub access token. Check the authorization code or generate a new one.", e);
249         }
250     }
251
252     private String getSubscriptionUrl(String subscriptionId) {
253         return PUBSUB_URL_PREFIX + "projects/" + projectId + "/subscriptions/" + subscriptionId;
254     }
255
256     private String postJson(String url, String requestContent)
257             throws FailedSendingPubSubDataException, InvalidPubSubAccessTokenException {
258         try {
259             logger.debug("Posting JSON to: {}", url);
260             String response = httpClient.newRequest(url) //
261                     .method(POST) //
262                     .header(ACCEPT, APPLICATION_JSON) //
263                     .header(AUTHORIZATION, getAuthorizationHeader()) //
264                     .content(new StringContentProvider(requestContent), APPLICATION_JSON) //
265                     .timeout(REQUEST_TIMEOUT.toNanos(), TimeUnit.NANOSECONDS) //
266                     .send() //
267                     .getContentAsString();
268             logger.debug("Response: {}", response);
269             return response;
270         } catch (ExecutionException | InterruptedException | IOException | TimeoutException e) {
271             throw new FailedSendingPubSubDataException("Failed to send JSON POST request", e);
272         }
273     }
274
275     private String pullSubscriptionMessages(String subscriptionId)
276             throws FailedSendingPubSubDataException, InvalidPubSubAccessTokenException {
277         logger.debug("Pulling messages for '{}' subscription", subscriptionId);
278         String url = getSubscriptionUrl(subscriptionId) + ":pull";
279         String requestContent = GSON.toJson(new PubSubPullRequest(PUBSUB_PULL_MAX_MESSAGES));
280         return postJson(url, requestContent);
281     }
282
283     private String putJson(String url, String requestContent)
284             throws FailedSendingPubSubDataException, InvalidPubSubAccessTokenException {
285         try {
286             logger.debug("Putting JSON to: {}", url);
287             String response = httpClient.newRequest(url) //
288                     .method(HttpMethod.PUT) //
289                     .header(ACCEPT, APPLICATION_JSON) //
290                     .header(AUTHORIZATION, getAuthorizationHeader()) //
291                     .content(new StringContentProvider(requestContent), APPLICATION_JSON) //
292                     .timeout(REQUEST_TIMEOUT.toNanos(), TimeUnit.NANOSECONDS) //
293                     .send() //
294                     .getContentAsString();
295             logger.debug("Response: {}", response);
296             return response;
297         } catch (ExecutionException | InterruptedException | IOException | TimeoutException e) {
298             throw new FailedSendingPubSubDataException("Failed to send JSON PUT request", e);
299         }
300     }
301 }