/**
 * Copyright (c) 2010-2023 Contributors to the openHAB project
 *
 * See the NOTICE file(s) distributed with this work for additional
 * information.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License 2.0 which is available at
 * http://www.eclipse.org/legal/epl-2.0
 *
 * SPDX-License-Identifier: EPL-2.0
 */
package org.openhab.persistence.influxdb.internal.influx1;

import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.COLUMN_TIME_NAME_V1;
import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.COLUMN_VALUE_NAME_V1;
import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.FIELD_VALUE_NAME;
import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.TAG_ITEM_NAME;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.Point;
import org.influxdb.dto.Pong;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import org.openhab.persistence.influxdb.internal.FilterCriteriaQueryCreator;
import org.openhab.persistence.influxdb.internal.InfluxDBConfiguration;
import org.openhab.persistence.influxdb.internal.InfluxDBMetadataService;
import org.openhab.persistence.influxdb.internal.InfluxDBRepository;
import org.openhab.persistence.influxdb.internal.InfluxPoint;
import org.openhab.persistence.influxdb.internal.UnexpectedConditionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

46 * Implementation of {@link InfluxDBRepository} for InfluxDB 1.0
48 * @author Joan Pujol Espinar - Initial contribution. Most code has been moved
50 * {@link org.openhab.persistence.influxdb.InfluxDBPersistenceService}
51 * where it was in previous version
54 public class InfluxDB1RepositoryImpl implements InfluxDBRepository {
55 private final Logger logger = LoggerFactory.getLogger(InfluxDB1RepositoryImpl.class);
56 private final InfluxDBConfiguration configuration;
57 private final InfluxDBMetadataService influxDBMetadataService;
58 private @Nullable InfluxDB client;
60 public InfluxDB1RepositoryImpl(InfluxDBConfiguration configuration,
61 InfluxDBMetadataService influxDBMetadataService) {
62 this.configuration = configuration;
63 this.influxDBMetadataService = influxDBMetadataService;
67 public boolean isConnected() {
68 return client != null;
72 public boolean connect() {
73 final InfluxDB createdClient = InfluxDBFactory.connect(configuration.getUrl(), configuration.getUser(),
74 configuration.getPassword());
75 createdClient.setDatabase(configuration.getDatabaseName());
76 createdClient.setRetentionPolicy(configuration.getRetentionPolicy());
77 createdClient.enableBatch(200, 100, TimeUnit.MILLISECONDS);
78 this.client = createdClient;
79 return checkConnectionStatus();
83 public void disconnect() {
84 final InfluxDB currentClient = client;
85 if (currentClient != null) {
86 currentClient.close();
92 public boolean checkConnectionStatus() {
93 final InfluxDB currentClient = client;
94 if (currentClient != null) {
96 Pong pong = currentClient.ping();
97 String version = pong.getVersion();
98 // may be check for version >= 0.9
99 if (version != null && !version.contains("unknown")) {
100 logger.debug("database status is OK, version is {}", version);
103 logger.warn("database ping error, version is: \"{}\" response time was \"{}\"", version,
104 pong.getResponseTime());
106 } catch (RuntimeException e) {
107 logger.warn("database error: {}", e.getMessage(), e);
110 logger.warn("checkConnection: database is not connected");
116 public void write(InfluxPoint point) throws UnexpectedConditionException {
117 final InfluxDB currentClient = this.client;
118 if (currentClient != null) {
119 Point clientPoint = convertPointToClientFormat(point);
120 currentClient.write(configuration.getDatabaseName(), configuration.getRetentionPolicy(), clientPoint);
122 logger.warn("Write point {} ignored due to client isn't connected", point);
126 private Point convertPointToClientFormat(InfluxPoint point) throws UnexpectedConditionException {
127 Point.Builder clientPoint = Point.measurement(point.getMeasurementName()).time(point.getTime().toEpochMilli(),
128 TimeUnit.MILLISECONDS);
129 Object value = point.getValue();
130 if (value instanceof String) {
131 clientPoint.addField(FIELD_VALUE_NAME, (String) value);
132 } else if (value instanceof Number) {
133 clientPoint.addField(FIELD_VALUE_NAME, (Number) value);
134 } else if (value instanceof Boolean) {
135 clientPoint.addField(FIELD_VALUE_NAME, (Boolean) value);
136 } else if (value == null) {
137 clientPoint.addField(FIELD_VALUE_NAME, "null");
139 throw new UnexpectedConditionException("Not expected value type");
141 point.getTags().forEach(clientPoint::tag);
142 return clientPoint.build();
146 public List<InfluxRow> query(String query) {
147 final InfluxDB currentClient = client;
148 if (currentClient != null) {
149 Query parsedQuery = new Query(query, configuration.getDatabaseName());
150 List<QueryResult.Result> results = currentClient.query(parsedQuery, TimeUnit.MILLISECONDS).getResults();
151 return convertClientResultToRepository(results);
153 logger.warn("Returning empty list because queryAPI isn't present");
158 private List<InfluxRow> convertClientResultToRepository(List<QueryResult.Result> results) {
159 List<InfluxRow> rows = new ArrayList<>();
160 for (QueryResult.Result result : results) {
161 List<QueryResult.Series> allSeries = result.getSeries();
162 if (result.getError() != null) {
163 logger.warn("{}", result.getError());
166 if (allSeries == null) {
167 logger.debug("query returned no series");
169 for (QueryResult.Series series : allSeries) {
170 logger.trace("series {}", series);
171 String defaultItemName = series.getName();
172 List<List<Object>> allValues = series.getValues();
173 if (allValues == null) {
174 logger.debug("query returned no values");
176 List<String> columns = series.getColumns();
177 logger.trace("columns {}", columns);
178 if (columns != null) {
179 int timestampColumn = columns.indexOf(COLUMN_TIME_NAME_V1);
180 int valueColumn = columns.indexOf(COLUMN_VALUE_NAME_V1);
181 int itemNameColumn = columns.indexOf(TAG_ITEM_NAME);
182 if (valueColumn == -1 || timestampColumn == -1) {
183 throw new IllegalStateException("missing column");
185 for (List<Object> valueObject : allValues) {
186 Double rawTime = (Double) valueObject.get(timestampColumn);
187 Instant time = Instant.ofEpochMilli(rawTime.longValue());
188 Object value = valueObject.get(valueColumn);
189 String itemName = itemNameColumn == -1 ? defaultItemName
190 : Objects.requireNonNullElse((String) valueObject.get(itemNameColumn),
192 logger.trace("adding historic item {}: time {} value {}", itemName, time, value);
193 rows.add(new InfluxRow(time, itemName, value));
204 public Map<String, Integer> getStoredItemsCount() {
205 return Collections.emptyMap();
209 public FilterCriteriaQueryCreator createQueryCreator() {
210 return new InfluxDB1FilterCriteriaQueryCreatorImpl(configuration, influxDBMetadataService);