]> git.basschouten.com Git - openhab-addons.git/blob
b103657c05f54a3846a9624800fcf9fc09a52d93
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.persistence.influxdb.internal.influx1;
14
15 import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.COLUMN_TIME_NAME_V1;
16 import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.COLUMN_VALUE_NAME_V1;
17 import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.FIELD_VALUE_NAME;
18 import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.TAG_ITEM_NAME;
19
20 import java.time.Instant;
21 import java.util.ArrayList;
22 import java.util.Collections;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.Objects;
26 import java.util.Optional;
27 import java.util.concurrent.TimeUnit;
28
29 import org.eclipse.jdt.annotation.NonNullByDefault;
30 import org.eclipse.jdt.annotation.Nullable;
31 import org.influxdb.InfluxDB;
32 import org.influxdb.InfluxDBFactory;
33 import org.influxdb.dto.Point;
34 import org.influxdb.dto.Pong;
35 import org.influxdb.dto.Query;
36 import org.influxdb.dto.QueryResult;
37 import org.openhab.persistence.influxdb.internal.InfluxDBConfiguration;
38 import org.openhab.persistence.influxdb.internal.InfluxDBRepository;
39 import org.openhab.persistence.influxdb.internal.InfluxPoint;
40 import org.openhab.persistence.influxdb.internal.InfluxRow;
41 import org.openhab.persistence.influxdb.internal.UnnexpectedConditionException;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44
45 /**
46  * Implementation of {@link InfluxDBRepository} for InfluxDB 1.0
47  *
48  * @author Joan Pujol Espinar - Initial contribution. Most code has been moved
49  *         from
50  *         {@link org.openhab.persistence.influxdb.InfluxDBPersistenceService}
51  *         where it was in previous version
52  */
53 @NonNullByDefault
54 public class InfluxDB1RepositoryImpl implements InfluxDBRepository {
55     private final Logger logger = LoggerFactory.getLogger(InfluxDB1RepositoryImpl.class);
56     private InfluxDBConfiguration configuration;
57     @Nullable
58     private InfluxDB client;
59
60     public InfluxDB1RepositoryImpl(InfluxDBConfiguration configuration) {
61         this.configuration = configuration;
62     }
63
64     @Override
65     public boolean isConnected() {
66         return client != null;
67     }
68
69     @Override
70     public boolean connect() {
71         final InfluxDB createdClient = InfluxDBFactory.connect(configuration.getUrl(), configuration.getUser(),
72                 configuration.getPassword());
73         createdClient.setDatabase(configuration.getDatabaseName());
74         createdClient.setRetentionPolicy(configuration.getRetentionPolicy());
75         createdClient.enableBatch(200, 100, TimeUnit.MILLISECONDS);
76         this.client = createdClient;
77         return checkConnectionStatus();
78     }
79
80     @Override
81     public void disconnect() {
82         this.client = null;
83     }
84
85     @Override
86     public boolean checkConnectionStatus() {
87         boolean dbStatus = false;
88         final InfluxDB currentClient = client;
89         if (currentClient != null) {
90             try {
91                 Pong pong = currentClient.ping();
92                 String version = pong.getVersion();
93                 // may be check for version >= 0.9
94                 if (version != null && !version.contains("unknown")) {
95                     dbStatus = true;
96                     logger.debug("database status is OK, version is {}", version);
97                 } else {
98                     logger.warn("database ping error, version is: \"{}\" response time was \"{}\"", version,
99                             pong.getResponseTime());
100                     dbStatus = false;
101                 }
102             } catch (RuntimeException e) {
103                 dbStatus = false;
104                 logger.error("database connection failed", e);
105                 handleDatabaseException(e);
106             }
107         } else {
108             logger.warn("checkConnection: database is not connected");
109         }
110         return dbStatus;
111     }
112
113     private void handleDatabaseException(Exception e) {
114         logger.warn("database error: {}", e.getMessage(), e);
115     }
116
117     @Override
118     public void write(InfluxPoint point) {
119         final InfluxDB currentClient = this.client;
120         if (currentClient != null) {
121             Point clientPoint = convertPointToClientFormat(point);
122             currentClient.write(configuration.getDatabaseName(), configuration.getRetentionPolicy(), clientPoint);
123         } else {
124             logger.warn("Write point {} ignored due to client isn't connected", point);
125         }
126     }
127
128     private Point convertPointToClientFormat(InfluxPoint point) {
129         Point.Builder clientPoint = Point.measurement(point.getMeasurementName()).time(point.getTime().toEpochMilli(),
130                 TimeUnit.MILLISECONDS);
131         setPointValue(point.getValue(), clientPoint);
132         point.getTags().entrySet().forEach(e -> clientPoint.tag(e.getKey(), e.getValue()));
133         return clientPoint.build();
134     }
135
136     private void setPointValue(@Nullable Object value, Point.Builder point) {
137         if (value instanceof String) {
138             point.addField(FIELD_VALUE_NAME, (String) value);
139         } else if (value instanceof Number) {
140             point.addField(FIELD_VALUE_NAME, (Number) value);
141         } else if (value instanceof Boolean) {
142             point.addField(FIELD_VALUE_NAME, (Boolean) value);
143         } else if (value == null) {
144             point.addField(FIELD_VALUE_NAME, (String) null);
145         } else {
146             throw new UnnexpectedConditionException("Not expected value type");
147         }
148     }
149
150     @Override
151     public List<InfluxRow> query(String query) {
152         final InfluxDB currentClient = client;
153         if (currentClient != null) {
154             Query parsedQuery = new Query(query, configuration.getDatabaseName());
155             List<QueryResult.Result> results = currentClient.query(parsedQuery, TimeUnit.MILLISECONDS).getResults();
156             return convertClientResutToRepository(results);
157         } else {
158             logger.warn("Returning empty list because queryAPI isn't present");
159             return Collections.emptyList();
160         }
161     }
162
163     private List<InfluxRow> convertClientResutToRepository(List<QueryResult.Result> results) {
164         List<InfluxRow> rows = new ArrayList<>();
165         for (QueryResult.Result result : results) {
166             List<QueryResult.Series> seriess = result.getSeries();
167             if (result.getError() != null) {
168                 logger.warn("{}", result.getError());
169                 continue;
170             }
171             if (seriess == null) {
172                 logger.debug("query returned no series");
173             } else {
174                 for (QueryResult.Series series : seriess) {
175                     logger.trace("series {}", series.toString());
176                     List<List<@Nullable Object>> valuess = series.getValues();
177                     if (valuess == null) {
178                         logger.debug("query returned no values");
179                     } else {
180                         List<String> columns = series.getColumns();
181                         logger.trace("columns {}", columns);
182                         if (columns != null) {
183                             Integer timestampColumn = null;
184                             Integer valueColumn = null;
185                             Integer itemNameColumn = null;
186                             for (int i = 0; i < columns.size(); i++) {
187                                 String columnName = columns.get(i);
188                                 if (columnName.equals(COLUMN_TIME_NAME_V1)) {
189                                     timestampColumn = i;
190                                 } else if (columnName.equals(COLUMN_VALUE_NAME_V1)) {
191                                     valueColumn = i;
192                                 } else if (columnName.equals(TAG_ITEM_NAME)) {
193                                     itemNameColumn = i;
194                                 }
195                             }
196                             if (valueColumn == null || timestampColumn == null) {
197                                 throw new IllegalStateException("missing column");
198                             }
199                             for (int i = 0; i < valuess.size(); i++) {
200                                 Double rawTime = (Double) Objects.requireNonNull(valuess.get(i).get(timestampColumn));
201                                 Instant time = Instant.ofEpochMilli(rawTime.longValue());
202                                 @Nullable
203                                 Object value = valuess.get(i).get(valueColumn);
204                                 var currentI = i;
205                                 String itemName = Optional.ofNullable(itemNameColumn)
206                                         .flatMap(inc -> Optional.ofNullable((String) valuess.get(currentI).get(inc)))
207                                         .orElse(series.getName());
208                                 logger.trace("adding historic item {}: time {} value {}", itemName, time, value);
209                                 rows.add(new InfluxRow(time, itemName, value));
210                             }
211                         }
212                     }
213                 }
214             }
215         }
216         return rows;
217     }
218
219     @Override
220     public Map<String, Integer> getStoredItemsCount() {
221         return Collections.emptyMap();
222     }
223 }