2 * Copyright (c) 2010-2020 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.persistence.influxdb.internal.influx1;
15 import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.COLUMN_TIME_NAME_V1;
16 import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.COLUMN_VALUE_NAME_V1;
17 import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.FIELD_VALUE_NAME;
19 import java.time.Instant;
20 import java.util.ArrayList;
21 import java.util.Collections;
22 import java.util.List;
24 import java.util.concurrent.TimeUnit;
26 import org.eclipse.jdt.annotation.NonNullByDefault;
27 import org.eclipse.jdt.annotation.Nullable;
28 import org.influxdb.InfluxDB;
29 import org.influxdb.InfluxDBFactory;
30 import org.influxdb.dto.Point;
31 import org.influxdb.dto.Pong;
32 import org.influxdb.dto.Query;
33 import org.influxdb.dto.QueryResult;
34 import org.openhab.persistence.influxdb.internal.InfluxDBConfiguration;
35 import org.openhab.persistence.influxdb.internal.InfluxDBRepository;
36 import org.openhab.persistence.influxdb.internal.InfluxPoint;
37 import org.openhab.persistence.influxdb.internal.InfluxRow;
38 import org.openhab.persistence.influxdb.internal.UnnexpectedConditionException;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
/**
 * Implementation of {@link InfluxDBRepository} for InfluxDB 1.0
 *
 * @author Joan Pujol Espinar - Initial contribution. Most code has been moved from
 *         {@link org.openhab.persistence.influxdb.InfluxDBPersistenceService}, where it lived in the previous
 *         version
 */
49 public class InfluxDB1RepositoryImpl implements InfluxDBRepository {
50 private final Logger logger = LoggerFactory.getLogger(InfluxDB1RepositoryImpl.class);
51 private InfluxDBConfiguration configuration;
53 private InfluxDB client;
/**
 * Creates a repository bound to the given settings. The constructor only stores the configuration;
 * no client is created until {@link #connect()} is called.
 *
 * @param configuration connection settings read by the connect, write and query operations
 */
public InfluxDB1RepositoryImpl(InfluxDBConfiguration configuration) {
    this.configuration = configuration;
}
60 public boolean isConnected() {
61 return client != null;
65 public boolean connect() {
66 final InfluxDB createdClient = InfluxDBFactory.connect(configuration.getUrl(), configuration.getUser(),
67 configuration.getPassword());
68 createdClient.setDatabase(configuration.getDatabaseName());
69 createdClient.setRetentionPolicy(configuration.getRetentionPolicy());
70 createdClient.enableBatch(200, 100, TimeUnit.MILLISECONDS);
71 this.client = createdClient;
72 return checkConnectionStatus();
76 public void disconnect() {
81 public boolean checkConnectionStatus() {
82 boolean dbStatus = false;
83 final InfluxDB currentClient = client;
84 if (currentClient != null) {
86 Pong pong = currentClient.ping();
87 String version = pong.getVersion();
88 // may be check for version >= 0.9
89 if (version != null && !version.contains("unknown")) {
91 logger.debug("database status is OK, version is {}", version);
93 logger.warn("database ping error, version is: \"{}\" response time was \"{}\"", version,
94 pong.getResponseTime());
97 } catch (RuntimeException e) {
99 logger.error("database connection failed", e);
100 handleDatabaseException(e);
103 logger.warn("checkConnection: database is not connected");
108 private void handleDatabaseException(Exception e) {
109 logger.warn("database error: {}", e.getMessage(), e);
113 public void write(InfluxPoint point) {
114 final InfluxDB currentClient = this.client;
115 if (currentClient != null) {
116 Point clientPoint = convertPointToClientFormat(point);
117 currentClient.write(configuration.getDatabaseName(), configuration.getRetentionPolicy(), clientPoint);
119 logger.warn("Write point {} ignored due to client isn't connected", point);
123 private Point convertPointToClientFormat(InfluxPoint point) {
124 Point.Builder clientPoint = Point.measurement(point.getMeasurementName()).time(point.getTime().toEpochMilli(),
125 TimeUnit.MILLISECONDS);
126 setPointValue(point.getValue(), clientPoint);
127 point.getTags().entrySet().forEach(e -> clientPoint.tag(e.getKey(), e.getValue()));
128 return clientPoint.build();
131 private void setPointValue(@Nullable Object value, Point.Builder point) {
132 if (value instanceof String) {
133 point.addField(FIELD_VALUE_NAME, (String) value);
134 } else if (value instanceof Number) {
135 point.addField(FIELD_VALUE_NAME, (Number) value);
136 } else if (value instanceof Boolean) {
137 point.addField(FIELD_VALUE_NAME, (Boolean) value);
138 } else if (value == null) {
139 point.addField(FIELD_VALUE_NAME, (String) null);
141 throw new UnnexpectedConditionException("Not expected value type");
146 public List<InfluxRow> query(String query) {
147 final InfluxDB currentClient = client;
148 if (currentClient != null) {
149 Query parsedQuery = new Query(query, configuration.getDatabaseName());
150 List<QueryResult.Result> results = currentClient.query(parsedQuery, TimeUnit.MILLISECONDS).getResults();
151 return convertClientResutToRepository(results);
153 logger.warn("Returning empty list because queryAPI isn't present");
154 return Collections.emptyList();
158 private List<InfluxRow> convertClientResutToRepository(List<QueryResult.Result> results) {
159 List<InfluxRow> rows = new ArrayList<>();
160 for (QueryResult.Result result : results) {
161 List<QueryResult.Series> seriess = result.getSeries();
162 if (result.getError() != null) {
163 logger.warn("{}", result.getError());
166 if (seriess == null) {
167 logger.debug("query returned no series");
169 for (QueryResult.Series series : seriess) {
170 logger.trace("series {}", series.toString());
171 String itemName = series.getName();
172 List<List<Object>> valuess = series.getValues();
173 if (valuess == null) {
174 logger.debug("query returned no values");
176 List<String> columns = series.getColumns();
177 logger.trace("columns {}", columns);
178 if (columns != null) {
179 Integer timestampColumn = null;
180 Integer valueColumn = null;
181 for (int i = 0; i < columns.size(); i++) {
182 String columnName = columns.get(i);
183 if (columnName.equals(COLUMN_TIME_NAME_V1)) {
185 } else if (columnName.equals(COLUMN_VALUE_NAME_V1)) {
189 if (valueColumn == null || timestampColumn == null) {
190 throw new IllegalStateException("missing column");
192 for (int i = 0; i < valuess.size(); i++) {
193 Double rawTime = (Double) valuess.get(i).get(timestampColumn);
194 Instant time = Instant.ofEpochMilli(rawTime.longValue());
195 Object value = valuess.get(i).get(valueColumn);
196 logger.trace("adding historic item {}: time {} value {}", itemName, time, value);
197 rows.add(new InfluxRow(time, itemName, value));
208 public Map<String, Integer> getStoredItemsCount() {
209 return Collections.emptyMap();