2 * Copyright (c) 2010-2023 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.persistence.influxdb.internal.influx2;
15 import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.*;
17 import java.time.Instant;
18 import java.util.Collections;
19 import java.util.LinkedHashMap;
20 import java.util.List;
22 import java.util.stream.Collectors;
23 import java.util.stream.Stream;
25 import org.eclipse.jdt.annotation.NonNullByDefault;
26 import org.eclipse.jdt.annotation.Nullable;
27 import org.openhab.persistence.influxdb.internal.InfluxDBConfiguration;
28 import org.openhab.persistence.influxdb.internal.InfluxDBConstants;
29 import org.openhab.persistence.influxdb.internal.InfluxDBRepository;
30 import org.openhab.persistence.influxdb.internal.InfluxPoint;
31 import org.openhab.persistence.influxdb.internal.InfluxRow;
32 import org.openhab.persistence.influxdb.internal.UnnexpectedConditionException;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
36 import com.influxdb.client.InfluxDBClient;
37 import com.influxdb.client.InfluxDBClientFactory;
38 import com.influxdb.client.InfluxDBClientOptions;
39 import com.influxdb.client.QueryApi;
40 import com.influxdb.client.WriteApi;
41 import com.influxdb.client.domain.Ready;
42 import com.influxdb.client.domain.WritePrecision;
43 import com.influxdb.client.write.Point;
44 import com.influxdb.query.FluxTable;
47 * Implementation of {@link InfluxDBRepository} for InfluxDB 2.0
49 * @author Joan Pujol Espinar - Initial contribution
52 public class InfluxDB2RepositoryImpl implements InfluxDBRepository {
53 private final Logger logger = LoggerFactory.getLogger(InfluxDB2RepositoryImpl.class);
54 private InfluxDBConfiguration configuration;
56 private InfluxDBClient client;
58 private QueryApi queryAPI;
60 private WriteApi writeAPI;
/**
 * Creates a repository bound to the given configuration. The constructor only stores the
 * configuration; no client is created until {@link #connect()} is called.
 *
 * @param configuration connection settings used by this repository
 */
public InfluxDB2RepositoryImpl(InfluxDBConfiguration configuration) {
    this.configuration = configuration;
}
67 * Returns if the client has been successfully connected to server
69 * @return True if it's connected, otherwise false
72 public boolean isConnected() {
73 return client != null;
77 * Connect to InfluxDB server
79 * @return True if successful, otherwise false
82 public boolean connect() {
83 InfluxDBClientOptions.Builder optionsBuilder = InfluxDBClientOptions.builder().url(configuration.getUrl())
84 .org(configuration.getDatabaseName()).bucket(configuration.getRetentionPolicy());
85 char[] token = configuration.getTokenAsCharArray();
86 if (token.length > 0) {
87 optionsBuilder.authenticateToken(token);
89 optionsBuilder.authenticate(configuration.getUser(), configuration.getPassword().toCharArray());
91 InfluxDBClientOptions clientOptions = optionsBuilder.build();
93 final InfluxDBClient createdClient = InfluxDBClientFactory.create(clientOptions);
94 this.client = createdClient;
95 logger.debug("Succesfully connected to InfluxDB. Instance ready={}", createdClient.ready());
96 queryAPI = createdClient.getQueryApi();
97 writeAPI = createdClient.getWriteApi();
98 return checkConnectionStatus();
102 * Disconnect from InfluxDB server
105 public void disconnect() {
106 final InfluxDBClient currentClient = this.client;
107 if (currentClient != null) {
108 currentClient.close();
114 * Check if connection is currently ready
116 * @return True if its ready, otherwise false
119 public boolean checkConnectionStatus() {
120 final InfluxDBClient currentClient = client;
121 if (currentClient != null) {
122 Ready ready = currentClient.ready();
123 boolean isUp = ready != null && ready.getStatus() == Ready.StatusEnum.READY;
125 logger.debug("database status is OK");
127 logger.warn("database not ready");
131 logger.warn("checkConnection: database is not connected");
137 * Write point to database
142 public void write(InfluxPoint point) {
143 final WriteApi currentWriteAPI = writeAPI;
144 if (currentWriteAPI != null) {
145 currentWriteAPI.writePoint(convertPointToClientFormat(point));
147 logger.warn("Write point {} ignored due to writeAPI isn't present", point);
151 private Point convertPointToClientFormat(InfluxPoint point) {
152 Point clientPoint = Point.measurement(point.getMeasurementName()).time(point.getTime(), WritePrecision.MS);
153 setPointValue(point.getValue(), clientPoint);
154 point.getTags().entrySet().forEach(e -> clientPoint.addTag(e.getKey(), e.getValue()));
158 private void setPointValue(@Nullable Object value, Point point) {
159 if (value instanceof String) {
160 point.addField(FIELD_VALUE_NAME, (String) value);
161 } else if (value instanceof Number) {
162 point.addField(FIELD_VALUE_NAME, (Number) value);
163 } else if (value instanceof Boolean) {
164 point.addField(FIELD_VALUE_NAME, (Boolean) value);
165 } else if (value == null) {
166 point.addField(FIELD_VALUE_NAME, (String) null);
168 throw new UnnexpectedConditionException("Not expected value type");
173 * Executes Flux query
176 * @return Query results
179 public List<InfluxRow> query(String query) {
180 final QueryApi currentQueryAPI = queryAPI;
181 if (currentQueryAPI != null) {
182 List<FluxTable> clientResult = currentQueryAPI.query(query);
183 return convertClientResutToRepository(clientResult);
185 logger.warn("Returning empty list because queryAPI isn't present");
186 return Collections.emptyList();
190 private List<InfluxRow> convertClientResutToRepository(List<FluxTable> clientResult) {
191 return clientResult.stream().flatMap(this::mapRawResultToHistoric).collect(Collectors.toList());
194 private Stream<InfluxRow> mapRawResultToHistoric(FluxTable rawRow) {
195 return rawRow.getRecords().stream().map(r -> {
196 String itemName = (String) r.getValueByKey(InfluxDBConstants.TAG_ITEM_NAME);
197 if (itemName == null) { // use measurement name if item is not tagged
198 itemName = r.getMeasurement();
200 Object value = r.getValueByKey(COLUMN_VALUE_NAME_V2);
201 Instant time = (Instant) r.getValueByKey(COLUMN_TIME_NAME_V2);
202 return new InfluxRow(time, itemName, value);
207 * Return all stored item names with it's count of stored points
209 * @return Map with <ItemName,ItemCount> entries
212 public Map<String, Integer> getStoredItemsCount() {
213 final QueryApi currentQueryAPI = queryAPI;
215 if (currentQueryAPI != null) {
216 Map<String, Integer> result = new LinkedHashMap<>();
217 // Query wrote by hand https://github.com/influxdata/influxdb-client-java/issues/75
218 String query = "from(bucket: \"" + configuration.getRetentionPolicy() + "\")\n"
219 + " |> range(start:-365d)\n" + " |> filter(fn: (r) => exists r." + TAG_ITEM_NAME + " )\n"
220 + " |> group(columns: [\"" + TAG_ITEM_NAME + "\"], mode:\"by\")\n" + " |> count()\n"
223 List<FluxTable> queryResult = currentQueryAPI.query(query);
224 queryResult.stream().findFirst().orElse(new FluxTable()).getRecords().forEach(row -> {
225 result.put((String) row.getValueByKey(TAG_ITEM_NAME), ((Number) row.getValue()).intValue());
229 logger.warn("Returning empty result because queryAPI isn't present");
230 return Collections.emptyMap();