]> git.basschouten.com Git - openhab-addons.git/blob
fabae14499deafb6ecfdf19b0c170be3a3a5320f
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.persistence.influxdb.internal.influx1;
14
15 import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.COLUMN_TIME_NAME_V1;
16 import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.COLUMN_VALUE_NAME_V1;
17 import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.FIELD_VALUE_NAME;
18 import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.TAG_ITEM_NAME;
19
20 import java.time.Instant;
21 import java.util.ArrayList;
22 import java.util.Collections;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.Objects;
26 import java.util.Optional;
27 import java.util.concurrent.TimeUnit;
28
29 import org.eclipse.jdt.annotation.NonNullByDefault;
30 import org.eclipse.jdt.annotation.Nullable;
31 import org.influxdb.InfluxDB;
32 import org.influxdb.InfluxDBFactory;
33 import org.influxdb.InfluxDBIOException;
34 import org.influxdb.dto.BatchPoints;
35 import org.influxdb.dto.Point;
36 import org.influxdb.dto.Pong;
37 import org.influxdb.dto.Query;
38 import org.influxdb.dto.QueryResult;
39 import org.openhab.core.persistence.FilterCriteria;
40 import org.openhab.persistence.influxdb.internal.FilterCriteriaQueryCreator;
41 import org.openhab.persistence.influxdb.internal.InfluxDBConfiguration;
42 import org.openhab.persistence.influxdb.internal.InfluxDBMetadataService;
43 import org.openhab.persistence.influxdb.internal.InfluxDBRepository;
44 import org.openhab.persistence.influxdb.internal.InfluxPoint;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
47
48 import com.influxdb.exceptions.InfluxException;
49
50 /**
51  * Implementation of {@link InfluxDBRepository} for InfluxDB 1.0
52  *
53  * @author Joan Pujol Espinar - Initial contribution. Most code has been moved
54  *         from
55  *         {@link org.openhab.persistence.influxdb.InfluxDBPersistenceService}
56  *         where it was in previous version
57  */
58 @NonNullByDefault
59 public class InfluxDB1RepositoryImpl implements InfluxDBRepository {
60     private final Logger logger = LoggerFactory.getLogger(InfluxDB1RepositoryImpl.class);
61     private final InfluxDBConfiguration configuration;
62     private final InfluxDBMetadataService influxDBMetadataService;
63     private final FilterCriteriaQueryCreator queryCreator;
64     private @Nullable InfluxDB client;
65
66     public InfluxDB1RepositoryImpl(InfluxDBConfiguration configuration,
67             InfluxDBMetadataService influxDBMetadataService) {
68         this.configuration = configuration;
69         this.influxDBMetadataService = influxDBMetadataService;
70         this.queryCreator = new InfluxDB1FilterCriteriaQueryCreatorImpl(configuration, influxDBMetadataService);
71     }
72
73     @Override
74     public boolean isConnected() {
75         return client != null;
76     }
77
78     @Override
79     public boolean connect() {
80         final InfluxDB createdClient = InfluxDBFactory.connect(configuration.getUrl(), configuration.getUser(),
81                 configuration.getPassword());
82         createdClient.setDatabase(configuration.getDatabaseName());
83         createdClient.setRetentionPolicy(configuration.getRetentionPolicy());
84         createdClient.enableBatch(200, 100, TimeUnit.MILLISECONDS);
85         this.client = createdClient;
86         return checkConnectionStatus();
87     }
88
89     @Override
90     public void disconnect() {
91         final InfluxDB currentClient = client;
92         if (currentClient != null) {
93             currentClient.close();
94         }
95         this.client = null;
96     }
97
98     @Override
99     public boolean checkConnectionStatus() {
100         final InfluxDB currentClient = client;
101         if (currentClient != null) {
102             try {
103                 Pong pong = currentClient.ping();
104                 String version = pong.getVersion();
105                 // may be check for version >= 0.9
106                 if (version != null && !version.contains("unknown")) {
107                     logger.debug("database status is OK, version is {}", version);
108                     return true;
109                 } else {
110                     logger.warn("database ping error, version is: \"{}\" response time was \"{}\"", version,
111                             pong.getResponseTime());
112                 }
113             } catch (RuntimeException e) {
114                 logger.warn("database error: {}", e.getMessage(), e);
115             }
116         } else {
117             logger.warn("checkConnection: database is not connected");
118         }
119         return false;
120     }
121
122     @Override
123     public boolean write(List<InfluxPoint> influxPoints) {
124         final InfluxDB currentClient = this.client;
125         if (currentClient == null) {
126             return false;
127         }
128         try {
129             List<Point> points = influxPoints.stream().map(this::convertPointToClientFormat).filter(Optional::isPresent)
130                     .map(Optional::get).toList();
131             BatchPoints batchPoints = BatchPoints.database(configuration.getDatabaseName())
132                     .retentionPolicy(configuration.getRetentionPolicy()).points(points).build();
133             currentClient.write(batchPoints);
134         } catch (InfluxException | InfluxDBIOException e) {
135             logger.debug("Writing to database failed", e);
136             return false;
137         }
138         return true;
139     }
140
141     @Override
142     public boolean remove(FilterCriteria filter) {
143         logger.warn("Removing data is not supported in InfluxDB v1.");
144         return false;
145     }
146
147     private Optional<Point> convertPointToClientFormat(InfluxPoint point) {
148         Point.Builder clientPoint = Point.measurement(point.getMeasurementName()).time(point.getTime().toEpochMilli(),
149                 TimeUnit.MILLISECONDS);
150         Object value = point.getValue();
151         if (value instanceof String string) {
152             clientPoint.addField(FIELD_VALUE_NAME, string);
153         } else if (value instanceof Number number) {
154             clientPoint.addField(FIELD_VALUE_NAME, number);
155         } else if (value instanceof Boolean boolean1) {
156             clientPoint.addField(FIELD_VALUE_NAME, boolean1);
157         } else if (value == null) {
158             clientPoint.addField(FIELD_VALUE_NAME, "null");
159         } else {
160             logger.warn("Could not convert {}, discarding this datapoint", point);
161             return Optional.empty();
162         }
163         point.getTags().forEach(clientPoint::tag);
164         return Optional.of(clientPoint.build());
165     }
166
167     @Override
168     public List<InfluxRow> query(FilterCriteria filter, String retentionPolicy) {
169         try {
170             final InfluxDB currentClient = client;
171             if (currentClient != null) {
172                 String query = queryCreator.createQuery(filter, retentionPolicy);
173                 logger.trace("Query {}", query);
174                 Query parsedQuery = new Query(query, configuration.getDatabaseName());
175                 List<QueryResult.Result> results = currentClient.query(parsedQuery, TimeUnit.MILLISECONDS).getResults();
176                 return convertClientResultToRepository(results);
177             } else {
178                 throw new InfluxException("API not present");
179             }
180         } catch (InfluxException | InfluxDBIOException e) {
181             logger.warn("Failed to execute query '{}': {}", filter, e.getMessage());
182             return List.of();
183         }
184     }
185
186     private List<InfluxRow> convertClientResultToRepository(List<QueryResult.Result> results) {
187         List<InfluxRow> rows = new ArrayList<>();
188         for (QueryResult.Result result : results) {
189             List<QueryResult.Series> allSeries = result.getSeries();
190             if (result.getError() != null) {
191                 logger.warn("{}", result.getError());
192                 continue;
193             }
194             if (allSeries == null) {
195                 logger.debug("query returned no series");
196             } else {
197                 for (QueryResult.Series series : allSeries) {
198                     logger.trace("series {}", series);
199                     String defaultItemName = series.getName();
200                     List<List<Object>> allValues = series.getValues();
201                     if (allValues == null) {
202                         logger.debug("query returned no values");
203                     } else {
204                         List<String> columns = series.getColumns();
205                         logger.trace("columns {}", columns);
206                         if (columns != null) {
207                             int timestampColumn = columns.indexOf(COLUMN_TIME_NAME_V1);
208                             int valueColumn = columns.indexOf(COLUMN_VALUE_NAME_V1);
209                             int itemNameColumn = columns.indexOf(TAG_ITEM_NAME);
210                             if (valueColumn == -1 || timestampColumn == -1) {
211                                 throw new IllegalStateException("missing column");
212                             }
213                             for (List<Object> valueObject : allValues) {
214                                 Double rawTime = (Double) valueObject.get(timestampColumn);
215                                 Instant time = Instant.ofEpochMilli(rawTime.longValue());
216                                 Object value = valueObject.get(valueColumn);
217                                 String itemName = itemNameColumn == -1 ? defaultItemName
218                                         : Objects.requireNonNullElse((String) valueObject.get(itemNameColumn),
219                                                 defaultItemName);
220                                 logger.trace("adding historic item {}: time {} value {}", itemName, time, value);
221                                 rows.add(new InfluxRow(time, itemName, value));
222                             }
223                         }
224                     }
225                 }
226             }
227         }
228         return rows;
229     }
230
231     @Override
232     public Map<String, Integer> getStoredItemsCount() {
233         return Collections.emptyMap();
234     }
235 }