]> git.basschouten.com Git - openhab-addons.git/blob
7f406168dfb1c75b24a8dbe825202655a70a2a1f
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2021 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.persistence.influxdb.internal.influx1;
14
15 import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.COLUMN_TIME_NAME_V1;
16 import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.COLUMN_VALUE_NAME_V1;
17 import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.FIELD_VALUE_NAME;
18
19 import java.time.Instant;
20 import java.util.ArrayList;
21 import java.util.Collections;
22 import java.util.List;
23 import java.util.Map;
24 import java.util.concurrent.TimeUnit;
25
26 import org.eclipse.jdt.annotation.NonNullByDefault;
27 import org.eclipse.jdt.annotation.Nullable;
28 import org.influxdb.InfluxDB;
29 import org.influxdb.InfluxDBFactory;
30 import org.influxdb.dto.Point;
31 import org.influxdb.dto.Pong;
32 import org.influxdb.dto.Query;
33 import org.influxdb.dto.QueryResult;
34 import org.openhab.persistence.influxdb.internal.InfluxDBConfiguration;
35 import org.openhab.persistence.influxdb.internal.InfluxDBRepository;
36 import org.openhab.persistence.influxdb.internal.InfluxPoint;
37 import org.openhab.persistence.influxdb.internal.InfluxRow;
38 import org.openhab.persistence.influxdb.internal.UnnexpectedConditionException;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41
42 /**
43  * Implementation of {@link InfluxDBRepository} for InfluxDB 1.0
44  *
45  * @author Joan Pujol Espinar - Initial contribution. Most code has been moved from
46  *         {@link org.openhab.persistence.influxdb.InfluxDBPersistenceService} where it was in previous version
47  */
48 @NonNullByDefault
49 public class InfluxDB1RepositoryImpl implements InfluxDBRepository {
50     private final Logger logger = LoggerFactory.getLogger(InfluxDB1RepositoryImpl.class);
51     private InfluxDBConfiguration configuration;
52     @Nullable
53     private InfluxDB client;
54
55     public InfluxDB1RepositoryImpl(InfluxDBConfiguration configuration) {
56         this.configuration = configuration;
57     }
58
59     @Override
60     public boolean isConnected() {
61         return client != null;
62     }
63
64     @Override
65     public boolean connect() {
66         final InfluxDB createdClient = InfluxDBFactory.connect(configuration.getUrl(), configuration.getUser(),
67                 configuration.getPassword());
68         createdClient.setDatabase(configuration.getDatabaseName());
69         createdClient.setRetentionPolicy(configuration.getRetentionPolicy());
70         createdClient.enableBatch(200, 100, TimeUnit.MILLISECONDS);
71         this.client = createdClient;
72         return checkConnectionStatus();
73     }
74
75     @Override
76     public void disconnect() {
77         this.client = null;
78     }
79
80     @Override
81     public boolean checkConnectionStatus() {
82         boolean dbStatus = false;
83         final InfluxDB currentClient = client;
84         if (currentClient != null) {
85             try {
86                 Pong pong = currentClient.ping();
87                 String version = pong.getVersion();
88                 // may be check for version >= 0.9
89                 if (version != null && !version.contains("unknown")) {
90                     dbStatus = true;
91                     logger.debug("database status is OK, version is {}", version);
92                 } else {
93                     logger.warn("database ping error, version is: \"{}\" response time was \"{}\"", version,
94                             pong.getResponseTime());
95                     dbStatus = false;
96                 }
97             } catch (RuntimeException e) {
98                 dbStatus = false;
99                 logger.error("database connection failed", e);
100                 handleDatabaseException(e);
101             }
102         } else {
103             logger.warn("checkConnection: database is not connected");
104         }
105         return dbStatus;
106     }
107
108     private void handleDatabaseException(Exception e) {
109         logger.warn("database error: {}", e.getMessage(), e);
110     }
111
112     @Override
113     public void write(InfluxPoint point) {
114         final InfluxDB currentClient = this.client;
115         if (currentClient != null) {
116             Point clientPoint = convertPointToClientFormat(point);
117             currentClient.write(configuration.getDatabaseName(), configuration.getRetentionPolicy(), clientPoint);
118         } else {
119             logger.warn("Write point {} ignored due to client isn't connected", point);
120         }
121     }
122
123     private Point convertPointToClientFormat(InfluxPoint point) {
124         Point.Builder clientPoint = Point.measurement(point.getMeasurementName()).time(point.getTime().toEpochMilli(),
125                 TimeUnit.MILLISECONDS);
126         setPointValue(point.getValue(), clientPoint);
127         point.getTags().entrySet().forEach(e -> clientPoint.tag(e.getKey(), e.getValue()));
128         return clientPoint.build();
129     }
130
131     private void setPointValue(@Nullable Object value, Point.Builder point) {
132         if (value instanceof String) {
133             point.addField(FIELD_VALUE_NAME, (String) value);
134         } else if (value instanceof Number) {
135             point.addField(FIELD_VALUE_NAME, (Number) value);
136         } else if (value instanceof Boolean) {
137             point.addField(FIELD_VALUE_NAME, (Boolean) value);
138         } else if (value == null) {
139             point.addField(FIELD_VALUE_NAME, (String) null);
140         } else {
141             throw new UnnexpectedConditionException("Not expected value type");
142         }
143     }
144
145     @Override
146     public List<InfluxRow> query(String query) {
147         final InfluxDB currentClient = client;
148         if (currentClient != null) {
149             Query parsedQuery = new Query(query, configuration.getDatabaseName());
150             List<QueryResult.Result> results = currentClient.query(parsedQuery, TimeUnit.MILLISECONDS).getResults();
151             return convertClientResutToRepository(results);
152         } else {
153             logger.warn("Returning empty list because queryAPI isn't present");
154             return Collections.emptyList();
155         }
156     }
157
158     private List<InfluxRow> convertClientResutToRepository(List<QueryResult.Result> results) {
159         List<InfluxRow> rows = new ArrayList<>();
160         for (QueryResult.Result result : results) {
161             List<QueryResult.Series> seriess = result.getSeries();
162             if (result.getError() != null) {
163                 logger.warn("{}", result.getError());
164                 continue;
165             }
166             if (seriess == null) {
167                 logger.debug("query returned no series");
168             } else {
169                 for (QueryResult.Series series : seriess) {
170                     logger.trace("series {}", series.toString());
171                     String itemName = series.getName();
172                     List<List<Object>> valuess = series.getValues();
173                     if (valuess == null) {
174                         logger.debug("query returned no values");
175                     } else {
176                         List<String> columns = series.getColumns();
177                         logger.trace("columns {}", columns);
178                         if (columns != null) {
179                             Integer timestampColumn = null;
180                             Integer valueColumn = null;
181                             for (int i = 0; i < columns.size(); i++) {
182                                 String columnName = columns.get(i);
183                                 if (columnName.equals(COLUMN_TIME_NAME_V1)) {
184                                     timestampColumn = i;
185                                 } else if (columnName.equals(COLUMN_VALUE_NAME_V1)) {
186                                     valueColumn = i;
187                                 }
188                             }
189                             if (valueColumn == null || timestampColumn == null) {
190                                 throw new IllegalStateException("missing column");
191                             }
192                             for (int i = 0; i < valuess.size(); i++) {
193                                 Double rawTime = (Double) valuess.get(i).get(timestampColumn);
194                                 Instant time = Instant.ofEpochMilli(rawTime.longValue());
195                                 Object value = valuess.get(i).get(valueColumn);
196                                 logger.trace("adding historic item {}: time {} value {}", itemName, time, value);
197                                 rows.add(new InfluxRow(time, itemName, value));
198                             }
199                         }
200                     }
201                 }
202             }
203         }
204         return rows;
205     }
206
207     @Override
208     public Map<String, Integer> getStoredItemsCount() {
209         return Collections.emptyMap();
210     }
211 }