]> git.basschouten.com Git - openhab-addons.git/blob
43d578d727ded9781c4192a70031fe5a3a691696
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.persistence.influxdb.internal.influx2;
14
15 import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.*;
16
17 import java.time.Instant;
18 import java.util.Collections;
19 import java.util.LinkedHashMap;
20 import java.util.List;
21 import java.util.Map;
22 import java.util.Objects;
23 import java.util.stream.Stream;
24
25 import org.eclipse.jdt.annotation.NonNullByDefault;
26 import org.eclipse.jdt.annotation.Nullable;
27 import org.openhab.persistence.influxdb.internal.FilterCriteriaQueryCreator;
28 import org.openhab.persistence.influxdb.internal.InfluxDBConfiguration;
29 import org.openhab.persistence.influxdb.internal.InfluxDBConstants;
30 import org.openhab.persistence.influxdb.internal.InfluxDBMetadataService;
31 import org.openhab.persistence.influxdb.internal.InfluxDBRepository;
32 import org.openhab.persistence.influxdb.internal.InfluxPoint;
33 import org.openhab.persistence.influxdb.internal.UnexpectedConditionException;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 import com.influxdb.client.InfluxDBClient;
38 import com.influxdb.client.InfluxDBClientFactory;
39 import com.influxdb.client.InfluxDBClientOptions;
40 import com.influxdb.client.QueryApi;
41 import com.influxdb.client.WriteApi;
42 import com.influxdb.client.domain.Ready;
43 import com.influxdb.client.domain.WritePrecision;
44 import com.influxdb.client.write.Point;
45 import com.influxdb.query.FluxTable;
46
47 /**
48  * Implementation of {@link InfluxDBRepository} for InfluxDB 2.0
49  *
50  * @author Joan Pujol Espinar - Initial contribution
51  */
52 @NonNullByDefault
53 public class InfluxDB2RepositoryImpl implements InfluxDBRepository {
54     private final Logger logger = LoggerFactory.getLogger(InfluxDB2RepositoryImpl.class);
55     private final InfluxDBConfiguration configuration;
56     private final InfluxDBMetadataService influxDBMetadataService;
57
58     private @Nullable InfluxDBClient client;
59     private @Nullable QueryApi queryAPI;
60     private @Nullable WriteApi writeAPI;
61
62     public InfluxDB2RepositoryImpl(InfluxDBConfiguration configuration,
63             InfluxDBMetadataService influxDBMetadataService) {
64         this.configuration = configuration;
65         this.influxDBMetadataService = influxDBMetadataService;
66     }
67
68     @Override
69     public boolean isConnected() {
70         return client != null;
71     }
72
73     @Override
74     public boolean connect() {
75         InfluxDBClientOptions.Builder optionsBuilder = InfluxDBClientOptions.builder().url(configuration.getUrl())
76                 .org(configuration.getDatabaseName()).bucket(configuration.getRetentionPolicy());
77         char[] token = configuration.getToken().toCharArray();
78         if (token.length > 0) {
79             optionsBuilder.authenticateToken(token);
80         } else {
81             optionsBuilder.authenticate(configuration.getUser(), configuration.getPassword().toCharArray());
82         }
83         InfluxDBClientOptions clientOptions = optionsBuilder.build();
84
85         final InfluxDBClient createdClient = InfluxDBClientFactory.create(clientOptions);
86         this.client = createdClient;
87
88         queryAPI = createdClient.getQueryApi();
89         writeAPI = createdClient.getWriteApi();
90         logger.debug("Successfully connected to InfluxDB. Instance ready={}", createdClient.ready());
91
92         return checkConnectionStatus();
93     }
94
95     @Override
96     public void disconnect() {
97         final InfluxDBClient currentClient = this.client;
98         if (currentClient != null) {
99             currentClient.close();
100         }
101         this.client = null;
102     }
103
104     @Override
105     public boolean checkConnectionStatus() {
106         final InfluxDBClient currentClient = client;
107         if (currentClient != null) {
108             Ready ready = currentClient.ready();
109             boolean isUp = ready != null && ready.getStatus() == Ready.StatusEnum.READY;
110             if (isUp) {
111                 logger.debug("database status is OK");
112             } else {
113                 logger.warn("database not ready");
114             }
115             return isUp;
116         } else {
117             logger.warn("checkConnection: database is not connected");
118             return false;
119         }
120     }
121
122     @Override
123     public void write(InfluxPoint point) throws UnexpectedConditionException {
124         final WriteApi currentWriteAPI = writeAPI;
125         if (currentWriteAPI != null) {
126             currentWriteAPI.writePoint(convertPointToClientFormat(point));
127         } else {
128             logger.warn("Write point {} ignored due to writeAPI isn't present", point);
129         }
130     }
131
132     private Point convertPointToClientFormat(InfluxPoint point) throws UnexpectedConditionException {
133         Point clientPoint = Point.measurement(point.getMeasurementName()).time(point.getTime(), WritePrecision.MS);
134         setPointValue(point.getValue(), clientPoint);
135         point.getTags().forEach(clientPoint::addTag);
136         return clientPoint;
137     }
138
139     private void setPointValue(@Nullable Object value, Point point) throws UnexpectedConditionException {
140         if (value instanceof String) {
141             point.addField(FIELD_VALUE_NAME, (String) value);
142         } else if (value instanceof Number) {
143             point.addField(FIELD_VALUE_NAME, (Number) value);
144         } else if (value instanceof Boolean) {
145             point.addField(FIELD_VALUE_NAME, (Boolean) value);
146         } else if (value == null) {
147             point.addField(FIELD_VALUE_NAME, (String) null);
148         } else {
149             throw new UnexpectedConditionException("Not expected value type");
150         }
151     }
152
153     @Override
154     public List<InfluxRow> query(String query) {
155         final QueryApi currentQueryAPI = queryAPI;
156         if (currentQueryAPI != null) {
157             List<FluxTable> clientResult = currentQueryAPI.query(query);
158             return clientResult.stream().flatMap(this::mapRawResultToHistoric).toList();
159         } else {
160             logger.warn("Returning empty list because queryAPI isn't present");
161             return List.of();
162         }
163     }
164
165     private Stream<InfluxRow> mapRawResultToHistoric(FluxTable rawRow) {
166         return rawRow.getRecords().stream().map(r -> {
167             String itemName = (String) r.getValueByKey(InfluxDBConstants.TAG_ITEM_NAME);
168             if (itemName == null) {
169                 itemName = r.getMeasurement();
170             }
171             Object value = r.getValueByKey(COLUMN_VALUE_NAME_V2);
172             Instant time = (Instant) r.getValueByKey(COLUMN_TIME_NAME_V2);
173             return new InfluxRow(time, itemName, value);
174         });
175     }
176
177     @Override
178     public Map<String, Integer> getStoredItemsCount() {
179         final QueryApi currentQueryAPI = queryAPI;
180
181         if (currentQueryAPI != null) {
182             Map<String, Integer> result = new LinkedHashMap<>();
183             // Query wrote by hand https://github.com/influxdata/influxdb-client-java/issues/75
184             String query = "from(bucket: \"" + configuration.getRetentionPolicy() + "\")\n"
185                     + "  |> range(start:-365d)\n" + "  |> filter(fn: (r) => exists r." + TAG_ITEM_NAME + " )\n"
186                     + "  |> group(columns: [\"" + TAG_ITEM_NAME + "\"], mode:\"by\")\n" + "  |> count()\n"
187                     + "  |> group()";
188
189             List<FluxTable> queryResult = currentQueryAPI.query(query);
190             Objects.requireNonNull(queryResult.stream().findFirst().orElse(new FluxTable())).getRecords()
191                     .forEach(row -> {
192                         result.put((String) row.getValueByKey(TAG_ITEM_NAME), ((Number) row.getValue()).intValue());
193                     });
194             return result;
195         } else {
196             logger.warn("Returning empty result  because queryAPI isn't present");
197             return Collections.emptyMap();
198         }
199     }
200
201     @Override
202     public FilterCriteriaQueryCreator createQueryCreator() {
203         return new InfluxDB2FilterCriteriaQueryCreatorImpl(configuration, influxDBMetadataService);
204     }
205 }