]> git.basschouten.com Git - openhab-addons.git/blob
0319603e5f39bf44fdc379d5968d195488790d42
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.persistence.influxdb.internal.influx1;
14
15 import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.COLUMN_TIME_NAME_V1;
16 import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.COLUMN_VALUE_NAME_V1;
17 import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.FIELD_VALUE_NAME;
18 import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.TAG_ITEM_NAME;
19
20 import java.time.Instant;
21 import java.util.ArrayList;
22 import java.util.Collections;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.Objects;
26 import java.util.Optional;
27 import java.util.concurrent.TimeUnit;
28
29 import org.eclipse.jdt.annotation.NonNullByDefault;
30 import org.eclipse.jdt.annotation.Nullable;
31 import org.influxdb.InfluxDB;
32 import org.influxdb.InfluxDBFactory;
33 import org.influxdb.dto.BatchPoints;
34 import org.influxdb.dto.Point;
35 import org.influxdb.dto.Pong;
36 import org.influxdb.dto.Query;
37 import org.influxdb.dto.QueryResult;
38 import org.openhab.persistence.influxdb.internal.FilterCriteriaQueryCreator;
39 import org.openhab.persistence.influxdb.internal.InfluxDBConfiguration;
40 import org.openhab.persistence.influxdb.internal.InfluxDBMetadataService;
41 import org.openhab.persistence.influxdb.internal.InfluxDBRepository;
42 import org.openhab.persistence.influxdb.internal.InfluxPoint;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45
46 import com.influxdb.exceptions.InfluxException;
47
48 /**
49  * Implementation of {@link InfluxDBRepository} for InfluxDB 1.0
50  *
51  * @author Joan Pujol Espinar - Initial contribution. Most code has been moved
52  *         from
53  *         {@link org.openhab.persistence.influxdb.InfluxDBPersistenceService}
54  *         where it was in previous version
55  */
56 @NonNullByDefault
57 public class InfluxDB1RepositoryImpl implements InfluxDBRepository {
58     private final Logger logger = LoggerFactory.getLogger(InfluxDB1RepositoryImpl.class);
59     private final InfluxDBConfiguration configuration;
60     private final InfluxDBMetadataService influxDBMetadataService;
61     private @Nullable InfluxDB client;
62
63     public InfluxDB1RepositoryImpl(InfluxDBConfiguration configuration,
64             InfluxDBMetadataService influxDBMetadataService) {
65         this.configuration = configuration;
66         this.influxDBMetadataService = influxDBMetadataService;
67     }
68
69     @Override
70     public boolean isConnected() {
71         return client != null;
72     }
73
74     @Override
75     public boolean connect() {
76         final InfluxDB createdClient = InfluxDBFactory.connect(configuration.getUrl(), configuration.getUser(),
77                 configuration.getPassword());
78         createdClient.setDatabase(configuration.getDatabaseName());
79         createdClient.setRetentionPolicy(configuration.getRetentionPolicy());
80         createdClient.enableBatch(200, 100, TimeUnit.MILLISECONDS);
81         this.client = createdClient;
82         return checkConnectionStatus();
83     }
84
85     @Override
86     public void disconnect() {
87         final InfluxDB currentClient = client;
88         if (currentClient != null) {
89             currentClient.close();
90         }
91         this.client = null;
92     }
93
94     @Override
95     public boolean checkConnectionStatus() {
96         final InfluxDB currentClient = client;
97         if (currentClient != null) {
98             try {
99                 Pong pong = currentClient.ping();
100                 String version = pong.getVersion();
101                 // may be check for version >= 0.9
102                 if (version != null && !version.contains("unknown")) {
103                     logger.debug("database status is OK, version is {}", version);
104                     return true;
105                 } else {
106                     logger.warn("database ping error, version is: \"{}\" response time was \"{}\"", version,
107                             pong.getResponseTime());
108                 }
109             } catch (RuntimeException e) {
110                 logger.warn("database error: {}", e.getMessage(), e);
111             }
112         } else {
113             logger.warn("checkConnection: database is not connected");
114         }
115         return false;
116     }
117
118     @Override
119     public boolean write(List<InfluxPoint> influxPoints) {
120         final InfluxDB currentClient = this.client;
121         if (currentClient == null) {
122             return false;
123         }
124         try {
125             List<Point> points = influxPoints.stream().map(this::convertPointToClientFormat).filter(Optional::isPresent)
126                     .map(Optional::get).toList();
127             BatchPoints batchPoints = BatchPoints.database(configuration.getDatabaseName())
128                     .retentionPolicy(configuration.getRetentionPolicy()).points(points).build();
129             currentClient.write(batchPoints);
130         } catch (InfluxException e) {
131             logger.debug("Writing to database failed", e);
132             return false;
133         }
134         return true;
135     }
136
137     private Optional<Point> convertPointToClientFormat(InfluxPoint point) {
138         Point.Builder clientPoint = Point.measurement(point.getMeasurementName()).time(point.getTime().toEpochMilli(),
139                 TimeUnit.MILLISECONDS);
140         Object value = point.getValue();
141         if (value instanceof String) {
142             clientPoint.addField(FIELD_VALUE_NAME, (String) value);
143         } else if (value instanceof Number) {
144             clientPoint.addField(FIELD_VALUE_NAME, (Number) value);
145         } else if (value instanceof Boolean) {
146             clientPoint.addField(FIELD_VALUE_NAME, (Boolean) value);
147         } else if (value == null) {
148             clientPoint.addField(FIELD_VALUE_NAME, "null");
149         } else {
150             logger.warn("Could not convert {}, discarding this datapoint", point);
151             return Optional.empty();
152         }
153         point.getTags().forEach(clientPoint::tag);
154         return Optional.of(clientPoint.build());
155     }
156
157     @Override
158     public List<InfluxRow> query(String query) {
159         final InfluxDB currentClient = client;
160         if (currentClient != null) {
161             Query parsedQuery = new Query(query, configuration.getDatabaseName());
162             List<QueryResult.Result> results = currentClient.query(parsedQuery, TimeUnit.MILLISECONDS).getResults();
163             return convertClientResultToRepository(results);
164         } else {
165             logger.warn("Returning empty list because queryAPI isn't present");
166             return List.of();
167         }
168     }
169
170     private List<InfluxRow> convertClientResultToRepository(List<QueryResult.Result> results) {
171         List<InfluxRow> rows = new ArrayList<>();
172         for (QueryResult.Result result : results) {
173             List<QueryResult.Series> allSeries = result.getSeries();
174             if (result.getError() != null) {
175                 logger.warn("{}", result.getError());
176                 continue;
177             }
178             if (allSeries == null) {
179                 logger.debug("query returned no series");
180             } else {
181                 for (QueryResult.Series series : allSeries) {
182                     logger.trace("series {}", series);
183                     String defaultItemName = series.getName();
184                     List<List<Object>> allValues = series.getValues();
185                     if (allValues == null) {
186                         logger.debug("query returned no values");
187                     } else {
188                         List<String> columns = series.getColumns();
189                         logger.trace("columns {}", columns);
190                         if (columns != null) {
191                             int timestampColumn = columns.indexOf(COLUMN_TIME_NAME_V1);
192                             int valueColumn = columns.indexOf(COLUMN_VALUE_NAME_V1);
193                             int itemNameColumn = columns.indexOf(TAG_ITEM_NAME);
194                             if (valueColumn == -1 || timestampColumn == -1) {
195                                 throw new IllegalStateException("missing column");
196                             }
197                             for (List<Object> valueObject : allValues) {
198                                 Double rawTime = (Double) valueObject.get(timestampColumn);
199                                 Instant time = Instant.ofEpochMilli(rawTime.longValue());
200                                 Object value = valueObject.get(valueColumn);
201                                 String itemName = itemNameColumn == -1 ? defaultItemName
202                                         : Objects.requireNonNullElse((String) valueObject.get(itemNameColumn),
203                                                 defaultItemName);
204                                 logger.trace("adding historic item {}: time {} value {}", itemName, time, value);
205                                 rows.add(new InfluxRow(time, itemName, value));
206                             }
207                         }
208                     }
209                 }
210             }
211         }
212         return rows;
213     }
214
215     @Override
216     public Map<String, Integer> getStoredItemsCount() {
217         return Collections.emptyMap();
218     }
219
220     @Override
221     public FilterCriteriaQueryCreator createQueryCreator() {
222         return new InfluxDB1FilterCriteriaQueryCreatorImpl(configuration, influxDBMetadataService);
223     }
224 }