]> git.basschouten.com Git - openhab-addons.git/blob
094c8ab25c4481f71e07039d81ad2cebb445cb28
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2021 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.persistence.influxdb.internal.influx1;
14
15 import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.COLUMN_TIME_NAME_V1;
16 import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.COLUMN_VALUE_NAME_V1;
17 import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.FIELD_VALUE_NAME;
18 import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.TAG_ITEM_NAME;
19
20 import java.time.Instant;
21 import java.util.ArrayList;
22 import java.util.Collections;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.concurrent.TimeUnit;
26
27 import org.eclipse.jdt.annotation.NonNullByDefault;
28 import org.eclipse.jdt.annotation.Nullable;
29 import org.influxdb.InfluxDB;
30 import org.influxdb.InfluxDBFactory;
31 import org.influxdb.dto.Point;
32 import org.influxdb.dto.Pong;
33 import org.influxdb.dto.Query;
34 import org.influxdb.dto.QueryResult;
35 import org.openhab.persistence.influxdb.internal.InfluxDBConfiguration;
36 import org.openhab.persistence.influxdb.internal.InfluxDBRepository;
37 import org.openhab.persistence.influxdb.internal.InfluxPoint;
38 import org.openhab.persistence.influxdb.internal.InfluxRow;
39 import org.openhab.persistence.influxdb.internal.UnnexpectedConditionException;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
42
43 /**
44  * Implementation of {@link InfluxDBRepository} for InfluxDB 1.0
45  *
46  * @author Joan Pujol Espinar - Initial contribution. Most code has been moved
47  *         from
48  *         {@link org.openhab.persistence.influxdb.InfluxDBPersistenceService}
49  *         where it was in previous version
50  */
51 @NonNullByDefault
52 public class InfluxDB1RepositoryImpl implements InfluxDBRepository {
53     private final Logger logger = LoggerFactory.getLogger(InfluxDB1RepositoryImpl.class);
54     private InfluxDBConfiguration configuration;
55     @Nullable
56     private InfluxDB client;
57
58     public InfluxDB1RepositoryImpl(InfluxDBConfiguration configuration) {
59         this.configuration = configuration;
60     }
61
62     @Override
63     public boolean isConnected() {
64         return client != null;
65     }
66
67     @Override
68     public boolean connect() {
69         final InfluxDB createdClient = InfluxDBFactory.connect(configuration.getUrl(), configuration.getUser(),
70                 configuration.getPassword());
71         createdClient.setDatabase(configuration.getDatabaseName());
72         createdClient.setRetentionPolicy(configuration.getRetentionPolicy());
73         createdClient.enableBatch(200, 100, TimeUnit.MILLISECONDS);
74         this.client = createdClient;
75         return checkConnectionStatus();
76     }
77
78     @Override
79     public void disconnect() {
80         this.client = null;
81     }
82
83     @Override
84     public boolean checkConnectionStatus() {
85         boolean dbStatus = false;
86         final InfluxDB currentClient = client;
87         if (currentClient != null) {
88             try {
89                 Pong pong = currentClient.ping();
90                 String version = pong.getVersion();
91                 // may be check for version >= 0.9
92                 if (version != null && !version.contains("unknown")) {
93                     dbStatus = true;
94                     logger.debug("database status is OK, version is {}", version);
95                 } else {
96                     logger.warn("database ping error, version is: \"{}\" response time was \"{}\"", version,
97                             pong.getResponseTime());
98                     dbStatus = false;
99                 }
100             } catch (RuntimeException e) {
101                 dbStatus = false;
102                 logger.error("database connection failed", e);
103                 handleDatabaseException(e);
104             }
105         } else {
106             logger.warn("checkConnection: database is not connected");
107         }
108         return dbStatus;
109     }
110
111     private void handleDatabaseException(Exception e) {
112         logger.warn("database error: {}", e.getMessage(), e);
113     }
114
115     @Override
116     public void write(InfluxPoint point) {
117         final InfluxDB currentClient = this.client;
118         if (currentClient != null) {
119             Point clientPoint = convertPointToClientFormat(point);
120             currentClient.write(configuration.getDatabaseName(), configuration.getRetentionPolicy(), clientPoint);
121         } else {
122             logger.warn("Write point {} ignored due to client isn't connected", point);
123         }
124     }
125
126     private Point convertPointToClientFormat(InfluxPoint point) {
127         Point.Builder clientPoint = Point.measurement(point.getMeasurementName()).time(point.getTime().toEpochMilli(),
128                 TimeUnit.MILLISECONDS);
129         setPointValue(point.getValue(), clientPoint);
130         point.getTags().entrySet().forEach(e -> clientPoint.tag(e.getKey(), e.getValue()));
131         return clientPoint.build();
132     }
133
134     private void setPointValue(@Nullable Object value, Point.Builder point) {
135         if (value instanceof String) {
136             point.addField(FIELD_VALUE_NAME, (String) value);
137         } else if (value instanceof Number) {
138             point.addField(FIELD_VALUE_NAME, (Number) value);
139         } else if (value instanceof Boolean) {
140             point.addField(FIELD_VALUE_NAME, (Boolean) value);
141         } else if (value == null) {
142             point.addField(FIELD_VALUE_NAME, (String) null);
143         } else {
144             throw new UnnexpectedConditionException("Not expected value type");
145         }
146     }
147
148     @Override
149     public List<InfluxRow> query(String query) {
150         final InfluxDB currentClient = client;
151         if (currentClient != null) {
152             Query parsedQuery = new Query(query, configuration.getDatabaseName());
153             List<QueryResult.Result> results = currentClient.query(parsedQuery, TimeUnit.MILLISECONDS).getResults();
154             return convertClientResutToRepository(results);
155         } else {
156             logger.warn("Returning empty list because queryAPI isn't present");
157             return Collections.emptyList();
158         }
159     }
160
161     private List<InfluxRow> convertClientResutToRepository(List<QueryResult.Result> results) {
162         List<InfluxRow> rows = new ArrayList<>();
163         for (QueryResult.Result result : results) {
164             List<QueryResult.Series> seriess = result.getSeries();
165             if (result.getError() != null) {
166                 logger.warn("{}", result.getError());
167                 continue;
168             }
169             if (seriess == null) {
170                 logger.debug("query returned no series");
171             } else {
172                 for (QueryResult.Series series : seriess) {
173                     logger.trace("series {}", series.toString());
174                     String itemName = series.getName();
175                     List<List<Object>> valuess = series.getValues();
176                     if (valuess == null) {
177                         logger.debug("query returned no values");
178                     } else {
179                         List<String> columns = series.getColumns();
180                         logger.trace("columns {}", columns);
181                         if (columns != null) {
182                             Integer timestampColumn = null;
183                             Integer valueColumn = null;
184                             Integer itemNameColumn = null;
185                             for (int i = 0; i < columns.size(); i++) {
186                                 String columnName = columns.get(i);
187                                 if (columnName.equals(COLUMN_TIME_NAME_V1)) {
188                                     timestampColumn = i;
189                                 } else if (columnName.equals(COLUMN_VALUE_NAME_V1)) {
190                                     valueColumn = i;
191                                 } else if (columnName.equals(TAG_ITEM_NAME)) {
192                                     itemNameColumn = i;
193                                 }
194                             }
195                             if (valueColumn == null || timestampColumn == null) {
196                                 throw new IllegalStateException("missing column");
197                             }
198                             for (int i = 0; i < valuess.size(); i++) {
199                                 Double rawTime = (Double) valuess.get(i).get(timestampColumn);
200                                 Instant time = Instant.ofEpochMilli(rawTime.longValue());
201                                 Object value = valuess.get(i).get(valueColumn);
202                                 if (itemNameColumn != null) {
203                                     itemName = (String) valuess.get(i).get(itemNameColumn);
204                                 }
205                                 logger.trace("adding historic item {}: time {} value {}", itemName, time, value);
206                                 rows.add(new InfluxRow(time, itemName, value));
207                             }
208                         }
209                     }
210                 }
211             }
212         }
213         return rows;
214     }
215
216     @Override
217     public Map<String, Integer> getStoredItemsCount() {
218         return Collections.emptyMap();
219     }
220 }