/**
 * Copyright (c) 2010-2021 Contributors to the openHAB project
 *
 * See the NOTICE file(s) distributed with this work for additional
 * information.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License 2.0 which is available at
 * http://www.eclipse.org/legal/epl-2.0
 *
 * SPDX-License-Identifier: EPL-2.0
 */
13 package org.openhab.persistence.influxdb.internal.influx1;
import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.COLUMN_TIME_NAME_V1;
import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.COLUMN_VALUE_NAME_V1;
import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.FIELD_VALUE_NAME;
import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.TAG_ITEM_NAME;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.Point;
import org.influxdb.dto.Pong;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import org.openhab.persistence.influxdb.internal.InfluxDBConfiguration;
import org.openhab.persistence.influxdb.internal.InfluxDBRepository;
import org.openhab.persistence.influxdb.internal.InfluxPoint;
import org.openhab.persistence.influxdb.internal.InfluxRow;
import org.openhab.persistence.influxdb.internal.UnnexpectedConditionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Implementation of {@link InfluxDBRepository} for InfluxDB 1.0
 *
 * @author Joan Pujol Espinar - Initial contribution. Most code has been moved
 *         from
 *         {@link org.openhab.persistence.influxdb.InfluxDBPersistenceService}
 *         where it was in previous version
 */
52 public class InfluxDB1RepositoryImpl implements InfluxDBRepository {
53 private final Logger logger = LoggerFactory.getLogger(InfluxDB1RepositoryImpl.class);
54 private InfluxDBConfiguration configuration;
56 private InfluxDB client;
/**
 * Creates a repository bound to the given settings. No client is created here;
 * that happens in {@link #connect()}.
 *
 * @param configuration settings read later when connecting, writing and querying
 */
public InfluxDB1RepositoryImpl(InfluxDBConfiguration configuration) {
    this.configuration = configuration;
}
63 public boolean isConnected() {
64 return client != null;
68 public boolean connect() {
69 final InfluxDB createdClient = InfluxDBFactory.connect(configuration.getUrl(), configuration.getUser(),
70 configuration.getPassword());
71 createdClient.setDatabase(configuration.getDatabaseName());
72 createdClient.setRetentionPolicy(configuration.getRetentionPolicy());
73 createdClient.enableBatch(200, 100, TimeUnit.MILLISECONDS);
74 this.client = createdClient;
75 return checkConnectionStatus();
79 public void disconnect() {
84 public boolean checkConnectionStatus() {
85 boolean dbStatus = false;
86 final InfluxDB currentClient = client;
87 if (currentClient != null) {
89 Pong pong = currentClient.ping();
90 String version = pong.getVersion();
91 // may be check for version >= 0.9
92 if (version != null && !version.contains("unknown")) {
94 logger.debug("database status is OK, version is {}", version);
96 logger.warn("database ping error, version is: \"{}\" response time was \"{}\"", version,
97 pong.getResponseTime());
100 } catch (RuntimeException e) {
102 logger.error("database connection failed", e);
103 handleDatabaseException(e);
106 logger.warn("checkConnection: database is not connected");
111 private void handleDatabaseException(Exception e) {
112 logger.warn("database error: {}", e.getMessage(), e);
116 public void write(InfluxPoint point) {
117 final InfluxDB currentClient = this.client;
118 if (currentClient != null) {
119 Point clientPoint = convertPointToClientFormat(point);
120 currentClient.write(configuration.getDatabaseName(), configuration.getRetentionPolicy(), clientPoint);
122 logger.warn("Write point {} ignored due to client isn't connected", point);
126 private Point convertPointToClientFormat(InfluxPoint point) {
127 Point.Builder clientPoint = Point.measurement(point.getMeasurementName()).time(point.getTime().toEpochMilli(),
128 TimeUnit.MILLISECONDS);
129 setPointValue(point.getValue(), clientPoint);
130 point.getTags().entrySet().forEach(e -> clientPoint.tag(e.getKey(), e.getValue()));
131 return clientPoint.build();
134 private void setPointValue(@Nullable Object value, Point.Builder point) {
135 if (value instanceof String) {
136 point.addField(FIELD_VALUE_NAME, (String) value);
137 } else if (value instanceof Number) {
138 point.addField(FIELD_VALUE_NAME, (Number) value);
139 } else if (value instanceof Boolean) {
140 point.addField(FIELD_VALUE_NAME, (Boolean) value);
141 } else if (value == null) {
142 point.addField(FIELD_VALUE_NAME, (String) null);
144 throw new UnnexpectedConditionException("Not expected value type");
149 public List<InfluxRow> query(String query) {
150 final InfluxDB currentClient = client;
151 if (currentClient != null) {
152 Query parsedQuery = new Query(query, configuration.getDatabaseName());
153 List<QueryResult.Result> results = currentClient.query(parsedQuery, TimeUnit.MILLISECONDS).getResults();
154 return convertClientResutToRepository(results);
156 logger.warn("Returning empty list because queryAPI isn't present");
157 return Collections.emptyList();
161 private List<InfluxRow> convertClientResutToRepository(List<QueryResult.Result> results) {
162 List<InfluxRow> rows = new ArrayList<>();
163 for (QueryResult.Result result : results) {
164 List<QueryResult.Series> seriess = result.getSeries();
165 if (result.getError() != null) {
166 logger.warn("{}", result.getError());
169 if (seriess == null) {
170 logger.debug("query returned no series");
172 for (QueryResult.Series series : seriess) {
173 logger.trace("series {}", series.toString());
174 String itemName = series.getName();
175 List<List<Object>> valuess = series.getValues();
176 if (valuess == null) {
177 logger.debug("query returned no values");
179 List<String> columns = series.getColumns();
180 logger.trace("columns {}", columns);
181 if (columns != null) {
182 Integer timestampColumn = null;
183 Integer valueColumn = null;
184 Integer itemNameColumn = null;
185 for (int i = 0; i < columns.size(); i++) {
186 String columnName = columns.get(i);
187 if (columnName.equals(COLUMN_TIME_NAME_V1)) {
189 } else if (columnName.equals(COLUMN_VALUE_NAME_V1)) {
191 } else if (columnName.equals(TAG_ITEM_NAME)) {
195 if (valueColumn == null || timestampColumn == null) {
196 throw new IllegalStateException("missing column");
198 for (int i = 0; i < valuess.size(); i++) {
199 Double rawTime = (Double) valuess.get(i).get(timestampColumn);
200 Instant time = Instant.ofEpochMilli(rawTime.longValue());
201 Object value = valuess.get(i).get(valueColumn);
202 if (itemNameColumn != null) {
203 itemName = (String) valuess.get(i).get(itemNameColumn);
205 logger.trace("adding historic item {}: time {} value {}", itemName, time, value);
206 rows.add(new InfluxRow(time, itemName, value));
217 public Map<String, Integer> getStoredItemsCount() {
218 return Collections.emptyMap();