]> git.basschouten.com Git - openhab-addons.git/blob
742df8235809ade60cab1ff21b430e4095cf614f
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2020 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.io.openhabcloud.internal;
14
15 import java.io.IOException;
16 import java.net.MalformedURLException;
17 import java.net.URI;
18 import java.net.URISyntaxException;
19 import java.net.URL;
20 import java.net.URLEncoder;
21 import java.nio.ByteBuffer;
22 import java.util.Iterator;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.Set;
26 import java.util.concurrent.ConcurrentHashMap;
27 import java.util.concurrent.TimeUnit;
28
29 import org.eclipse.jdt.annotation.Nullable;
30 import org.eclipse.jetty.client.HttpClient;
31 import org.eclipse.jetty.client.api.Request;
32 import org.eclipse.jetty.client.api.Request.FailureListener;
33 import org.eclipse.jetty.client.api.Response;
34 import org.eclipse.jetty.client.api.Response.ContentListener;
35 import org.eclipse.jetty.client.api.Response.HeadersListener;
36 import org.eclipse.jetty.client.api.Result;
37 import org.eclipse.jetty.client.util.BytesContentProvider;
38 import org.eclipse.jetty.http.HttpField;
39 import org.eclipse.jetty.http.HttpFields;
40 import org.eclipse.jetty.http.HttpMethod;
41 import org.eclipse.jetty.http.HttpStatus;
42 import org.eclipse.jetty.util.BufferUtil;
43 import org.eclipse.jetty.util.URIUtil;
44 import org.json.JSONException;
45 import org.json.JSONObject;
46 import org.openhab.core.OpenHAB;
47 import org.openhab.core.common.ThreadPoolManager;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
50
51 import io.socket.client.IO;
52 import io.socket.client.Manager;
53 import io.socket.client.Socket;
54 import io.socket.emitter.Emitter;
55 import io.socket.engineio.client.Transport;
56
57 /**
58  * This class provides communication between openHAB and the openHAB Cloud service.
59  * It also implements async http proxy for serving requests from user to
60  * openHAB through the openHAB Cloud. It uses Socket.IO connection to connect to
61  * the openHAB Cloud service and Jetty Http client to send local http requests to
62  * openHAB.
63  *
64  * @author Victor Belov - Initial contribution
65  * @author Kai Kreuzer - migrated code to new Jetty client and ESH APIs
66  */
67 public class CloudClient {
68     /*
69      * Logger for this class
70      */
71     private final Logger logger = LoggerFactory.getLogger(CloudClient.class);
72
73     /*
74      * This variable holds base URL for the openHAB Cloud connections
75      */
76     private final String baseURL;
77
78     /*
79      * This variable holds openHAB's UUID for authenticating and connecting to the openHAB Cloud
80      */
81     private final String uuid;
82
83     /*
84      * This variable holds openHAB's secret for authenticating and connecting to the openHAB Cloud
85      */
86     private final String secret;
87
88     /*
89      * This variable holds local openHAB's base URL for connecting to the local openHAB instance
90      */
91     private final String localBaseUrl;
92
93     /*
94      * This variable holds instance of Jetty HTTP client to make requests to local openHAB
95      */
96     private final HttpClient jettyClient;
97
98     /*
99      * This map holds HTTP requests to local openHAB which are currently running
100      */
101     private final Map<Integer, Request> runningRequests = new ConcurrentHashMap<>();
102
103     /*
104      * This variable indicates if connection to the openHAB Cloud is currently in an established state
105      */
106     private boolean isConnected;
107
108     /*
109      * This variable holds version of local openHAB
110      */
111     private String openHABVersion;
112
113     /*
114      * This variable holds instance of Socket.IO client class which provides communication
115      * with the openHAB Cloud
116      */
117     private Socket socket;
118
119     /*
120      * The protocol of the openHAB-cloud URL.
121      */
122     private String protocol = "https";
123
124     /*
125      * This variable holds instance of CloudClientListener which provides callbacks to communicate
126      * certain events from the openHAB Cloud back to openHAB
127      */
128     private CloudClientListener listener;
129     private boolean remoteAccessEnabled;
130     private Set<String> exposedItems;
131
132     /**
133      * Constructor of CloudClient
134      *
135      * @param uuid openHAB's UUID to connect to the openHAB Cloud
136      * @param secret openHAB's Secret to connect to the openHAB Cloud
137      * @param remoteAccessEnabled Allow the openHAB Cloud to be used as a remote proxy
138      * @param exposedItems Items that are made available to apps connected to the openHAB Cloud
139      */
140     public CloudClient(HttpClient httpClient, String uuid, String secret, String baseURL, String localBaseUrl,
141             boolean remoteAccessEnabled, Set<String> exposedItems) {
142         this.uuid = uuid;
143         this.secret = secret;
144         this.baseURL = baseURL;
145         this.localBaseUrl = localBaseUrl;
146         this.remoteAccessEnabled = remoteAccessEnabled;
147         this.exposedItems = exposedItems;
148         this.jettyClient = httpClient;
149     }
150
151     /**
152      * Connect to the openHAB Cloud
153      */
154
155     public void connect() {
156         try {
157             socket = IO.socket(baseURL);
158             URL parsed = new URL(baseURL);
159             protocol = parsed.getProtocol();
160         } catch (URISyntaxException e) {
161             logger.error("Error creating Socket.IO: {}", e.getMessage());
162         } catch (MalformedURLException e) {
163             logger.error("Error parsing baseURL to get protocol, assuming https. Error: {}", e.getMessage());
164         }
165         socket.io().on(Manager.EVENT_TRANSPORT, new Emitter.Listener() {
166             @Override
167             public void call(Object... args) {
168                 logger.trace("Manager.EVENT_TRANSPORT");
169                 Transport transport = (Transport) args[0];
170                 transport.on(Transport.EVENT_REQUEST_HEADERS, new Emitter.Listener() {
171                     @Override
172                     public void call(Object... args) {
173                         logger.trace("Transport.EVENT_REQUEST_HEADERS");
174                         @SuppressWarnings("unchecked")
175                         Map<String, List<String>> headers = (Map<String, List<String>>) args[0];
176                         headers.put("uuid", List.of(uuid));
177                         headers.put("secret", List.of(secret));
178                         headers.put("openhabversion", List.of(OpenHAB.getVersion()));
179                         headers.put("clientversion", List.of(CloudService.clientVersion));
180                         headers.put("remoteaccess", List.of(((Boolean) remoteAccessEnabled).toString()));
181                     }
182                 });
183             }
184         });
185         socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() {
186             @Override
187             public void call(Object... args) {
188                 logger.debug("Socket.IO connected");
189                 isConnected = true;
190                 onConnect();
191             }
192         }).on(Socket.EVENT_DISCONNECT, new Emitter.Listener() {
193             @Override
194             public void call(Object... args) {
195                 logger.debug("Socket.IO disconnected");
196                 isConnected = false;
197                 onDisconnect();
198             }
199         }).on(Socket.EVENT_ERROR, new Emitter.Listener() {
200             @Override
201             public void call(Object... args) {
202                 logger.error("Error connecting to the openHAB Cloud instance: {}", args[0]);
203             }
204         }).on("request", new Emitter.Listener() {
205             @Override
206             public void call(Object... args) {
207                 onEvent("request", (JSONObject) args[0]);
208             }
209         }).on("cancel", new Emitter.Listener() {
210             @Override
211             public void call(Object... args) {
212                 onEvent("cancel", (JSONObject) args[0]);
213             }
214         }).on("command", new Emitter.Listener() {
215
216             @Override
217             public void call(Object... args) {
218                 onEvent("command", (JSONObject) args[0]);
219             }
220         });
221         socket.connect();
222     }
223
224     /**
225      * Callback method for socket.io client which is called when connection is established
226      */
227
228     public void onConnect() {
229         logger.info("Connected to the openHAB Cloud service (UUID = {}, base URL = {})", this.uuid, this.localBaseUrl);
230         isConnected = true;
231     }
232
233     /**
234      * Callback method for socket.io client which is called when disconnect occurs
235      */
236
237     public void onDisconnect() {
238         logger.info("Disconnected from the openHAB Cloud service (UUID = {}, base URL = {})", this.uuid,
239                 this.localBaseUrl);
240         isConnected = false;
241         // And clean up the list of running requests
242         runningRequests.clear();
243     }
244
245     /**
246      * Callback method for socket.io client which is called when an error occurs
247      */
248
249     public void onError(IOException error) {
250         logger.debug("{}", error.getMessage());
251     }
252
253     /**
254      * Callback method for socket.io client which is called when a message is received
255      */
256
257     public void onEvent(String event, JSONObject data) {
258         logger.debug("on(): {}", event);
259         if ("command".equals(event)) {
260             handleCommandEvent(data);
261             return;
262         }
263         if (remoteAccessEnabled) {
264             if ("request".equals(event)) {
265                 handleRequestEvent(data);
266             } else if ("cancel".equals(event)) {
267                 handleCancelEvent(data);
268             } else {
269                 logger.warn("Unsupported event from openHAB Cloud: {}", event);
270             }
271         }
272     }
273
274     private void handleRequestEvent(JSONObject data) {
275         try {
276             // Get unique request Id
277             int requestId = data.getInt("id");
278             logger.debug("Got request {}", requestId);
279             // Get request path
280             String requestPath = data.getString("path");
281             // Get request method
282             String requestMethod = data.getString("method");
283             // Get request body
284             String requestBody = data.getString("body");
285             // Get JSONObject for request headers
286             JSONObject requestHeadersJson = data.getJSONObject("headers");
287             logger.debug("{}", requestHeadersJson.toString());
288             // Get JSONObject for request query parameters
289             JSONObject requestQueryJson = data.getJSONObject("query");
290             // Create URI builder with base request URI of openHAB and path from request
291             String newPath = URIUtil.addPaths(localBaseUrl, requestPath);
292             Iterator<String> queryIterator = requestQueryJson.keys();
293             // Add query parameters to URI builder, if any
294             newPath += "?";
295             while (queryIterator.hasNext()) {
296                 String queryName = queryIterator.next();
297                 newPath += queryName;
298                 newPath += "=";
299                 newPath += URLEncoder.encode(requestQueryJson.getString(queryName), "UTF-8");
300                 if (queryIterator.hasNext()) {
301                     newPath += "&";
302                 }
303             }
304             // Finally get the future request URI
305             URI requestUri = new URI(newPath);
306             // All preparations which are common for different methods are done
307             // Now perform the request to openHAB
308             // If method is GET
309             logger.debug("Request method is {}", requestMethod);
310             Request request = jettyClient.newRequest(requestUri);
311             setRequestHeaders(request, requestHeadersJson);
312             String proto = protocol;
313             if (data.has("protocol")) {
314                 proto = data.getString("protocol");
315             }
316             request.header("X-Forwarded-Proto", proto);
317
318             if (requestMethod.equals("GET")) {
319                 request.method(HttpMethod.GET);
320             } else if (requestMethod.equals("POST")) {
321                 request.method(HttpMethod.POST);
322                 request.content(new BytesContentProvider(requestBody.getBytes()));
323             } else if (requestMethod.equals("PUT")) {
324                 request.method(HttpMethod.PUT);
325                 request.content(new BytesContentProvider(requestBody.getBytes()));
326             } else {
327                 // TODO: Reject unsupported methods
328                 logger.warn("Unsupported request method {}", requestMethod);
329                 return;
330             }
331             ResponseListener listener = new ResponseListener(requestId);
332             request.onResponseHeaders(listener).onResponseContent(listener).onRequestFailure(listener).send(listener);
333             // If successfully submitted request to http client, add it to the list of currently
334             // running requests to be able to cancel it if needed
335             runningRequests.put(requestId, request);
336         } catch (JSONException | IOException | URISyntaxException e) {
337             logger.debug("{}", e.getMessage());
338         }
339     }
340
341     private void setRequestHeaders(Request request, JSONObject requestHeadersJson) {
342         Iterator<String> headersIterator = requestHeadersJson.keys();
343         // Convert JSONObject of headers into Header ArrayList
344         while (headersIterator.hasNext()) {
345             String headerName = headersIterator.next();
346             String headerValue;
347             try {
348                 headerValue = requestHeadersJson.getString(headerName);
349                 logger.debug("Jetty set header {} = {}", headerName, headerValue);
350                 if (!headerName.equalsIgnoreCase("Content-Length")) {
351                     request.header(headerName, headerValue);
352                 }
353             } catch (JSONException e) {
354                 logger.warn("Error processing request headers: {}", e.getMessage());
355             }
356         }
357     }
358
359     private void handleCancelEvent(JSONObject data) {
360         try {
361             int requestId = data.getInt("id");
362             logger.debug("Received cancel for request {}", requestId);
363             // Find and abort running request
364             Request request = runningRequests.get(requestId);
365             if (request != null) {
366                 request.abort(new InterruptedException());
367                 runningRequests.remove(requestId);
368             }
369         } catch (JSONException e) {
370             logger.debug("{}", e.getMessage());
371         }
372     }
373
374     private void handleCommandEvent(JSONObject data) {
375         String itemName = data.getString("item");
376         if (exposedItems.contains(itemName)) {
377             try {
378                 logger.debug("Received command {} for item {}.", data.getString("command"), itemName);
379                 if (this.listener != null) {
380                     this.listener.sendCommand(itemName, data.getString("command"));
381                 }
382             } catch (JSONException e) {
383                 logger.debug("{}", e.getMessage());
384             }
385         } else {
386             logger.warn("Received command from openHAB Cloud for item '{}', which is not exposed.", itemName);
387         }
388     }
389
390     /**
391      * This method sends notification to the openHAB Cloud
392      *
393      * @param userId openHAB Cloud user id
394      * @param message notification message text
395      * @param icon name of the icon for this notification
396      * @param severity severity name for this notification
397      */
398     public void sendNotification(String userId, String message, @Nullable String icon, @Nullable String severity) {
399         if (isConnected()) {
400             JSONObject notificationMessage = new JSONObject();
401             try {
402                 notificationMessage.put("userId", userId);
403                 notificationMessage.put("message", message);
404                 notificationMessage.put("icon", icon);
405                 notificationMessage.put("severity", severity);
406                 socket.emit("notification", notificationMessage);
407             } catch (JSONException e) {
408                 logger.debug("{}", e.getMessage());
409             }
410         } else {
411             logger.debug("No connection, notification is not sent");
412         }
413     }
414
415     /**
416      * This method sends log notification to the openHAB Cloud
417      *
418      * @param message notification message text
419      * @param icon name of the icon for this notification
420      * @param severity severity name for this notification
421      */
422     public void sendLogNotification(String message, @Nullable String icon, @Nullable String severity) {
423         if (isConnected()) {
424             JSONObject notificationMessage = new JSONObject();
425             try {
426                 notificationMessage.put("message", message);
427                 notificationMessage.put("icon", icon);
428                 notificationMessage.put("severity", severity);
429                 socket.emit("lognotification", notificationMessage);
430             } catch (JSONException e) {
431                 logger.debug("{}", e.getMessage());
432             }
433         } else {
434             logger.debug("No connection, notification is not sent");
435         }
436     }
437
438     /**
439      * This method sends broadcast notification to the openHAB Cloud
440      *
441      * @param message notification message text
442      * @param icon name of the icon for this notification
443      * @param severity severity name for this notification
444      */
445     public void sendBroadcastNotification(String message, @Nullable String icon, @Nullable String severity) {
446         if (isConnected()) {
447             JSONObject notificationMessage = new JSONObject();
448             try {
449                 notificationMessage.put("message", message);
450                 notificationMessage.put("icon", icon);
451                 notificationMessage.put("severity", severity);
452                 socket.emit("broadcastnotification", notificationMessage);
453             } catch (JSONException e) {
454                 logger.debug("{}", e.getMessage());
455             }
456         } else {
457             logger.debug("No connection, notification is not sent");
458         }
459     }
460
461     /**
462      * Send item update to openHAB Cloud
463      *
464      * @param itemName the name of the item
465      * @param itemState updated item state
466      *
467      */
468     public void sendItemUpdate(String itemName, String itemState) {
469         if (isConnected()) {
470             logger.debug("Sending update '{}' for item '{}'", itemState, itemName);
471             JSONObject itemUpdateMessage = new JSONObject();
472             try {
473                 itemUpdateMessage.put("itemName", itemName);
474                 itemUpdateMessage.put("itemStatus", itemState);
475                 socket.emit("itemupdate", itemUpdateMessage);
476             } catch (JSONException e) {
477                 logger.debug("{}", e.getMessage());
478             }
479         } else {
480             logger.debug("No connection, Item update is not sent");
481         }
482     }
483
484     /**
485      * Returns true if openHAB Cloud connection is active
486      */
487     public boolean isConnected() {
488         return isConnected;
489     }
490
491     /**
492      * Disconnect from openHAB Cloud
493      */
494     public void shutdown() {
495         logger.info("Shutting down openHAB Cloud service connection");
496         socket.disconnect();
497     }
498
499     public String getOpenHABVersion() {
500         return openHABVersion;
501     }
502
503     public void setOpenHABVersion(String openHABVersion) {
504         this.openHABVersion = openHABVersion;
505     }
506
507     public void setListener(CloudClientListener listener) {
508         this.listener = listener;
509     }
510
511     /*
512      * An internal class which forwards response headers and data back to the openHAB Cloud
513      */
514     private class ResponseListener
515             implements Response.CompleteListener, HeadersListener, ContentListener, FailureListener {
516
517         private static final String THREADPOOL_OPENHABCLOUD = "openhabcloud";
518         private int mRequestId;
519         private boolean mHeadersSent = false;
520
521         public ResponseListener(int requestId) {
522             mRequestId = requestId;
523         }
524
525         private JSONObject getJSONHeaders(HttpFields httpFields) {
526             JSONObject headersJSON = new JSONObject();
527             try {
528                 for (HttpField field : httpFields) {
529                     headersJSON.put(field.getName(), field.getValue());
530                 }
531             } catch (JSONException e) {
532                 logger.warn("Error forming response headers: {}", e.getMessage());
533             }
534             return headersJSON;
535         }
536
537         @Override
538         public void onComplete(Result result) {
539             // Remove this request from list of running requests
540             runningRequests.remove(mRequestId);
541             if ((result != null && result.isFailed())
542                     && (result.getResponse() != null && result.getResponse().getStatus() != HttpStatus.OK_200)) {
543                 if (result.getFailure() != null) {
544                     logger.warn("Jetty request {} failed: {}", mRequestId, result.getFailure().getMessage());
545                 }
546                 if (result.getRequestFailure() != null) {
547                     logger.warn("Request Failure: {}", result.getRequestFailure().getMessage());
548                 }
549                 if (result.getResponseFailure() != null) {
550                     logger.warn("Response Failure: {}", result.getResponseFailure().getMessage());
551                 }
552             }
553
554             /**
555              * What is this? In some cases where latency is very low the myopenhab service
556              * can receive responseFinished before the headers or content are received and I
557              * cannot find another workaround to prevent it.
558              */
559             ThreadPoolManager.getScheduledPool(THREADPOOL_OPENHABCLOUD).schedule(() -> {
560                 JSONObject responseJson = new JSONObject();
561                 try {
562                     responseJson.put("id", mRequestId);
563                     socket.emit("responseFinished", responseJson);
564                     logger.debug("Finished responding to request {}", mRequestId);
565                 } catch (JSONException e) {
566                     logger.debug("{}", e.getMessage());
567                 }
568             }, 1, TimeUnit.MILLISECONDS);
569         }
570
571         @Override
572         public synchronized void onFailure(Request request, Throwable failure) {
573             JSONObject responseJson = new JSONObject();
574             try {
575                 responseJson.put("id", mRequestId);
576                 responseJson.put("responseStatusText", "openHAB connection error: " + failure.getMessage());
577                 socket.emit("responseError", responseJson);
578             } catch (JSONException e) {
579                 logger.debug("{}", e.getMessage());
580             }
581         }
582
583         @Override
584         public void onContent(Response response, ByteBuffer content) {
585             logger.debug("Jetty received response content of size {}", String.valueOf(content.remaining()));
586             JSONObject responseJson = new JSONObject();
587             try {
588                 responseJson.put("id", mRequestId);
589                 responseJson.put("body", BufferUtil.toArray(content));
590                 socket.emit("responseContentBinary", responseJson);
591                 logger.debug("Sent content to request {}", mRequestId);
592             } catch (JSONException e) {
593                 logger.debug("{}", e.getMessage());
594             }
595         }
596
597         @Override
598         public void onHeaders(Response response) {
599             if (!mHeadersSent) {
600                 logger.debug("Jetty finished receiving response header");
601                 JSONObject responseJson = new JSONObject();
602                 mHeadersSent = true;
603                 try {
604                     responseJson.put("id", mRequestId);
605                     responseJson.put("headers", getJSONHeaders(response.getHeaders()));
606                     responseJson.put("responseStatusCode", response.getStatus());
607                     responseJson.put("responseStatusText", "OK");
608                     socket.emit("responseHeader", responseJson);
609                     logger.debug("Sent headers to request {}", mRequestId);
610                     logger.debug("{}", responseJson.toString());
611                 } catch (JSONException e) {
612                     logger.debug("{}", e.getMessage());
613                 }
614             }
615         }
616     }
617 }