/**
 * Copyright (c) 2010-2023 Contributors to the openHAB project
 *
 * See the NOTICE file(s) distributed with this work for additional
 * information.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License 2.0 which is available at
 * http://www.eclipse.org/legal/epl-2.0
 *
 * SPDX-License-Identifier: EPL-2.0
 */
13 package org.openhab.persistence.influxdb.internal.influx1;
import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.COLUMN_TIME_NAME_V1;
import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.COLUMN_VALUE_NAME_V1;
import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.FIELD_VALUE_NAME;
import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.TAG_ITEM_NAME;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBException;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
import org.influxdb.dto.Pong;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import org.openhab.core.persistence.FilterCriteria;
import org.openhab.persistence.influxdb.internal.FilterCriteriaQueryCreator;
import org.openhab.persistence.influxdb.internal.InfluxDBConfiguration;
import org.openhab.persistence.influxdb.internal.InfluxDBMetadataService;
import org.openhab.persistence.influxdb.internal.InfluxDBRepository;
import org.openhab.persistence.influxdb.internal.InfluxPoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.influxdb.exceptions.InfluxException;

50 * Implementation of {@link InfluxDBRepository} for InfluxDB 1.0
52 * @author Joan Pujol Espinar - Initial contribution. Most code has been moved
54 * {@link org.openhab.persistence.influxdb.InfluxDBPersistenceService}
55 * where it was in previous version
58 public class InfluxDB1RepositoryImpl implements InfluxDBRepository {
59 private final Logger logger = LoggerFactory.getLogger(InfluxDB1RepositoryImpl.class);
60 private final InfluxDBConfiguration configuration;
61 private final InfluxDBMetadataService influxDBMetadataService;
62 private final FilterCriteriaQueryCreator queryCreator;
63 private @Nullable InfluxDB client;
65 public InfluxDB1RepositoryImpl(InfluxDBConfiguration configuration,
66 InfluxDBMetadataService influxDBMetadataService) {
67 this.configuration = configuration;
68 this.influxDBMetadataService = influxDBMetadataService;
69 this.queryCreator = new InfluxDB1FilterCriteriaQueryCreatorImpl(configuration, influxDBMetadataService);
73 public boolean isConnected() {
74 return client != null;
78 public boolean connect() {
79 final InfluxDB createdClient = InfluxDBFactory.connect(configuration.getUrl(), configuration.getUser(),
80 configuration.getPassword());
81 createdClient.setDatabase(configuration.getDatabaseName());
82 createdClient.setRetentionPolicy(configuration.getRetentionPolicy());
83 createdClient.enableBatch(200, 100, TimeUnit.MILLISECONDS);
84 this.client = createdClient;
85 return checkConnectionStatus();
89 public void disconnect() {
90 final InfluxDB currentClient = client;
91 if (currentClient != null) {
92 currentClient.close();
98 public boolean checkConnectionStatus() {
99 final InfluxDB currentClient = client;
100 if (currentClient != null) {
102 Pong pong = currentClient.ping();
103 String version = pong.getVersion();
104 // may be check for version >= 0.9
105 if (version != null && !version.contains("unknown")) {
106 logger.debug("database status is OK, version is {}", version);
109 logger.warn("database ping error, version is: \"{}\" response time was \"{}\"", version,
110 pong.getResponseTime());
112 } catch (RuntimeException e) {
113 logger.warn("database error: {}", e.getMessage(), e);
116 logger.warn("checkConnection: database is not connected");
122 public boolean write(List<InfluxPoint> influxPoints) {
123 final InfluxDB currentClient = this.client;
124 if (currentClient == null) {
128 List<Point> points = influxPoints.stream().map(this::convertPointToClientFormat).filter(Optional::isPresent)
129 .map(Optional::get).toList();
130 BatchPoints batchPoints = BatchPoints.database(configuration.getDatabaseName())
131 .retentionPolicy(configuration.getRetentionPolicy()).points(points).build();
132 currentClient.write(batchPoints);
133 } catch (InfluxException e) {
134 logger.debug("Writing to database failed", e);
141 public boolean remove(FilterCriteria filter) {
142 logger.warn("Removing data is not supported in InfluxDB v1.");
146 private Optional<Point> convertPointToClientFormat(InfluxPoint point) {
147 Point.Builder clientPoint = Point.measurement(point.getMeasurementName()).time(point.getTime().toEpochMilli(),
148 TimeUnit.MILLISECONDS);
149 Object value = point.getValue();
150 if (value instanceof String) {
151 clientPoint.addField(FIELD_VALUE_NAME, (String) value);
152 } else if (value instanceof Number) {
153 clientPoint.addField(FIELD_VALUE_NAME, (Number) value);
154 } else if (value instanceof Boolean) {
155 clientPoint.addField(FIELD_VALUE_NAME, (Boolean) value);
156 } else if (value == null) {
157 clientPoint.addField(FIELD_VALUE_NAME, "null");
159 logger.warn("Could not convert {}, discarding this datapoint", point);
160 return Optional.empty();
162 point.getTags().forEach(clientPoint::tag);
163 return Optional.of(clientPoint.build());
167 public List<InfluxRow> query(FilterCriteria filter, String retentionPolicy) {
168 final InfluxDB currentClient = client;
169 if (currentClient != null) {
170 String query = queryCreator.createQuery(filter, retentionPolicy);
171 logger.trace("Query {}", query);
172 Query parsedQuery = new Query(query, configuration.getDatabaseName());
173 List<QueryResult.Result> results = currentClient.query(parsedQuery, TimeUnit.MILLISECONDS).getResults();
174 return convertClientResultToRepository(results);
176 logger.warn("Returning empty list because queryAPI isn't present");
181 private List<InfluxRow> convertClientResultToRepository(List<QueryResult.Result> results) {
182 List<InfluxRow> rows = new ArrayList<>();
183 for (QueryResult.Result result : results) {
184 List<QueryResult.Series> allSeries = result.getSeries();
185 if (result.getError() != null) {
186 logger.warn("{}", result.getError());
189 if (allSeries == null) {
190 logger.debug("query returned no series");
192 for (QueryResult.Series series : allSeries) {
193 logger.trace("series {}", series);
194 String defaultItemName = series.getName();
195 List<List<Object>> allValues = series.getValues();
196 if (allValues == null) {
197 logger.debug("query returned no values");
199 List<String> columns = series.getColumns();
200 logger.trace("columns {}", columns);
201 if (columns != null) {
202 int timestampColumn = columns.indexOf(COLUMN_TIME_NAME_V1);
203 int valueColumn = columns.indexOf(COLUMN_VALUE_NAME_V1);
204 int itemNameColumn = columns.indexOf(TAG_ITEM_NAME);
205 if (valueColumn == -1 || timestampColumn == -1) {
206 throw new IllegalStateException("missing column");
208 for (List<Object> valueObject : allValues) {
209 Double rawTime = (Double) valueObject.get(timestampColumn);
210 Instant time = Instant.ofEpochMilli(rawTime.longValue());
211 Object value = valueObject.get(valueColumn);
212 String itemName = itemNameColumn == -1 ? defaultItemName
213 : Objects.requireNonNullElse((String) valueObject.get(itemNameColumn),
215 logger.trace("adding historic item {}: time {} value {}", itemName, time, value);
216 rows.add(new InfluxRow(time, itemName, value));
227 public Map<String, Integer> getStoredItemsCount() {
228 return Collections.emptyMap();