]> git.basschouten.com Git - openhab-addons.git/blob
f675bf8241e5749d3d586fb985360a65fc6cd3bb
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.persistence.influxdb.internal.influx2;
14
15 import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.*;
16
17 import java.time.Instant;
18 import java.time.OffsetDateTime;
19 import java.time.ZonedDateTime;
20 import java.util.Collections;
21 import java.util.LinkedHashMap;
22 import java.util.List;
23 import java.util.Map;
24 import java.util.Objects;
25 import java.util.Optional;
26 import java.util.stream.Stream;
27
28 import org.eclipse.jdt.annotation.NonNullByDefault;
29 import org.eclipse.jdt.annotation.Nullable;
30 import org.openhab.core.persistence.FilterCriteria;
31 import org.openhab.persistence.influxdb.internal.FilterCriteriaQueryCreator;
32 import org.openhab.persistence.influxdb.internal.InfluxDBConfiguration;
33 import org.openhab.persistence.influxdb.internal.InfluxDBConstants;
34 import org.openhab.persistence.influxdb.internal.InfluxDBMetadataService;
35 import org.openhab.persistence.influxdb.internal.InfluxDBRepository;
36 import org.openhab.persistence.influxdb.internal.InfluxPoint;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39
40 import com.influxdb.client.DeleteApi;
41 import com.influxdb.client.InfluxDBClient;
42 import com.influxdb.client.InfluxDBClientFactory;
43 import com.influxdb.client.InfluxDBClientOptions;
44 import com.influxdb.client.QueryApi;
45 import com.influxdb.client.WriteApi;
46 import com.influxdb.client.domain.Ready;
47 import com.influxdb.client.domain.WritePrecision;
48 import com.influxdb.client.write.Point;
49 import com.influxdb.exceptions.InfluxException;
50 import com.influxdb.query.FluxTable;
51
52 /**
53  * Implementation of {@link InfluxDBRepository} for InfluxDB 2.0
54  *
55  * @author Joan Pujol Espinar - Initial contribution
56  */
57 @NonNullByDefault
58 public class InfluxDB2RepositoryImpl implements InfluxDBRepository {
59     private final Logger logger = LoggerFactory.getLogger(InfluxDB2RepositoryImpl.class);
60     private final InfluxDBConfiguration configuration;
61     private final InfluxDBMetadataService influxDBMetadataService;
62     private final FilterCriteriaQueryCreator queryCreator;
63
64     private @Nullable InfluxDBClient client;
65     private @Nullable QueryApi queryAPI;
66     private @Nullable WriteApi writeAPI;
67     private @Nullable DeleteApi deleteAPI;
68
69     public InfluxDB2RepositoryImpl(InfluxDBConfiguration configuration,
70             InfluxDBMetadataService influxDBMetadataService) {
71         this.configuration = configuration;
72         this.influxDBMetadataService = influxDBMetadataService;
73         this.queryCreator = new InfluxDB2FilterCriteriaQueryCreatorImpl(configuration, influxDBMetadataService);
74     }
75
76     @Override
77     public boolean isConnected() {
78         return client != null;
79     }
80
81     @Override
82     public boolean connect() {
83         InfluxDBClientOptions.Builder optionsBuilder = InfluxDBClientOptions.builder().url(configuration.getUrl())
84                 .org(configuration.getDatabaseName()).bucket(configuration.getRetentionPolicy());
85         char[] token = configuration.getToken().toCharArray();
86         if (token.length > 0) {
87             optionsBuilder.authenticateToken(token);
88         } else {
89             optionsBuilder.authenticate(configuration.getUser(), configuration.getPassword().toCharArray());
90         }
91         InfluxDBClientOptions clientOptions = optionsBuilder.build();
92
93         final InfluxDBClient createdClient = InfluxDBClientFactory.create(clientOptions);
94         this.client = createdClient;
95
96         queryAPI = createdClient.getQueryApi();
97         writeAPI = createdClient.getWriteApi();
98         deleteAPI = createdClient.getDeleteApi();
99         logger.debug("Successfully connected to InfluxDB. Instance ready={}", createdClient.ready());
100
101         return checkConnectionStatus();
102     }
103
104     @Override
105     public void disconnect() {
106         final InfluxDBClient currentClient = this.client;
107         if (currentClient != null) {
108             currentClient.close();
109         }
110         this.client = null;
111     }
112
113     @Override
114     public boolean checkConnectionStatus() {
115         final InfluxDBClient currentClient = client;
116         if (currentClient != null) {
117             Ready ready = currentClient.ready();
118             boolean isUp = ready != null && ready.getStatus() == Ready.StatusEnum.READY;
119             if (isUp) {
120                 logger.debug("database status is OK");
121             } else {
122                 logger.warn("database not ready");
123             }
124             return isUp;
125         } else {
126             logger.warn("checkConnection: database is not connected");
127             return false;
128         }
129     }
130
131     @Override
132     public boolean write(List<InfluxPoint> influxPoints) {
133         final WriteApi currentWriteAPI = writeAPI;
134         if (currentWriteAPI == null) {
135             return false;
136         }
137         try {
138             List<Point> clientPoints = influxPoints.stream().map(this::convertPointToClientFormat)
139                     .filter(Optional::isPresent).map(Optional::get).toList();
140             currentWriteAPI.writePoints(clientPoints);
141         } catch (InfluxException e) {
142             logger.debug("Writing to database failed", e);
143             return false;
144         }
145         return true;
146     }
147
148     @Override
149     public boolean remove(FilterCriteria filter) {
150         final DeleteApi currentDeleteApi = deleteAPI;
151         if (currentDeleteApi == null) {
152             return false;
153         }
154
155         if (filter.getState() != null) {
156             logger.warn("Deleting by value is not supported in InfluxDB v2.");
157             return false;
158         }
159         OffsetDateTime start = Objects.requireNonNullElse(filter.getBeginDate(), ZonedDateTime.now().minusYears(100))
160                 .toOffsetDateTime();
161         OffsetDateTime stop = Objects.requireNonNullElse(filter.getEndDate(), ZonedDateTime.now().plusYears(100))
162                 .toOffsetDateTime();
163
164         // create predicate
165         String predicate = "";
166         String itemName = filter.getItemName();
167         if (itemName != null) {
168             String name = influxDBMetadataService.getMeasurementNameOrDefault(itemName, itemName);
169             String measurementName = configuration.isReplaceUnderscore() ? name.replace('_', '.') : name;
170             predicate = "(_measurement=\"" + measurementName + "\")";
171         }
172
173         try {
174             deleteAPI.delete(start, stop, predicate, configuration.getRetentionPolicy(),
175                     configuration.getDatabaseName());
176         } catch (InfluxException e) {
177             logger.debug("Deleting from database failed", e);
178             return false;
179         }
180
181         return true;
182     }
183
184     private Optional<Point> convertPointToClientFormat(InfluxPoint point) {
185         Point clientPoint = Point.measurement(point.getMeasurementName()).time(point.getTime(), WritePrecision.MS);
186         @Nullable
187         Object value = point.getValue();
188         if (value instanceof String) {
189             clientPoint.addField(FIELD_VALUE_NAME, (String) value);
190         } else if (value instanceof Number) {
191             clientPoint.addField(FIELD_VALUE_NAME, (Number) value);
192         } else if (value instanceof Boolean) {
193             clientPoint.addField(FIELD_VALUE_NAME, (Boolean) value);
194         } else if (value == null) {
195             clientPoint.addField(FIELD_VALUE_NAME, (String) null);
196         } else {
197             logger.warn("Could not convert {}, discarding this datapoint)", clientPoint);
198             return Optional.empty();
199         }
200         point.getTags().forEach(clientPoint::addTag);
201         return Optional.of(clientPoint);
202     }
203
204     @Override
205     public List<InfluxRow> query(FilterCriteria filter, String retentionPolicy) {
206         final QueryApi currentQueryAPI = queryAPI;
207         if (currentQueryAPI != null) {
208             String query = queryCreator.createQuery(filter, retentionPolicy);
209             logger.trace("Query {}", query);
210             List<FluxTable> clientResult = currentQueryAPI.query(query);
211             return clientResult.stream().flatMap(this::mapRawResultToHistoric).toList();
212         } else {
213             logger.warn("Returning empty list because queryAPI isn't present");
214             return List.of();
215         }
216     }
217
218     private Stream<InfluxRow> mapRawResultToHistoric(FluxTable rawRow) {
219         return rawRow.getRecords().stream().map(r -> {
220             String itemName = (String) r.getValueByKey(InfluxDBConstants.TAG_ITEM_NAME);
221             if (itemName == null) {
222                 itemName = r.getMeasurement();
223             }
224             Object value = r.getValueByKey(COLUMN_VALUE_NAME_V2);
225             Instant time = (Instant) r.getValueByKey(COLUMN_TIME_NAME_V2);
226             return new InfluxRow(time, itemName, value);
227         });
228     }
229
230     @Override
231     public Map<String, Integer> getStoredItemsCount() {
232         final QueryApi currentQueryAPI = queryAPI;
233
234         if (currentQueryAPI != null) {
235             Map<String, Integer> result = new LinkedHashMap<>();
236             // Query wrote by hand https://github.com/influxdata/influxdb-client-java/issues/75
237             String query = "from(bucket: \"" + configuration.getRetentionPolicy() + "\")\n"
238                     + "  |> range(start:-365d)\n" + "  |> filter(fn: (r) => exists r." + TAG_ITEM_NAME + " )\n"
239                     + "  |> group(columns: [\"" + TAG_ITEM_NAME + "\"], mode:\"by\")\n" + "  |> count()\n"
240                     + "  |> group()";
241
242             List<FluxTable> queryResult = currentQueryAPI.query(query);
243             Objects.requireNonNull(queryResult.stream().findFirst().orElse(new FluxTable())).getRecords()
244                     .forEach(row -> {
245                         result.put((String) row.getValueByKey(TAG_ITEM_NAME), ((Number) row.getValue()).intValue());
246                     });
247             return result;
248         } else {
249             logger.warn("Returning empty result  because queryAPI isn't present");
250             return Collections.emptyMap();
251         }
252     }
253 }