/**
 * Copyright (c) 2010-2023 Contributors to the openHAB project
 *
 * See the NOTICE file(s) distributed with this work for additional
 * information.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License 2.0 which is available at
 * http://www.eclipse.org/legal/epl-2.0
 *
 * SPDX-License-Identifier: EPL-2.0
 */
13 package org.openhab.binding.nest.internal.sdm.api;
import static org.eclipse.jetty.http.HttpHeader.*;
import static org.eclipse.jetty.http.HttpMethod.POST;
import static org.openhab.binding.nest.internal.sdm.dto.SDMGson.GSON;

import java.io.IOException;
import java.time.Duration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.util.StringContentProvider;
import org.eclipse.jetty.http.HttpMethod;
import org.openhab.binding.nest.internal.sdm.dto.PubSubRequestsResponses.PubSubAcknowledgeRequest;
import org.openhab.binding.nest.internal.sdm.dto.PubSubRequestsResponses.PubSubCreateRequest;
import org.openhab.binding.nest.internal.sdm.dto.PubSubRequestsResponses.PubSubPullRequest;
import org.openhab.binding.nest.internal.sdm.dto.PubSubRequestsResponses.PubSubPullResponse;
import org.openhab.binding.nest.internal.sdm.exception.FailedSendingPubSubDataException;
import org.openhab.binding.nest.internal.sdm.exception.InvalidPubSubAccessTokenException;
import org.openhab.binding.nest.internal.sdm.exception.InvalidPubSubAuthorizationCodeException;
import org.openhab.binding.nest.internal.sdm.listener.PubSubSubscriptionListener;
import org.openhab.core.auth.client.oauth2.AccessTokenResponse;
import org.openhab.core.auth.client.oauth2.OAuthClientService;
import org.openhab.core.auth.client.oauth2.OAuthException;
import org.openhab.core.auth.client.oauth2.OAuthFactory;
import org.openhab.core.auth.client.oauth2.OAuthResponseException;
import org.openhab.core.common.NamedThreadFactory;
import org.openhab.core.io.net.http.HttpClientFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
56 * The {@link PubSubAPI} implements a subset of the Pub/Sub REST API which allows for subscribing to SDM events.
58 * @author Wouter Born - Initial contribution
60 * @see https://cloud.google.com/pubsub/docs/reference/rest
61 * @see https://developers.google.com/nest/device-access/api/events
64 public class PubSubAPI {
66 private class Subscriber implements Runnable {
68 private final String subscriptionId;
70 Subscriber(String subscriptionId) {
71 this.subscriptionId = subscriptionId;
76 if (!subscriptionListeners.containsKey(subscriptionId)) {
77 logger.debug("Stop receiving subscription '{}' messages since there are no listeners", subscriptionId);
82 checkAccessTokenValidity();
83 String messages = pullSubscriptionMessages(subscriptionId);
85 PubSubPullResponse pullResponse = GSON.fromJson(messages, PubSubPullResponse.class);
87 if (pullResponse != null && pullResponse.receivedMessages != null) {
88 logger.debug("Subscription '{}' has {} new message(s)", subscriptionId,
89 pullResponse.receivedMessages.size());
90 forEachListener(listener -> pullResponse.receivedMessages
91 .forEach(message -> listener.onMessage(message.message)));
92 List<String> ackIds = pullResponse.receivedMessages.stream().map(message -> message.ackId)
93 .collect(Collectors.toList());
94 acknowledgeSubscriptionMessages(subscriptionId, ackIds);
96 forEachListener(PubSubSubscriptionListener::onNoNewMessages);
99 scheduler.submit(this);
100 } catch (FailedSendingPubSubDataException e) {
101 logger.debug("Expected exception while pulling message for '{}' subscription", subscriptionId, e);
102 Throwable cause = e.getCause();
103 if (!(cause instanceof InterruptedException)) {
104 forEachListener(listener -> listener.onError(e));
105 scheduler.schedule(this, RETRY_TIMEOUT.toNanos(), TimeUnit.NANOSECONDS);
107 } catch (InvalidPubSubAccessTokenException e) {
108 logger.warn("Cannot pull messages for '{}' subscription (access or refresh token invalid)",
110 forEachListener(listener -> listener.onError(e));
111 } catch (Exception e) {
112 logger.warn("Unexpected exception while pulling message for '{}' subscription", subscriptionId, e);
113 forEachListener(listener -> listener.onError(e));
114 scheduler.schedule(this, RETRY_TIMEOUT.toNanos(), TimeUnit.NANOSECONDS);
118 private void forEachListener(Consumer<PubSubSubscriptionListener> consumer) {
119 Set<PubSubSubscriptionListener> listeners = subscriptionListeners.get(subscriptionId);
120 if (listeners != null) {
121 listeners.forEach(consumer::accept);
123 logger.debug("Subscription '{}' has no listeners", subscriptionId);
128 private static final String AUTH_URL = "https://accounts.google.com/o/oauth2/auth";
129 private static final String TOKEN_URL = "https://accounts.google.com/o/oauth2/token";
130 private static final String REDIRECT_URI = "https://www.google.com";
132 private static final String PUBSUB_HANDLE_FORMAT = "%s.pubsub";
133 private static final String PUBSUB_SCOPE = "https://www.googleapis.com/auth/pubsub";
135 private static final String PUBSUB_URL_PREFIX = "https://pubsub.googleapis.com/v1/";
136 private static final int PUBSUB_PULL_MAX_MESSAGES = 10;
138 private static final String APPLICATION_JSON = "application/json";
139 private static final String BEARER = "Bearer ";
141 private static final Duration REQUEST_TIMEOUT = Duration.ofMinutes(1);
142 private static final Duration RETRY_TIMEOUT = Duration.ofSeconds(30);
144 private final Logger logger = LoggerFactory.getLogger(PubSubAPI.class);
146 private final HttpClient httpClient;
147 private final OAuthClientService oAuthService;
148 private final String projectId;
149 private final ScheduledThreadPoolExecutor scheduler;
150 private final Map<String, Set<PubSubSubscriptionListener>> subscriptionListeners = new HashMap<>();
152 public PubSubAPI(HttpClientFactory httpClientFactory, OAuthFactory oAuthFactory, String ownerId, String projectId,
153 String clientId, String clientSecret) {
154 this.httpClient = httpClientFactory.getCommonHttpClient();
155 this.projectId = projectId;
156 this.oAuthService = oAuthFactory.createOAuthClientService(String.format(PUBSUB_HANDLE_FORMAT, ownerId),
157 TOKEN_URL, AUTH_URL, clientId, clientSecret, PUBSUB_SCOPE, false);
158 scheduler = new ScheduledThreadPoolExecutor(3, new NamedThreadFactory(ownerId, true));
161 public void dispose() {
162 subscriptionListeners.clear();
163 scheduler.shutdownNow();
166 public void authorizeClient(String authorizationCode) throws InvalidPubSubAuthorizationCodeException, IOException {
168 oAuthService.getAccessTokenResponseByAuthorizationCode(authorizationCode, REDIRECT_URI);
169 } catch (OAuthException | OAuthResponseException e) {
170 throw new InvalidPubSubAuthorizationCodeException(
171 "Failed to authorize Pub/Sub client. Check the authorization code or generate a new one.", e);
175 public void checkAccessTokenValidity() throws InvalidPubSubAccessTokenException, IOException {
176 getAuthorizationHeader();
179 private String acknowledgeSubscriptionMessages(String subscriptionId, List<String> ackIds)
180 throws FailedSendingPubSubDataException, InvalidPubSubAccessTokenException {
181 logger.debug("Acknowleding {} message(s) for '{}' subscription", ackIds.size(), subscriptionId);
182 String url = getSubscriptionUrl(subscriptionId) + ":acknowledge";
183 String requestContent = GSON.toJson(new PubSubAcknowledgeRequest(ackIds));
184 return postJson(url, requestContent);
187 public void addSubscriptionListener(String subscriptionId, PubSubSubscriptionListener listener) {
188 synchronized (subscriptionListeners) {
189 Set<PubSubSubscriptionListener> listeners = subscriptionListeners.get(subscriptionId);
190 if (listeners == null) {
191 listeners = new HashSet<>();
192 subscriptionListeners.put(subscriptionId, listeners);
194 listeners.add(listener);
195 if (listeners.size() == 1) {
196 scheduler.submit(new Subscriber(subscriptionId));
201 public void removeSubscriptionListener(String subscriptionId, PubSubSubscriptionListener listener) {
202 synchronized (subscriptionListeners) {
203 Set<PubSubSubscriptionListener> listeners = subscriptionListeners.get(subscriptionId);
204 if (listeners != null) {
205 listeners.remove(listener);
206 if (listeners.isEmpty()) {
207 subscriptionListeners.remove(subscriptionId);
208 scheduler.getQueue().removeIf(runnable -> runnable instanceof Subscriber
209 && ((Subscriber) runnable).subscriptionId.equals(subscriptionId));
215 public void createSubscription(String subscriptionId, String topicName)
216 throws FailedSendingPubSubDataException, InvalidPubSubAccessTokenException {
217 logger.debug("Creating '{}' subscription", subscriptionId);
218 String url = getSubscriptionUrl(subscriptionId);
219 String requestContent = GSON.toJson(new PubSubCreateRequest(topicName, true));
220 putJson(url, requestContent);
223 private String getAuthorizationHeader() throws InvalidPubSubAccessTokenException, IOException {
225 AccessTokenResponse response = oAuthService.getAccessTokenResponse();
226 if (response == null || response.getAccessToken() == null || response.getAccessToken().isEmpty()) {
227 throw new InvalidPubSubAccessTokenException(
228 "No Pub/Sub access token. Client may not have been authorized.");
230 if (response.getRefreshToken() == null || response.getRefreshToken().isEmpty()) {
231 throw new InvalidPubSubAccessTokenException(
232 "No Pub/Sub refresh token. Delete and readd credentials, then reauthorize.");
234 return BEARER + response.getAccessToken();
235 } catch (OAuthException | OAuthResponseException e) {
236 throw new InvalidPubSubAccessTokenException(
237 "Error fetching Pub/Sub access token. Check the authorization code or generate a new one.", e);
241 private String getSubscriptionUrl(String subscriptionId) {
242 return PUBSUB_URL_PREFIX + "projects/" + projectId + "/subscriptions/" + subscriptionId;
245 private String postJson(String url, String requestContent)
246 throws FailedSendingPubSubDataException, InvalidPubSubAccessTokenException {
248 logger.debug("Posting JSON to: {}", url);
249 String response = httpClient.newRequest(url) //
251 .header(ACCEPT, APPLICATION_JSON) //
252 .header(AUTHORIZATION, getAuthorizationHeader()) //
253 .content(new StringContentProvider(requestContent), APPLICATION_JSON) //
254 .timeout(REQUEST_TIMEOUT.toNanos(), TimeUnit.NANOSECONDS) //
256 .getContentAsString();
257 logger.debug("Response: {}", response);
259 } catch (ExecutionException | InterruptedException | IOException | TimeoutException e) {
260 throw new FailedSendingPubSubDataException("Failed to send JSON POST request", e);
264 private String pullSubscriptionMessages(String subscriptionId)
265 throws FailedSendingPubSubDataException, InvalidPubSubAccessTokenException {
266 logger.debug("Pulling messages for '{}' subscription", subscriptionId);
267 String url = getSubscriptionUrl(subscriptionId) + ":pull";
268 String requestContent = GSON.toJson(new PubSubPullRequest(PUBSUB_PULL_MAX_MESSAGES));
269 return postJson(url, requestContent);
272 private String putJson(String url, String requestContent)
273 throws FailedSendingPubSubDataException, InvalidPubSubAccessTokenException {
275 logger.debug("Putting JSON to: {}", url);
276 String response = httpClient.newRequest(url) //
277 .method(HttpMethod.PUT) //
278 .header(ACCEPT, APPLICATION_JSON) //
279 .header(AUTHORIZATION, getAuthorizationHeader()) //
280 .content(new StringContentProvider(requestContent), APPLICATION_JSON) //
281 .timeout(REQUEST_TIMEOUT.toNanos(), TimeUnit.NANOSECONDS) //
283 .getContentAsString();
284 logger.debug("Response: {}", response);
286 } catch (ExecutionException | InterruptedException | IOException | TimeoutException e) {
287 throw new FailedSendingPubSubDataException("Failed to send JSON PUT request", e);