2 * Copyright (c) 2010-2023 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.nest.internal.sdm.api;
15 import static org.eclipse.jetty.http.HttpHeader.*;
16 import static org.eclipse.jetty.http.HttpMethod.POST;
17 import static org.openhab.binding.nest.internal.sdm.dto.SDMGson.GSON;
19 import java.io.IOException;
20 import java.time.Duration;
21 import java.util.HashMap;
22 import java.util.HashSet;
23 import java.util.List;
26 import java.util.concurrent.ExecutionException;
27 import java.util.concurrent.ScheduledThreadPoolExecutor;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.TimeoutException;
30 import java.util.function.Consumer;
31 import java.util.stream.Collectors;
33 import org.eclipse.jdt.annotation.NonNullByDefault;
34 import org.eclipse.jetty.client.HttpClient;
35 import org.eclipse.jetty.client.util.StringContentProvider;
36 import org.eclipse.jetty.http.HttpMethod;
37 import org.openhab.binding.nest.internal.sdm.dto.PubSubRequestsResponses.PubSubAcknowledgeRequest;
38 import org.openhab.binding.nest.internal.sdm.dto.PubSubRequestsResponses.PubSubCreateRequest;
39 import org.openhab.binding.nest.internal.sdm.dto.PubSubRequestsResponses.PubSubPullRequest;
40 import org.openhab.binding.nest.internal.sdm.dto.PubSubRequestsResponses.PubSubPullResponse;
41 import org.openhab.binding.nest.internal.sdm.exception.FailedSendingPubSubDataException;
42 import org.openhab.binding.nest.internal.sdm.exception.InvalidPubSubAccessTokenException;
43 import org.openhab.binding.nest.internal.sdm.exception.InvalidPubSubAuthorizationCodeException;
44 import org.openhab.binding.nest.internal.sdm.listener.PubSubSubscriptionListener;
45 import org.openhab.core.auth.client.oauth2.AccessTokenResponse;
46 import org.openhab.core.auth.client.oauth2.OAuthClientService;
47 import org.openhab.core.auth.client.oauth2.OAuthException;
48 import org.openhab.core.auth.client.oauth2.OAuthFactory;
49 import org.openhab.core.auth.client.oauth2.OAuthResponseException;
50 import org.openhab.core.common.NamedThreadFactory;
51 import org.openhab.core.io.net.http.HttpClientFactory;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
56 * The {@link PubSubAPI} implements a subset of the Pub/Sub REST API which allows for subscribing to SDM events.
58 * @author Wouter Born - Initial contribution
60 * @see <a href="https://cloud.google.com/pubsub/docs/reference/rest">
61 * https://cloud.google.com/pubsub/docs/reference/rest</a>
62 * @see <a href="https://developers.google.com/nest/device-access/api/events">
63 * https://developers.google.com/nest/device-access/api/events</a>
66 public class PubSubAPI {
68 private class Subscriber implements Runnable {
70 private final String subscriptionId;
72 Subscriber(String subscriptionId) {
73 this.subscriptionId = subscriptionId;
78 if (!subscriptionListeners.containsKey(subscriptionId)) {
79 logger.debug("Stop receiving subscription '{}' messages since there are no listeners", subscriptionId);
84 checkAccessTokenValidity();
85 String messages = pullSubscriptionMessages(subscriptionId);
87 PubSubPullResponse pullResponse = GSON.fromJson(messages, PubSubPullResponse.class);
89 if (pullResponse != null && pullResponse.receivedMessages != null) {
90 logger.debug("Subscription '{}' has {} new message(s)", subscriptionId,
91 pullResponse.receivedMessages.size());
92 forEachListener(listener -> pullResponse.receivedMessages
93 .forEach(message -> listener.onMessage(message.message)));
94 List<String> ackIds = pullResponse.receivedMessages.stream().map(message -> message.ackId)
95 .collect(Collectors.toList());
96 acknowledgeSubscriptionMessages(subscriptionId, ackIds);
98 forEachListener(PubSubSubscriptionListener::onNoNewMessages);
101 scheduler.submit(this);
102 } catch (FailedSendingPubSubDataException e) {
103 logger.debug("Expected exception while pulling message for '{}' subscription", subscriptionId, e);
104 Throwable cause = e.getCause();
105 if (!(cause instanceof InterruptedException)) {
106 forEachListener(listener -> listener.onError(e));
107 scheduler.schedule(this, RETRY_TIMEOUT.toNanos(), TimeUnit.NANOSECONDS);
109 } catch (InvalidPubSubAccessTokenException e) {
110 logger.warn("Cannot pull messages for '{}' subscription (access or refresh token invalid)",
112 forEachListener(listener -> listener.onError(e));
113 } catch (Exception e) {
114 logger.warn("Unexpected exception while pulling message for '{}' subscription", subscriptionId, e);
115 forEachListener(listener -> listener.onError(e));
116 scheduler.schedule(this, RETRY_TIMEOUT.toNanos(), TimeUnit.NANOSECONDS);
120 private void forEachListener(Consumer<PubSubSubscriptionListener> consumer) {
121 Set<PubSubSubscriptionListener> listeners = subscriptionListeners.get(subscriptionId);
122 if (listeners != null) {
123 listeners.forEach(consumer::accept);
125 logger.debug("Subscription '{}' has no listeners", subscriptionId);
130 private static final String AUTH_URL = "https://accounts.google.com/o/oauth2/auth";
131 private static final String TOKEN_URL = "https://accounts.google.com/o/oauth2/token";
132 private static final String REDIRECT_URI = "https://www.google.com";
134 private static final String PUBSUB_HANDLE_FORMAT = "%s.pubsub";
135 private static final String PUBSUB_SCOPE = "https://www.googleapis.com/auth/pubsub";
137 private static final String PUBSUB_URL_PREFIX = "https://pubsub.googleapis.com/v1/";
138 private static final int PUBSUB_PULL_MAX_MESSAGES = 10;
140 private static final String APPLICATION_JSON = "application/json";
141 private static final String BEARER = "Bearer ";
143 private static final Duration REQUEST_TIMEOUT = Duration.ofMinutes(1);
144 private static final Duration RETRY_TIMEOUT = Duration.ofSeconds(30);
146 private final Logger logger = LoggerFactory.getLogger(PubSubAPI.class);
148 private final HttpClient httpClient;
149 private final OAuthFactory oAuthFactory;
150 private final OAuthClientService oAuthService;
151 private final String oAuthServiceHandleId;
152 private final String projectId;
153 private final ScheduledThreadPoolExecutor scheduler;
154 private final Map<String, Set<PubSubSubscriptionListener>> subscriptionListeners = new HashMap<>();
156 public PubSubAPI(HttpClientFactory httpClientFactory, OAuthFactory oAuthFactory, String ownerId, String projectId,
157 String clientId, String clientSecret) {
158 this.httpClient = httpClientFactory.getCommonHttpClient();
159 this.projectId = projectId;
160 this.oAuthFactory = oAuthFactory;
161 this.oAuthServiceHandleId = String.format(PUBSUB_HANDLE_FORMAT, ownerId);
162 this.oAuthService = oAuthFactory.createOAuthClientService(oAuthServiceHandleId, TOKEN_URL, AUTH_URL, clientId,
163 clientSecret, PUBSUB_SCOPE, false);
164 scheduler = new ScheduledThreadPoolExecutor(3, new NamedThreadFactory(ownerId, true));
167 public void dispose() {
168 subscriptionListeners.clear();
169 scheduler.shutdownNow();
170 oAuthFactory.ungetOAuthService(oAuthServiceHandleId);
173 public void deleteOAuthServiceAndAccessToken() {
174 oAuthFactory.deleteServiceAndAccessToken(oAuthServiceHandleId);
177 public void authorizeClient(String authorizationCode) throws InvalidPubSubAuthorizationCodeException, IOException {
179 oAuthService.getAccessTokenResponseByAuthorizationCode(authorizationCode, REDIRECT_URI);
180 } catch (OAuthException | OAuthResponseException e) {
181 throw new InvalidPubSubAuthorizationCodeException(
182 "Failed to authorize Pub/Sub client. Check the authorization code or generate a new one.", e);
186 public void checkAccessTokenValidity() throws InvalidPubSubAccessTokenException, IOException {
187 getAuthorizationHeader();
190 private String acknowledgeSubscriptionMessages(String subscriptionId, List<String> ackIds)
191 throws FailedSendingPubSubDataException, InvalidPubSubAccessTokenException {
192 logger.debug("Acknowleding {} message(s) for '{}' subscription", ackIds.size(), subscriptionId);
193 String url = getSubscriptionUrl(subscriptionId) + ":acknowledge";
194 String requestContent = GSON.toJson(new PubSubAcknowledgeRequest(ackIds));
195 return postJson(url, requestContent);
198 public void addSubscriptionListener(String subscriptionId, PubSubSubscriptionListener listener) {
199 synchronized (subscriptionListeners) {
200 Set<PubSubSubscriptionListener> listeners = subscriptionListeners.get(subscriptionId);
201 if (listeners == null) {
202 listeners = new HashSet<>();
203 subscriptionListeners.put(subscriptionId, listeners);
205 listeners.add(listener);
206 if (listeners.size() == 1) {
207 scheduler.submit(new Subscriber(subscriptionId));
212 public void removeSubscriptionListener(String subscriptionId, PubSubSubscriptionListener listener) {
213 synchronized (subscriptionListeners) {
214 Set<PubSubSubscriptionListener> listeners = subscriptionListeners.get(subscriptionId);
215 if (listeners != null) {
216 listeners.remove(listener);
217 if (listeners.isEmpty()) {
218 subscriptionListeners.remove(subscriptionId);
219 scheduler.getQueue().removeIf(
220 runnable -> runnable instanceof Subscriber s && s.subscriptionId.equals(subscriptionId));
226 public void createSubscription(String subscriptionId, String topicName)
227 throws FailedSendingPubSubDataException, InvalidPubSubAccessTokenException {
228 logger.debug("Creating '{}' subscription", subscriptionId);
229 String url = getSubscriptionUrl(subscriptionId);
230 String requestContent = GSON.toJson(new PubSubCreateRequest(topicName, true));
231 putJson(url, requestContent);
234 private String getAuthorizationHeader() throws InvalidPubSubAccessTokenException, IOException {
236 AccessTokenResponse response = oAuthService.getAccessTokenResponse();
237 if (response == null || response.getAccessToken() == null || response.getAccessToken().isEmpty()) {
238 throw new InvalidPubSubAccessTokenException(
239 "No Pub/Sub access token. Client may not have been authorized.");
241 if (response.getRefreshToken() == null || response.getRefreshToken().isEmpty()) {
242 throw new InvalidPubSubAccessTokenException(
243 "No Pub/Sub refresh token. Delete and readd credentials, then reauthorize.");
245 return BEARER + response.getAccessToken();
246 } catch (OAuthException | OAuthResponseException e) {
247 throw new InvalidPubSubAccessTokenException(
248 "Error fetching Pub/Sub access token. Check the authorization code or generate a new one.", e);
252 private String getSubscriptionUrl(String subscriptionId) {
253 return PUBSUB_URL_PREFIX + "projects/" + projectId + "/subscriptions/" + subscriptionId;
256 private String postJson(String url, String requestContent)
257 throws FailedSendingPubSubDataException, InvalidPubSubAccessTokenException {
259 logger.debug("Posting JSON to: {}", url);
260 String response = httpClient.newRequest(url) //
262 .header(ACCEPT, APPLICATION_JSON) //
263 .header(AUTHORIZATION, getAuthorizationHeader()) //
264 .content(new StringContentProvider(requestContent), APPLICATION_JSON) //
265 .timeout(REQUEST_TIMEOUT.toNanos(), TimeUnit.NANOSECONDS) //
267 .getContentAsString();
268 logger.debug("Response: {}", response);
270 } catch (ExecutionException | InterruptedException | IOException | TimeoutException e) {
271 throw new FailedSendingPubSubDataException("Failed to send JSON POST request", e);
275 private String pullSubscriptionMessages(String subscriptionId)
276 throws FailedSendingPubSubDataException, InvalidPubSubAccessTokenException {
277 logger.debug("Pulling messages for '{}' subscription", subscriptionId);
278 String url = getSubscriptionUrl(subscriptionId) + ":pull";
279 String requestContent = GSON.toJson(new PubSubPullRequest(PUBSUB_PULL_MAX_MESSAGES));
280 return postJson(url, requestContent);
283 private String putJson(String url, String requestContent)
284 throws FailedSendingPubSubDataException, InvalidPubSubAccessTokenException {
286 logger.debug("Putting JSON to: {}", url);
287 String response = httpClient.newRequest(url) //
288 .method(HttpMethod.PUT) //
289 .header(ACCEPT, APPLICATION_JSON) //
290 .header(AUTHORIZATION, getAuthorizationHeader()) //
291 .content(new StringContentProvider(requestContent), APPLICATION_JSON) //
292 .timeout(REQUEST_TIMEOUT.toNanos(), TimeUnit.NANOSECONDS) //
294 .getContentAsString();
295 logger.debug("Response: {}", response);
297 } catch (ExecutionException | InterruptedException | IOException | TimeoutException e) {
298 throw new FailedSendingPubSubDataException("Failed to send JSON PUT request", e);