/**
 * Copyright (c) 2010-2023 Contributors to the openHAB project
 *
 * See the NOTICE file(s) distributed with this work for additional
 * information.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License 2.0 which is available at
 * http://www.eclipse.org/legal/epl-2.0
 *
 * SPDX-License-Identifier: EPL-2.0
 */
13 package org.openhab.persistence.influxdb.internal.influx1;
import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.COLUMN_TIME_NAME_V1;
import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.COLUMN_VALUE_NAME_V1;
import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.FIELD_VALUE_NAME;
import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.TAG_ITEM_NAME;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.Point;
import org.influxdb.dto.Pong;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import org.openhab.persistence.influxdb.internal.InfluxDBConfiguration;
import org.openhab.persistence.influxdb.internal.InfluxDBRepository;
import org.openhab.persistence.influxdb.internal.InfluxPoint;
import org.openhab.persistence.influxdb.internal.InfluxRow;
import org.openhab.persistence.influxdb.internal.UnnexpectedConditionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
46 * Implementation of {@link InfluxDBRepository} for InfluxDB 1.0
48 * @author Joan Pujol Espinar - Initial contribution. Most code has been moved
50 * {@link org.openhab.persistence.influxdb.InfluxDBPersistenceService}
51 * where it was in previous version
54 public class InfluxDB1RepositoryImpl implements InfluxDBRepository {
55 private final Logger logger = LoggerFactory.getLogger(InfluxDB1RepositoryImpl.class);
56 private InfluxDBConfiguration configuration;
58 private InfluxDB client;
60 public InfluxDB1RepositoryImpl(InfluxDBConfiguration configuration) {
61 this.configuration = configuration;
65 public boolean isConnected() {
66 return client != null;
70 public boolean connect() {
71 final InfluxDB createdClient = InfluxDBFactory.connect(configuration.getUrl(), configuration.getUser(),
72 configuration.getPassword());
73 createdClient.setDatabase(configuration.getDatabaseName());
74 createdClient.setRetentionPolicy(configuration.getRetentionPolicy());
75 createdClient.enableBatch(200, 100, TimeUnit.MILLISECONDS);
76 this.client = createdClient;
77 return checkConnectionStatus();
81 public void disconnect() {
86 public boolean checkConnectionStatus() {
87 boolean dbStatus = false;
88 final InfluxDB currentClient = client;
89 if (currentClient != null) {
91 Pong pong = currentClient.ping();
92 String version = pong.getVersion();
93 // may be check for version >= 0.9
94 if (version != null && !version.contains("unknown")) {
96 logger.debug("database status is OK, version is {}", version);
98 logger.warn("database ping error, version is: \"{}\" response time was \"{}\"", version,
99 pong.getResponseTime());
102 } catch (RuntimeException e) {
104 logger.error("database connection failed", e);
105 handleDatabaseException(e);
108 logger.warn("checkConnection: database is not connected");
113 private void handleDatabaseException(Exception e) {
114 logger.warn("database error: {}", e.getMessage(), e);
118 public void write(InfluxPoint point) {
119 final InfluxDB currentClient = this.client;
120 if (currentClient != null) {
121 Point clientPoint = convertPointToClientFormat(point);
122 currentClient.write(configuration.getDatabaseName(), configuration.getRetentionPolicy(), clientPoint);
124 logger.warn("Write point {} ignored due to client isn't connected", point);
128 private Point convertPointToClientFormat(InfluxPoint point) {
129 Point.Builder clientPoint = Point.measurement(point.getMeasurementName()).time(point.getTime().toEpochMilli(),
130 TimeUnit.MILLISECONDS);
131 setPointValue(point.getValue(), clientPoint);
132 point.getTags().entrySet().forEach(e -> clientPoint.tag(e.getKey(), e.getValue()));
133 return clientPoint.build();
136 private void setPointValue(@Nullable Object value, Point.Builder point) {
137 if (value instanceof String) {
138 point.addField(FIELD_VALUE_NAME, (String) value);
139 } else if (value instanceof Number) {
140 point.addField(FIELD_VALUE_NAME, (Number) value);
141 } else if (value instanceof Boolean) {
142 point.addField(FIELD_VALUE_NAME, (Boolean) value);
143 } else if (value == null) {
144 point.addField(FIELD_VALUE_NAME, (String) null);
146 throw new UnnexpectedConditionException("Not expected value type");
151 public List<InfluxRow> query(String query) {
152 final InfluxDB currentClient = client;
153 if (currentClient != null) {
154 Query parsedQuery = new Query(query, configuration.getDatabaseName());
155 List<QueryResult.Result> results = currentClient.query(parsedQuery, TimeUnit.MILLISECONDS).getResults();
156 return convertClientResutToRepository(results);
158 logger.warn("Returning empty list because queryAPI isn't present");
159 return Collections.emptyList();
163 private List<InfluxRow> convertClientResutToRepository(List<QueryResult.Result> results) {
164 List<InfluxRow> rows = new ArrayList<>();
165 for (QueryResult.Result result : results) {
166 List<QueryResult.Series> seriess = result.getSeries();
167 if (result.getError() != null) {
168 logger.warn("{}", result.getError());
171 if (seriess == null) {
172 logger.debug("query returned no series");
174 for (QueryResult.Series series : seriess) {
175 logger.trace("series {}", series.toString());
176 List<List<@Nullable Object>> valuess = series.getValues();
177 if (valuess == null) {
178 logger.debug("query returned no values");
180 List<String> columns = series.getColumns();
181 logger.trace("columns {}", columns);
182 if (columns != null) {
183 Integer timestampColumn = null;
184 Integer valueColumn = null;
185 Integer itemNameColumn = null;
186 for (int i = 0; i < columns.size(); i++) {
187 String columnName = columns.get(i);
188 if (columnName.equals(COLUMN_TIME_NAME_V1)) {
190 } else if (columnName.equals(COLUMN_VALUE_NAME_V1)) {
192 } else if (columnName.equals(TAG_ITEM_NAME)) {
196 if (valueColumn == null || timestampColumn == null) {
197 throw new IllegalStateException("missing column");
199 for (int i = 0; i < valuess.size(); i++) {
200 Double rawTime = (Double) Objects.requireNonNull(valuess.get(i).get(timestampColumn));
201 Instant time = Instant.ofEpochMilli(rawTime.longValue());
203 Object value = valuess.get(i).get(valueColumn);
205 String itemName = Optional.ofNullable(itemNameColumn)
206 .flatMap(inc -> Optional.ofNullable((String) valuess.get(currentI).get(inc)))
207 .orElse(series.getName());
208 logger.trace("adding historic item {}: time {} value {}", itemName, time, value);
209 rows.add(new InfluxRow(time, itemName, value));
220 public Map<String, Integer> getStoredItemsCount() {
221 return Collections.emptyMap();