]> git.basschouten.com Git - openhab-addons.git/blob
950b97ca42a0fb8354bfda32279ca90c85d4ed4b
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.persistence.influxdb.internal.influx2;
14
15 import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.*;
16
17 import java.time.Instant;
18 import java.util.Collections;
19 import java.util.LinkedHashMap;
20 import java.util.List;
21 import java.util.Map;
22 import java.util.stream.Collectors;
23 import java.util.stream.Stream;
24
25 import org.eclipse.jdt.annotation.NonNullByDefault;
26 import org.eclipse.jdt.annotation.Nullable;
27 import org.openhab.persistence.influxdb.internal.InfluxDBConfiguration;
28 import org.openhab.persistence.influxdb.internal.InfluxDBConstants;
29 import org.openhab.persistence.influxdb.internal.InfluxDBRepository;
30 import org.openhab.persistence.influxdb.internal.InfluxPoint;
31 import org.openhab.persistence.influxdb.internal.InfluxRow;
32 import org.openhab.persistence.influxdb.internal.UnnexpectedConditionException;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36 import com.influxdb.client.InfluxDBClient;
37 import com.influxdb.client.InfluxDBClientFactory;
38 import com.influxdb.client.InfluxDBClientOptions;
39 import com.influxdb.client.QueryApi;
40 import com.influxdb.client.WriteApi;
41 import com.influxdb.client.domain.Ready;
42 import com.influxdb.client.domain.WritePrecision;
43 import com.influxdb.client.write.Point;
44 import com.influxdb.query.FluxTable;
45
46 /**
47  * Implementation of {@link InfluxDBRepository} for InfluxDB 2.0
48  *
49  * @author Joan Pujol Espinar - Initial contribution
50  */
51 @NonNullByDefault
52 public class InfluxDB2RepositoryImpl implements InfluxDBRepository {
53     private final Logger logger = LoggerFactory.getLogger(InfluxDB2RepositoryImpl.class);
54     private InfluxDBConfiguration configuration;
55     @Nullable
56     private InfluxDBClient client;
57     @Nullable
58     private QueryApi queryAPI;
59     @Nullable
60     private WriteApi writeAPI;
61
62     public InfluxDB2RepositoryImpl(InfluxDBConfiguration configuration) {
63         this.configuration = configuration;
64     }
65
66     /**
67      * Returns if the client has been successfully connected to server
68      *
69      * @return True if it's connected, otherwise false
70      */
71     @Override
72     public boolean isConnected() {
73         return client != null;
74     }
75
76     /**
77      * Connect to InfluxDB server
78      *
79      * @return True if successful, otherwise false
80      */
81     @Override
82     public boolean connect() {
83         InfluxDBClientOptions.Builder optionsBuilder = InfluxDBClientOptions.builder().url(configuration.getUrl())
84                 .org(configuration.getDatabaseName()).bucket(configuration.getRetentionPolicy());
85         char[] token = configuration.getTokenAsCharArray();
86         if (token.length > 0) {
87             optionsBuilder.authenticateToken(token);
88         } else {
89             optionsBuilder.authenticate(configuration.getUser(), configuration.getPassword().toCharArray());
90         }
91         InfluxDBClientOptions clientOptions = optionsBuilder.build();
92
93         final InfluxDBClient createdClient = InfluxDBClientFactory.create(clientOptions);
94         this.client = createdClient;
95         logger.debug("Succesfully connected to InfluxDB. Instance ready={}", createdClient.ready());
96         queryAPI = createdClient.getQueryApi();
97         writeAPI = createdClient.getWriteApi();
98         return checkConnectionStatus();
99     }
100
101     /**
102      * Disconnect from InfluxDB server
103      */
104     @Override
105     public void disconnect() {
106         final InfluxDBClient currentClient = this.client;
107         if (currentClient != null) {
108             currentClient.close();
109         }
110         this.client = null;
111     }
112
113     /**
114      * Check if connection is currently ready
115      *
116      * @return True if its ready, otherwise false
117      */
118     @Override
119     public boolean checkConnectionStatus() {
120         final InfluxDBClient currentClient = client;
121         if (currentClient != null) {
122             Ready ready = currentClient.ready();
123             boolean isUp = ready != null && ready.getStatus() == Ready.StatusEnum.READY;
124             if (isUp) {
125                 logger.debug("database status is OK");
126             } else {
127                 logger.warn("database not ready");
128             }
129             return isUp;
130         } else {
131             logger.warn("checkConnection: database is not connected");
132             return false;
133         }
134     }
135
136     /**
137      * Write point to database
138      *
139      * @param point
140      */
141     @Override
142     public void write(InfluxPoint point) {
143         final WriteApi currentWriteAPI = writeAPI;
144         if (currentWriteAPI != null) {
145             currentWriteAPI.writePoint(convertPointToClientFormat(point));
146         } else {
147             logger.warn("Write point {} ignored due to writeAPI isn't present", point);
148         }
149     }
150
151     private Point convertPointToClientFormat(InfluxPoint point) {
152         Point clientPoint = Point.measurement(point.getMeasurementName()).time(point.getTime(), WritePrecision.MS);
153         setPointValue(point.getValue(), clientPoint);
154         point.getTags().entrySet().forEach(e -> clientPoint.addTag(e.getKey(), e.getValue()));
155         return clientPoint;
156     }
157
158     private void setPointValue(@Nullable Object value, Point point) {
159         if (value instanceof String) {
160             point.addField(FIELD_VALUE_NAME, (String) value);
161         } else if (value instanceof Number) {
162             point.addField(FIELD_VALUE_NAME, (Number) value);
163         } else if (value instanceof Boolean) {
164             point.addField(FIELD_VALUE_NAME, (Boolean) value);
165         } else if (value == null) {
166             point.addField(FIELD_VALUE_NAME, (String) null);
167         } else {
168             throw new UnnexpectedConditionException("Not expected value type");
169         }
170     }
171
172     /**
173      * Executes Flux query
174      *
175      * @param query Query
176      * @return Query results
177      */
178     @Override
179     public List<InfluxRow> query(String query) {
180         final QueryApi currentQueryAPI = queryAPI;
181         if (currentQueryAPI != null) {
182             List<FluxTable> clientResult = currentQueryAPI.query(query);
183             return convertClientResutToRepository(clientResult);
184         } else {
185             logger.warn("Returning empty list because queryAPI isn't present");
186             return Collections.emptyList();
187         }
188     }
189
190     private List<InfluxRow> convertClientResutToRepository(List<FluxTable> clientResult) {
191         return clientResult.stream().flatMap(this::mapRawResultToHistoric).collect(Collectors.toList());
192     }
193
194     private Stream<InfluxRow> mapRawResultToHistoric(FluxTable rawRow) {
195         return rawRow.getRecords().stream().map(r -> {
196             String itemName = (String) r.getValueByKey(InfluxDBConstants.TAG_ITEM_NAME);
197             if (itemName == null) { // use measurement name if item is not tagged
198                 itemName = r.getMeasurement();
199             }
200             Object value = r.getValueByKey(COLUMN_VALUE_NAME_V2);
201             Instant time = (Instant) r.getValueByKey(COLUMN_TIME_NAME_V2);
202             return new InfluxRow(time, itemName, value);
203         });
204     }
205
206     /**
207      * Return all stored item names with it's count of stored points
208      *
209      * @return Map with <ItemName,ItemCount> entries
210      */
211     @Override
212     public Map<String, Integer> getStoredItemsCount() {
213         final QueryApi currentQueryAPI = queryAPI;
214
215         if (currentQueryAPI != null) {
216             Map<String, Integer> result = new LinkedHashMap<>();
217             // Query wrote by hand https://github.com/influxdata/influxdb-client-java/issues/75
218             String query = "from(bucket: \"" + configuration.getRetentionPolicy() + "\")\n"
219                     + "  |> range(start:-365d)\n" + "  |> filter(fn: (r) => exists r." + TAG_ITEM_NAME + " )\n"
220                     + "  |> group(columns: [\"" + TAG_ITEM_NAME + "\"], mode:\"by\")\n" + "  |> count()\n"
221                     + "  |> group()";
222
223             List<FluxTable> queryResult = currentQueryAPI.query(query);
224             queryResult.stream().findFirst().orElse(new FluxTable()).getRecords().forEach(row -> {
225                 result.put((String) row.getValueByKey(TAG_ITEM_NAME), ((Number) row.getValue()).intValue());
226             });
227             return result;
228         } else {
229             logger.warn("Returning empty result  because queryAPI isn't present");
230             return Collections.emptyMap();
231         }
232     }
233 }