]> git.basschouten.com Git - openhab-addons.git/blob
ef66df495e683ef8e21e5a3c9db9c667086f7112
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.persistence.influxdb.internal.influx1;
14
15 import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.COLUMN_TIME_NAME_V1;
16 import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.COLUMN_VALUE_NAME_V1;
17 import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.FIELD_VALUE_NAME;
18 import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.TAG_ITEM_NAME;
19
20 import java.time.Instant;
21 import java.util.ArrayList;
22 import java.util.Collections;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.Objects;
26 import java.util.Optional;
27 import java.util.concurrent.TimeUnit;
28
29 import org.eclipse.jdt.annotation.NonNullByDefault;
30 import org.eclipse.jdt.annotation.Nullable;
31 import org.influxdb.InfluxDB;
32 import org.influxdb.InfluxDBFactory;
33 import org.influxdb.dto.BatchPoints;
34 import org.influxdb.dto.Point;
35 import org.influxdb.dto.Pong;
36 import org.influxdb.dto.Query;
37 import org.influxdb.dto.QueryResult;
38 import org.openhab.core.persistence.FilterCriteria;
39 import org.openhab.persistence.influxdb.internal.FilterCriteriaQueryCreator;
40 import org.openhab.persistence.influxdb.internal.InfluxDBConfiguration;
41 import org.openhab.persistence.influxdb.internal.InfluxDBMetadataService;
42 import org.openhab.persistence.influxdb.internal.InfluxDBRepository;
43 import org.openhab.persistence.influxdb.internal.InfluxPoint;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
46
47 import com.influxdb.exceptions.InfluxException;
48
49 /**
50  * Implementation of {@link InfluxDBRepository} for InfluxDB 1.0
51  *
52  * @author Joan Pujol Espinar - Initial contribution. Most code has been moved
53  *         from
54  *         {@link org.openhab.persistence.influxdb.InfluxDBPersistenceService}
55  *         where it was in previous version
56  */
57 @NonNullByDefault
58 public class InfluxDB1RepositoryImpl implements InfluxDBRepository {
59     private final Logger logger = LoggerFactory.getLogger(InfluxDB1RepositoryImpl.class);
60     private final InfluxDBConfiguration configuration;
61     private final InfluxDBMetadataService influxDBMetadataService;
62     private final FilterCriteriaQueryCreator queryCreator;
63     private @Nullable InfluxDB client;
64
65     public InfluxDB1RepositoryImpl(InfluxDBConfiguration configuration,
66             InfluxDBMetadataService influxDBMetadataService) {
67         this.configuration = configuration;
68         this.influxDBMetadataService = influxDBMetadataService;
69         this.queryCreator = new InfluxDB1FilterCriteriaQueryCreatorImpl(configuration, influxDBMetadataService);
70     }
71
72     @Override
73     public boolean isConnected() {
74         return client != null;
75     }
76
77     @Override
78     public boolean connect() {
79         final InfluxDB createdClient = InfluxDBFactory.connect(configuration.getUrl(), configuration.getUser(),
80                 configuration.getPassword());
81         createdClient.setDatabase(configuration.getDatabaseName());
82         createdClient.setRetentionPolicy(configuration.getRetentionPolicy());
83         createdClient.enableBatch(200, 100, TimeUnit.MILLISECONDS);
84         this.client = createdClient;
85         return checkConnectionStatus();
86     }
87
88     @Override
89     public void disconnect() {
90         final InfluxDB currentClient = client;
91         if (currentClient != null) {
92             currentClient.close();
93         }
94         this.client = null;
95     }
96
97     @Override
98     public boolean checkConnectionStatus() {
99         final InfluxDB currentClient = client;
100         if (currentClient != null) {
101             try {
102                 Pong pong = currentClient.ping();
103                 String version = pong.getVersion();
104                 // may be check for version >= 0.9
105                 if (version != null && !version.contains("unknown")) {
106                     logger.debug("database status is OK, version is {}", version);
107                     return true;
108                 } else {
109                     logger.warn("database ping error, version is: \"{}\" response time was \"{}\"", version,
110                             pong.getResponseTime());
111                 }
112             } catch (RuntimeException e) {
113                 logger.warn("database error: {}", e.getMessage(), e);
114             }
115         } else {
116             logger.warn("checkConnection: database is not connected");
117         }
118         return false;
119     }
120
121     @Override
122     public boolean write(List<InfluxPoint> influxPoints) {
123         final InfluxDB currentClient = this.client;
124         if (currentClient == null) {
125             return false;
126         }
127         try {
128             List<Point> points = influxPoints.stream().map(this::convertPointToClientFormat).filter(Optional::isPresent)
129                     .map(Optional::get).toList();
130             BatchPoints batchPoints = BatchPoints.database(configuration.getDatabaseName())
131                     .retentionPolicy(configuration.getRetentionPolicy()).points(points).build();
132             currentClient.write(batchPoints);
133         } catch (InfluxException e) {
134             logger.debug("Writing to database failed", e);
135             return false;
136         }
137         return true;
138     }
139
140     @Override
141     public boolean remove(FilterCriteria filter) {
142         logger.warn("Removing data is not supported in InfluxDB v1.");
143         return false;
144     }
145
146     private Optional<Point> convertPointToClientFormat(InfluxPoint point) {
147         Point.Builder clientPoint = Point.measurement(point.getMeasurementName()).time(point.getTime().toEpochMilli(),
148                 TimeUnit.MILLISECONDS);
149         Object value = point.getValue();
150         if (value instanceof String) {
151             clientPoint.addField(FIELD_VALUE_NAME, (String) value);
152         } else if (value instanceof Number) {
153             clientPoint.addField(FIELD_VALUE_NAME, (Number) value);
154         } else if (value instanceof Boolean) {
155             clientPoint.addField(FIELD_VALUE_NAME, (Boolean) value);
156         } else if (value == null) {
157             clientPoint.addField(FIELD_VALUE_NAME, "null");
158         } else {
159             logger.warn("Could not convert {}, discarding this datapoint", point);
160             return Optional.empty();
161         }
162         point.getTags().forEach(clientPoint::tag);
163         return Optional.of(clientPoint.build());
164     }
165
166     @Override
167     public List<InfluxRow> query(FilterCriteria filter, String retentionPolicy) {
168         final InfluxDB currentClient = client;
169         if (currentClient != null) {
170             String query = queryCreator.createQuery(filter, retentionPolicy);
171             logger.trace("Query {}", query);
172             Query parsedQuery = new Query(query, configuration.getDatabaseName());
173             List<QueryResult.Result> results = currentClient.query(parsedQuery, TimeUnit.MILLISECONDS).getResults();
174             return convertClientResultToRepository(results);
175         } else {
176             logger.warn("Returning empty list because queryAPI isn't present");
177             return List.of();
178         }
179     }
180
181     private List<InfluxRow> convertClientResultToRepository(List<QueryResult.Result> results) {
182         List<InfluxRow> rows = new ArrayList<>();
183         for (QueryResult.Result result : results) {
184             List<QueryResult.Series> allSeries = result.getSeries();
185             if (result.getError() != null) {
186                 logger.warn("{}", result.getError());
187                 continue;
188             }
189             if (allSeries == null) {
190                 logger.debug("query returned no series");
191             } else {
192                 for (QueryResult.Series series : allSeries) {
193                     logger.trace("series {}", series);
194                     String defaultItemName = series.getName();
195                     List<List<Object>> allValues = series.getValues();
196                     if (allValues == null) {
197                         logger.debug("query returned no values");
198                     } else {
199                         List<String> columns = series.getColumns();
200                         logger.trace("columns {}", columns);
201                         if (columns != null) {
202                             int timestampColumn = columns.indexOf(COLUMN_TIME_NAME_V1);
203                             int valueColumn = columns.indexOf(COLUMN_VALUE_NAME_V1);
204                             int itemNameColumn = columns.indexOf(TAG_ITEM_NAME);
205                             if (valueColumn == -1 || timestampColumn == -1) {
206                                 throw new IllegalStateException("missing column");
207                             }
208                             for (List<Object> valueObject : allValues) {
209                                 Double rawTime = (Double) valueObject.get(timestampColumn);
210                                 Instant time = Instant.ofEpochMilli(rawTime.longValue());
211                                 Object value = valueObject.get(valueColumn);
212                                 String itemName = itemNameColumn == -1 ? defaultItemName
213                                         : Objects.requireNonNullElse((String) valueObject.get(itemNameColumn),
214                                                 defaultItemName);
215                                 logger.trace("adding historic item {}: time {} value {}", itemName, time, value);
216                                 rows.add(new InfluxRow(time, itemName, value));
217                             }
218                         }
219                     }
220                 }
221             }
222         }
223         return rows;
224     }
225
226     @Override
227     public Map<String, Integer> getStoredItemsCount() {
228         return Collections.emptyMap();
229     }
230 }