2 * Copyright (c) 2010-2023 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.persistence.influxdb.internal.influx2;
15 import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.*;
17 import java.time.Instant;
18 import java.time.OffsetDateTime;
19 import java.time.ZonedDateTime;
20 import java.util.Collections;
21 import java.util.LinkedHashMap;
22 import java.util.List;
24 import java.util.Objects;
25 import java.util.Optional;
26 import java.util.stream.Stream;
28 import org.eclipse.jdt.annotation.NonNullByDefault;
29 import org.eclipse.jdt.annotation.Nullable;
30 import org.influxdb.InfluxDBIOException;
31 import org.openhab.core.persistence.FilterCriteria;
32 import org.openhab.persistence.influxdb.internal.FilterCriteriaQueryCreator;
33 import org.openhab.persistence.influxdb.internal.InfluxDBConfiguration;
34 import org.openhab.persistence.influxdb.internal.InfluxDBConstants;
35 import org.openhab.persistence.influxdb.internal.InfluxDBMetadataService;
36 import org.openhab.persistence.influxdb.internal.InfluxDBRepository;
37 import org.openhab.persistence.influxdb.internal.InfluxPoint;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
41 import com.influxdb.client.DeleteApi;
42 import com.influxdb.client.InfluxDBClient;
43 import com.influxdb.client.InfluxDBClientFactory;
44 import com.influxdb.client.InfluxDBClientOptions;
45 import com.influxdb.client.QueryApi;
46 import com.influxdb.client.WriteApi;
47 import com.influxdb.client.domain.HealthCheck;
48 import com.influxdb.client.domain.Ready;
49 import com.influxdb.client.domain.WritePrecision;
50 import com.influxdb.client.write.Point;
51 import com.influxdb.exceptions.InfluxException;
52 import com.influxdb.query.FluxTable;
55 * Implementation of {@link InfluxDBRepository} for InfluxDB 2.0
57 * @author Joan Pujol Espinar - Initial contribution
60 public class InfluxDB2RepositoryImpl implements InfluxDBRepository {
61 private final Logger logger = LoggerFactory.getLogger(InfluxDB2RepositoryImpl.class);
62 private final InfluxDBConfiguration configuration;
63 private final InfluxDBMetadataService influxDBMetadataService;
64 private final FilterCriteriaQueryCreator queryCreator;
66 private @Nullable InfluxDBClient client;
67 private @Nullable QueryApi queryAPI;
68 private @Nullable WriteApi writeAPI;
69 private @Nullable DeleteApi deleteAPI;
/**
 * Creates a repository bound to the given configuration and metadata service.
 * <p>
 * No client is created here; the constructor only stores its arguments and builds the query creator.
 *
 * @param configuration settings read by this repository (URL, credentials, bucket and organization names)
 * @param influxDBMetadataService service used to resolve measurement names
 */
public InfluxDB2RepositoryImpl(InfluxDBConfiguration configuration,
        InfluxDBMetadataService influxDBMetadataService) {
    this.influxDBMetadataService = influxDBMetadataService;
    this.configuration = configuration;
    this.queryCreator = new InfluxDB2FilterCriteriaQueryCreatorImpl(configuration, influxDBMetadataService);
}
79 public boolean isConnected() {
80 InfluxDBClient client = this.client;
81 return client != null && client.health().getStatus() == HealthCheck.StatusEnum.PASS;
85 public boolean connect() {
86 InfluxDBClientOptions.Builder optionsBuilder = InfluxDBClientOptions.builder().url(configuration.getUrl())
87 .org(configuration.getDatabaseName()).bucket(configuration.getRetentionPolicy());
88 char[] token = configuration.getToken().toCharArray();
89 if (token.length > 0) {
90 optionsBuilder.authenticateToken(token);
92 optionsBuilder.authenticate(configuration.getUser(), configuration.getPassword().toCharArray());
94 InfluxDBClientOptions clientOptions = optionsBuilder.build();
96 final InfluxDBClient createdClient = InfluxDBClientFactory.create(clientOptions);
97 this.client = createdClient;
99 queryAPI = createdClient.getQueryApi();
100 writeAPI = createdClient.getWriteApi();
101 deleteAPI = createdClient.getDeleteApi();
102 logger.debug("Successfully connected to InfluxDB. Instance ready={}", createdClient.ready());
104 return checkConnectionStatus();
108 public void disconnect() {
109 final InfluxDBClient currentClient = this.client;
110 if (currentClient != null) {
111 currentClient.close();
117 public boolean checkConnectionStatus() {
118 final InfluxDBClient currentClient = client;
119 if (currentClient != null) {
120 Ready ready = currentClient.ready();
121 boolean isUp = ready != null && ready.getStatus() == Ready.StatusEnum.READY;
123 logger.debug("database status is OK");
125 logger.warn("database not ready");
129 logger.warn("checkConnection: database is not connected");
135 public boolean write(List<InfluxPoint> influxPoints) {
136 final WriteApi currentWriteAPI = writeAPI;
137 if (currentWriteAPI == null) {
141 List<Point> clientPoints = influxPoints.stream().map(this::convertPointToClientFormat)
142 .filter(Optional::isPresent).map(Optional::get).toList();
143 currentWriteAPI.writePoints(clientPoints);
144 } catch (InfluxException | InfluxDBIOException e) {
145 logger.debug("Writing to database failed", e);
152 public boolean remove(FilterCriteria filter) {
153 final DeleteApi currentDeleteApi = deleteAPI;
154 if (currentDeleteApi == null) {
158 if (filter.getState() != null) {
159 logger.warn("Deleting by value is not supported in InfluxDB v2.");
162 OffsetDateTime start = Objects.requireNonNullElse(filter.getBeginDate(), ZonedDateTime.now().minusYears(100))
164 OffsetDateTime stop = Objects.requireNonNullElse(filter.getEndDate(), ZonedDateTime.now().plusYears(100))
168 String predicate = "";
169 String itemName = filter.getItemName();
170 if (itemName != null) {
171 String name = influxDBMetadataService.getMeasurementNameOrDefault(itemName, itemName);
172 String measurementName = configuration.isReplaceUnderscore() ? name.replace('_', '.') : name;
173 predicate = "(_measurement=\"" + measurementName + "\")";
177 deleteAPI.delete(start, stop, predicate, configuration.getRetentionPolicy(),
178 configuration.getDatabaseName());
179 } catch (InfluxException | InfluxDBIOException e) {
180 logger.debug("Deleting from database failed", e);
187 private Optional<Point> convertPointToClientFormat(InfluxPoint point) {
188 Point clientPoint = Point.measurement(point.getMeasurementName()).time(point.getTime(), WritePrecision.MS);
190 Object value = point.getValue();
191 if (value instanceof String string) {
192 clientPoint.addField(FIELD_VALUE_NAME, string);
193 } else if (value instanceof Number number) {
194 clientPoint.addField(FIELD_VALUE_NAME, number);
195 } else if (value instanceof Boolean boolean1) {
196 clientPoint.addField(FIELD_VALUE_NAME, boolean1);
197 } else if (value == null) {
198 clientPoint.addField(FIELD_VALUE_NAME, (String) null);
200 logger.warn("Could not convert {}, discarding this datapoint)", clientPoint);
201 return Optional.empty();
203 point.getTags().forEach(clientPoint::addTag);
204 return Optional.of(clientPoint);
208 public List<InfluxRow> query(FilterCriteria filter, String retentionPolicy) {
210 final QueryApi currentQueryAPI = queryAPI;
211 if (currentQueryAPI != null) {
212 String query = queryCreator.createQuery(filter, retentionPolicy);
213 logger.trace("Query {}", query);
214 List<FluxTable> clientResult = currentQueryAPI.query(query);
215 return clientResult.stream().flatMap(this::mapRawResultToHistoric).toList();
217 throw new InfluxException("API not present");
219 } catch (InfluxException | InfluxDBIOException e) {
220 logger.warn("Failed to execute query '{}': {}", filter, e.getMessage());
225 private Stream<InfluxRow> mapRawResultToHistoric(FluxTable rawRow) {
226 return rawRow.getRecords().stream().map(r -> {
227 String itemName = (String) r.getValueByKey(InfluxDBConstants.TAG_ITEM_NAME);
228 if (itemName == null) {
229 itemName = r.getMeasurement();
231 Object value = r.getValueByKey(COLUMN_VALUE_NAME_V2);
232 Instant time = (Instant) r.getValueByKey(COLUMN_TIME_NAME_V2);
233 return new InfluxRow(time, itemName, value);
/**
 * Counts the stored points per item.
 * <p>
 * Runs a hand-written query against the bucket named by the configured retention policy, limited to
 * the last 365 days and to records that carry the item-name tag, grouped by that tag and counted.
 * Only the records of the first returned table are read.
 *
 * @return item name to point count, in the order the rows are returned; an empty map, after a
 *         warning, when no query API is present
 */
238 public Map<String, Integer> getStoredItemsCount() {
239 final QueryApi currentQueryAPI = queryAPI;
241 if (currentQueryAPI != null) {
// LinkedHashMap keeps the entries in the order in which the rows are added.
242 Map<String, Integer> result = new LinkedHashMap<>();
// NOTE(review): the end of the query expression below is not visible in this excerpt - confirm the
// final pipeline stage against the complete file.
243 // Query written by hand, see https://github.com/influxdata/influxdb-client-java/issues/75
244 String query = "from(bucket: \"" + configuration.getRetentionPolicy() + "\")\n"
245 + " |> range(start:-365d)\n" + " |> filter(fn: (r) => exists r." + TAG_ITEM_NAME + " )\n"
246 + " |> group(columns: [\"" + TAG_ITEM_NAME + "\"], mode:\"by\")\n" + " |> count()\n"
249 List<FluxTable> queryResult = currentQueryAPI.query(query);
// Only the first table is inspected; a fresh FluxTable stands in when the result has no tables.
250 Objects.requireNonNull(queryResult.stream().findFirst().orElse(new FluxTable())).getRecords()
252 result.put((String) row.getValueByKey(TAG_ITEM_NAME), ((Number) row.getValue()).intValue());
// No query API available: warn and hand back an empty map.
256 logger.warn("Returning empty result because queryAPI isn't present");
257 return Collections.emptyMap();