]> git.basschouten.com Git - openhab-addons.git/blob
dda799785e65df11a1200a2214ca299659f0ade3
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.persistence.influxdb.internal.influx1;
14
15 import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.COLUMN_TIME_NAME_V1;
16 import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.COLUMN_VALUE_NAME_V1;
17 import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.FIELD_VALUE_NAME;
18 import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.TAG_ITEM_NAME;
19
20 import java.time.Instant;
21 import java.util.ArrayList;
22 import java.util.Collections;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.Objects;
26 import java.util.concurrent.TimeUnit;
27
28 import org.eclipse.jdt.annotation.NonNullByDefault;
29 import org.eclipse.jdt.annotation.Nullable;
30 import org.influxdb.InfluxDB;
31 import org.influxdb.InfluxDBFactory;
32 import org.influxdb.dto.Point;
33 import org.influxdb.dto.Pong;
34 import org.influxdb.dto.Query;
35 import org.influxdb.dto.QueryResult;
36 import org.openhab.persistence.influxdb.internal.FilterCriteriaQueryCreator;
37 import org.openhab.persistence.influxdb.internal.InfluxDBConfiguration;
38 import org.openhab.persistence.influxdb.internal.InfluxDBMetadataService;
39 import org.openhab.persistence.influxdb.internal.InfluxDBRepository;
40 import org.openhab.persistence.influxdb.internal.InfluxPoint;
41 import org.openhab.persistence.influxdb.internal.UnexpectedConditionException;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44
45 /**
46  * Implementation of {@link InfluxDBRepository} for InfluxDB 1.0
47  *
48  * @author Joan Pujol Espinar - Initial contribution. Most code has been moved
49  *         from
50  *         {@link org.openhab.persistence.influxdb.InfluxDBPersistenceService}
51  *         where it was in previous version
52  */
53 @NonNullByDefault
54 public class InfluxDB1RepositoryImpl implements InfluxDBRepository {
55     private final Logger logger = LoggerFactory.getLogger(InfluxDB1RepositoryImpl.class);
56     private final InfluxDBConfiguration configuration;
57     private final InfluxDBMetadataService influxDBMetadataService;
58     private @Nullable InfluxDB client;
59
60     public InfluxDB1RepositoryImpl(InfluxDBConfiguration configuration,
61             InfluxDBMetadataService influxDBMetadataService) {
62         this.configuration = configuration;
63         this.influxDBMetadataService = influxDBMetadataService;
64     }
65
66     @Override
67     public boolean isConnected() {
68         return client != null;
69     }
70
71     @Override
72     public boolean connect() {
73         final InfluxDB createdClient = InfluxDBFactory.connect(configuration.getUrl(), configuration.getUser(),
74                 configuration.getPassword());
75         createdClient.setDatabase(configuration.getDatabaseName());
76         createdClient.setRetentionPolicy(configuration.getRetentionPolicy());
77         createdClient.enableBatch(200, 100, TimeUnit.MILLISECONDS);
78         this.client = createdClient;
79         return checkConnectionStatus();
80     }
81
82     @Override
83     public void disconnect() {
84         final InfluxDB currentClient = client;
85         if (currentClient != null) {
86             currentClient.close();
87         }
88         this.client = null;
89     }
90
91     @Override
92     public boolean checkConnectionStatus() {
93         final InfluxDB currentClient = client;
94         if (currentClient != null) {
95             try {
96                 Pong pong = currentClient.ping();
97                 String version = pong.getVersion();
98                 // may be check for version >= 0.9
99                 if (version != null && !version.contains("unknown")) {
100                     logger.debug("database status is OK, version is {}", version);
101                     return true;
102                 } else {
103                     logger.warn("database ping error, version is: \"{}\" response time was \"{}\"", version,
104                             pong.getResponseTime());
105                 }
106             } catch (RuntimeException e) {
107                 logger.warn("database error: {}", e.getMessage(), e);
108             }
109         } else {
110             logger.warn("checkConnection: database is not connected");
111         }
112         return false;
113     }
114
115     @Override
116     public void write(InfluxPoint point) throws UnexpectedConditionException {
117         final InfluxDB currentClient = this.client;
118         if (currentClient != null) {
119             Point clientPoint = convertPointToClientFormat(point);
120             currentClient.write(configuration.getDatabaseName(), configuration.getRetentionPolicy(), clientPoint);
121         } else {
122             logger.warn("Write point {} ignored due to client isn't connected", point);
123         }
124     }
125
126     private Point convertPointToClientFormat(InfluxPoint point) throws UnexpectedConditionException {
127         Point.Builder clientPoint = Point.measurement(point.getMeasurementName()).time(point.getTime().toEpochMilli(),
128                 TimeUnit.MILLISECONDS);
129         Object value = point.getValue();
130         if (value instanceof String) {
131             clientPoint.addField(FIELD_VALUE_NAME, (String) value);
132         } else if (value instanceof Number) {
133             clientPoint.addField(FIELD_VALUE_NAME, (Number) value);
134         } else if (value instanceof Boolean) {
135             clientPoint.addField(FIELD_VALUE_NAME, (Boolean) value);
136         } else if (value == null) {
137             clientPoint.addField(FIELD_VALUE_NAME, "null");
138         } else {
139             throw new UnexpectedConditionException("Not expected value type");
140         }
141         point.getTags().forEach(clientPoint::tag);
142         return clientPoint.build();
143     }
144
145     @Override
146     public List<InfluxRow> query(String query) {
147         final InfluxDB currentClient = client;
148         if (currentClient != null) {
149             Query parsedQuery = new Query(query, configuration.getDatabaseName());
150             List<QueryResult.Result> results = currentClient.query(parsedQuery, TimeUnit.MILLISECONDS).getResults();
151             return convertClientResultToRepository(results);
152         } else {
153             logger.warn("Returning empty list because queryAPI isn't present");
154             return List.of();
155         }
156     }
157
158     private List<InfluxRow> convertClientResultToRepository(List<QueryResult.Result> results) {
159         List<InfluxRow> rows = new ArrayList<>();
160         for (QueryResult.Result result : results) {
161             List<QueryResult.Series> allSeries = result.getSeries();
162             if (result.getError() != null) {
163                 logger.warn("{}", result.getError());
164                 continue;
165             }
166             if (allSeries == null) {
167                 logger.debug("query returned no series");
168             } else {
169                 for (QueryResult.Series series : allSeries) {
170                     logger.trace("series {}", series);
171                     String defaultItemName = series.getName();
172                     List<List<Object>> allValues = series.getValues();
173                     if (allValues == null) {
174                         logger.debug("query returned no values");
175                     } else {
176                         List<String> columns = series.getColumns();
177                         logger.trace("columns {}", columns);
178                         if (columns != null) {
179                             int timestampColumn = columns.indexOf(COLUMN_TIME_NAME_V1);
180                             int valueColumn = columns.indexOf(COLUMN_VALUE_NAME_V1);
181                             int itemNameColumn = columns.indexOf(TAG_ITEM_NAME);
182                             if (valueColumn == -1 || timestampColumn == -1) {
183                                 throw new IllegalStateException("missing column");
184                             }
185                             for (List<Object> valueObject : allValues) {
186                                 Double rawTime = (Double) valueObject.get(timestampColumn);
187                                 Instant time = Instant.ofEpochMilli(rawTime.longValue());
188                                 Object value = valueObject.get(valueColumn);
189                                 String itemName = itemNameColumn == -1 ? defaultItemName
190                                         : Objects.requireNonNullElse((String) valueObject.get(itemNameColumn),
191                                                 defaultItemName);
192                                 logger.trace("adding historic item {}: time {} value {}", itemName, time, value);
193                                 rows.add(new InfluxRow(time, itemName, value));
194                             }
195                         }
196                     }
197                 }
198             }
199         }
200         return rows;
201     }
202
203     @Override
204     public Map<String, Integer> getStoredItemsCount() {
205         return Collections.emptyMap();
206     }
207
208     @Override
209     public FilterCriteriaQueryCreator createQueryCreator() {
210         return new InfluxDB1FilterCriteriaQueryCreatorImpl(configuration, influxDBMetadataService);
211     }
212 }