]> git.basschouten.com Git - openhab-addons.git/blob
0b21da6b1276d161ce849341564b96d1dcb74f3e
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.persistence.influxdb.internal.influx2;
14
15 import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.*;
16
17 import java.time.Instant;
18 import java.time.OffsetDateTime;
19 import java.time.ZonedDateTime;
20 import java.util.Collections;
21 import java.util.LinkedHashMap;
22 import java.util.List;
23 import java.util.Map;
24 import java.util.Objects;
25 import java.util.Optional;
26 import java.util.stream.Stream;
27
28 import org.eclipse.jdt.annotation.NonNullByDefault;
29 import org.eclipse.jdt.annotation.Nullable;
30 import org.influxdb.InfluxDBIOException;
31 import org.openhab.core.persistence.FilterCriteria;
32 import org.openhab.persistence.influxdb.internal.FilterCriteriaQueryCreator;
33 import org.openhab.persistence.influxdb.internal.InfluxDBConfiguration;
34 import org.openhab.persistence.influxdb.internal.InfluxDBConstants;
35 import org.openhab.persistence.influxdb.internal.InfluxDBMetadataService;
36 import org.openhab.persistence.influxdb.internal.InfluxDBRepository;
37 import org.openhab.persistence.influxdb.internal.InfluxPoint;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40
41 import com.influxdb.client.DeleteApi;
42 import com.influxdb.client.InfluxDBClient;
43 import com.influxdb.client.InfluxDBClientFactory;
44 import com.influxdb.client.InfluxDBClientOptions;
45 import com.influxdb.client.QueryApi;
46 import com.influxdb.client.WriteApi;
47 import com.influxdb.client.domain.Ready;
48 import com.influxdb.client.domain.WritePrecision;
49 import com.influxdb.client.write.Point;
50 import com.influxdb.exceptions.InfluxException;
51 import com.influxdb.query.FluxTable;
52
53 /**
54  * Implementation of {@link InfluxDBRepository} for InfluxDB 2.0
55  *
56  * @author Joan Pujol Espinar - Initial contribution
57  */
58 @NonNullByDefault
59 public class InfluxDB2RepositoryImpl implements InfluxDBRepository {
60     private final Logger logger = LoggerFactory.getLogger(InfluxDB2RepositoryImpl.class);
61     private final InfluxDBConfiguration configuration;
62     private final InfluxDBMetadataService influxDBMetadataService;
63     private final FilterCriteriaQueryCreator queryCreator;
64
65     private @Nullable InfluxDBClient client;
66     private @Nullable QueryApi queryAPI;
67     private @Nullable WriteApi writeAPI;
68     private @Nullable DeleteApi deleteAPI;
69
70     public InfluxDB2RepositoryImpl(InfluxDBConfiguration configuration,
71             InfluxDBMetadataService influxDBMetadataService) {
72         this.configuration = configuration;
73         this.influxDBMetadataService = influxDBMetadataService;
74         this.queryCreator = new InfluxDB2FilterCriteriaQueryCreatorImpl(configuration, influxDBMetadataService);
75     }
76
77     @Override
78     public boolean isConnected() {
79         return client != null;
80     }
81
82     @Override
83     public boolean connect() {
84         InfluxDBClientOptions.Builder optionsBuilder = InfluxDBClientOptions.builder().url(configuration.getUrl())
85                 .org(configuration.getDatabaseName()).bucket(configuration.getRetentionPolicy());
86         char[] token = configuration.getToken().toCharArray();
87         if (token.length > 0) {
88             optionsBuilder.authenticateToken(token);
89         } else {
90             optionsBuilder.authenticate(configuration.getUser(), configuration.getPassword().toCharArray());
91         }
92         InfluxDBClientOptions clientOptions = optionsBuilder.build();
93
94         final InfluxDBClient createdClient = InfluxDBClientFactory.create(clientOptions);
95         this.client = createdClient;
96
97         queryAPI = createdClient.getQueryApi();
98         writeAPI = createdClient.getWriteApi();
99         deleteAPI = createdClient.getDeleteApi();
100         logger.debug("Successfully connected to InfluxDB. Instance ready={}", createdClient.ready());
101
102         return checkConnectionStatus();
103     }
104
105     @Override
106     public void disconnect() {
107         final InfluxDBClient currentClient = this.client;
108         if (currentClient != null) {
109             currentClient.close();
110         }
111         this.client = null;
112     }
113
114     @Override
115     public boolean checkConnectionStatus() {
116         final InfluxDBClient currentClient = client;
117         if (currentClient != null) {
118             Ready ready = currentClient.ready();
119             boolean isUp = ready != null && ready.getStatus() == Ready.StatusEnum.READY;
120             if (isUp) {
121                 logger.debug("database status is OK");
122             } else {
123                 logger.warn("database not ready");
124             }
125             return isUp;
126         } else {
127             logger.warn("checkConnection: database is not connected");
128             return false;
129         }
130     }
131
132     @Override
133     public boolean write(List<InfluxPoint> influxPoints) {
134         final WriteApi currentWriteAPI = writeAPI;
135         if (currentWriteAPI == null) {
136             return false;
137         }
138         try {
139             List<Point> clientPoints = influxPoints.stream().map(this::convertPointToClientFormat)
140                     .filter(Optional::isPresent).map(Optional::get).toList();
141             currentWriteAPI.writePoints(clientPoints);
142         } catch (InfluxException | InfluxDBIOException e) {
143             logger.debug("Writing to database failed", e);
144             return false;
145         }
146         return true;
147     }
148
149     @Override
150     public boolean remove(FilterCriteria filter) {
151         final DeleteApi currentDeleteApi = deleteAPI;
152         if (currentDeleteApi == null) {
153             return false;
154         }
155
156         if (filter.getState() != null) {
157             logger.warn("Deleting by value is not supported in InfluxDB v2.");
158             return false;
159         }
160         OffsetDateTime start = Objects.requireNonNullElse(filter.getBeginDate(), ZonedDateTime.now().minusYears(100))
161                 .toOffsetDateTime();
162         OffsetDateTime stop = Objects.requireNonNullElse(filter.getEndDate(), ZonedDateTime.now().plusYears(100))
163                 .toOffsetDateTime();
164
165         // create predicate
166         String predicate = "";
167         String itemName = filter.getItemName();
168         if (itemName != null) {
169             String name = influxDBMetadataService.getMeasurementNameOrDefault(itemName, itemName);
170             String measurementName = configuration.isReplaceUnderscore() ? name.replace('_', '.') : name;
171             predicate = "(_measurement=\"" + measurementName + "\")";
172         }
173
174         try {
175             deleteAPI.delete(start, stop, predicate, configuration.getRetentionPolicy(),
176                     configuration.getDatabaseName());
177         } catch (InfluxException | InfluxDBIOException e) {
178             logger.debug("Deleting from database failed", e);
179             return false;
180         }
181
182         return true;
183     }
184
185     private Optional<Point> convertPointToClientFormat(InfluxPoint point) {
186         Point clientPoint = Point.measurement(point.getMeasurementName()).time(point.getTime(), WritePrecision.MS);
187         @Nullable
188         Object value = point.getValue();
189         if (value instanceof String) {
190             clientPoint.addField(FIELD_VALUE_NAME, (String) value);
191         } else if (value instanceof Number) {
192             clientPoint.addField(FIELD_VALUE_NAME, (Number) value);
193         } else if (value instanceof Boolean) {
194             clientPoint.addField(FIELD_VALUE_NAME, (Boolean) value);
195         } else if (value == null) {
196             clientPoint.addField(FIELD_VALUE_NAME, (String) null);
197         } else {
198             logger.warn("Could not convert {}, discarding this datapoint)", clientPoint);
199             return Optional.empty();
200         }
201         point.getTags().forEach(clientPoint::addTag);
202         return Optional.of(clientPoint);
203     }
204
205     @Override
206     public List<InfluxRow> query(FilterCriteria filter, String retentionPolicy) {
207         try {
208             final QueryApi currentQueryAPI = queryAPI;
209             if (currentQueryAPI != null) {
210                 String query = queryCreator.createQuery(filter, retentionPolicy);
211                 logger.trace("Query {}", query);
212                 List<FluxTable> clientResult = currentQueryAPI.query(query);
213                 return clientResult.stream().flatMap(this::mapRawResultToHistoric).toList();
214             } else {
215                 throw new InfluxException("API not present");
216             }
217         } catch (InfluxException | InfluxDBIOException e) {
218             logger.warn("Failed to execute query '{}': {}", filter, e.getMessage());
219             return List.of();
220         }
221     }
222
223     private Stream<InfluxRow> mapRawResultToHistoric(FluxTable rawRow) {
224         return rawRow.getRecords().stream().map(r -> {
225             String itemName = (String) r.getValueByKey(InfluxDBConstants.TAG_ITEM_NAME);
226             if (itemName == null) {
227                 itemName = r.getMeasurement();
228             }
229             Object value = r.getValueByKey(COLUMN_VALUE_NAME_V2);
230             Instant time = (Instant) r.getValueByKey(COLUMN_TIME_NAME_V2);
231             return new InfluxRow(time, itemName, value);
232         });
233     }
234
235     @Override
236     public Map<String, Integer> getStoredItemsCount() {
237         final QueryApi currentQueryAPI = queryAPI;
238
239         if (currentQueryAPI != null) {
240             Map<String, Integer> result = new LinkedHashMap<>();
241             // Query wrote by hand https://github.com/influxdata/influxdb-client-java/issues/75
242             String query = "from(bucket: \"" + configuration.getRetentionPolicy() + "\")\n"
243                     + "  |> range(start:-365d)\n" + "  |> filter(fn: (r) => exists r." + TAG_ITEM_NAME + " )\n"
244                     + "  |> group(columns: [\"" + TAG_ITEM_NAME + "\"], mode:\"by\")\n" + "  |> count()\n"
245                     + "  |> group()";
246
247             List<FluxTable> queryResult = currentQueryAPI.query(query);
248             Objects.requireNonNull(queryResult.stream().findFirst().orElse(new FluxTable())).getRecords()
249                     .forEach(row -> {
250                         result.put((String) row.getValueByKey(TAG_ITEM_NAME), ((Number) row.getValue()).intValue());
251                     });
252             return result;
253         } else {
254             logger.warn("Returning empty result  because queryAPI isn't present");
255             return Collections.emptyMap();
256         }
257     }
258 }