]> git.basschouten.com Git - openhab-addons.git/blob
dade69cf59b86a36c10f936746dd50b0d3af85fd
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.persistence.influxdb.internal.influx2;
14
15 import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.*;
16
17 import java.time.Instant;
18 import java.util.Collections;
19 import java.util.LinkedHashMap;
20 import java.util.List;
21 import java.util.Map;
22 import java.util.Objects;
23 import java.util.Optional;
24 import java.util.stream.Stream;
25
26 import org.eclipse.jdt.annotation.NonNullByDefault;
27 import org.eclipse.jdt.annotation.Nullable;
28 import org.openhab.persistence.influxdb.internal.FilterCriteriaQueryCreator;
29 import org.openhab.persistence.influxdb.internal.InfluxDBConfiguration;
30 import org.openhab.persistence.influxdb.internal.InfluxDBConstants;
31 import org.openhab.persistence.influxdb.internal.InfluxDBMetadataService;
32 import org.openhab.persistence.influxdb.internal.InfluxDBRepository;
33 import org.openhab.persistence.influxdb.internal.InfluxPoint;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 import com.influxdb.client.InfluxDBClient;
38 import com.influxdb.client.InfluxDBClientFactory;
39 import com.influxdb.client.InfluxDBClientOptions;
40 import com.influxdb.client.QueryApi;
41 import com.influxdb.client.WriteApi;
42 import com.influxdb.client.domain.Ready;
43 import com.influxdb.client.domain.WritePrecision;
44 import com.influxdb.client.write.Point;
45 import com.influxdb.exceptions.InfluxException;
46 import com.influxdb.query.FluxTable;
47
48 /**
49  * Implementation of {@link InfluxDBRepository} for InfluxDB 2.0
50  *
51  * @author Joan Pujol Espinar - Initial contribution
52  */
53 @NonNullByDefault
54 public class InfluxDB2RepositoryImpl implements InfluxDBRepository {
55     private final Logger logger = LoggerFactory.getLogger(InfluxDB2RepositoryImpl.class);
56     private final InfluxDBConfiguration configuration;
57     private final InfluxDBMetadataService influxDBMetadataService;
58
59     private @Nullable InfluxDBClient client;
60     private @Nullable QueryApi queryAPI;
61     private @Nullable WriteApi writeAPI;
62
63     public InfluxDB2RepositoryImpl(InfluxDBConfiguration configuration,
64             InfluxDBMetadataService influxDBMetadataService) {
65         this.configuration = configuration;
66         this.influxDBMetadataService = influxDBMetadataService;
67     }
68
69     @Override
70     public boolean isConnected() {
71         return client != null;
72     }
73
74     @Override
75     public boolean connect() {
76         InfluxDBClientOptions.Builder optionsBuilder = InfluxDBClientOptions.builder().url(configuration.getUrl())
77                 .org(configuration.getDatabaseName()).bucket(configuration.getRetentionPolicy());
78         char[] token = configuration.getToken().toCharArray();
79         if (token.length > 0) {
80             optionsBuilder.authenticateToken(token);
81         } else {
82             optionsBuilder.authenticate(configuration.getUser(), configuration.getPassword().toCharArray());
83         }
84         InfluxDBClientOptions clientOptions = optionsBuilder.build();
85
86         final InfluxDBClient createdClient = InfluxDBClientFactory.create(clientOptions);
87         this.client = createdClient;
88
89         queryAPI = createdClient.getQueryApi();
90         writeAPI = createdClient.getWriteApi();
91         logger.debug("Successfully connected to InfluxDB. Instance ready={}", createdClient.ready());
92
93         return checkConnectionStatus();
94     }
95
96     @Override
97     public void disconnect() {
98         final InfluxDBClient currentClient = this.client;
99         if (currentClient != null) {
100             currentClient.close();
101         }
102         this.client = null;
103     }
104
105     @Override
106     public boolean checkConnectionStatus() {
107         final InfluxDBClient currentClient = client;
108         if (currentClient != null) {
109             Ready ready = currentClient.ready();
110             boolean isUp = ready != null && ready.getStatus() == Ready.StatusEnum.READY;
111             if (isUp) {
112                 logger.debug("database status is OK");
113             } else {
114                 logger.warn("database not ready");
115             }
116             return isUp;
117         } else {
118             logger.warn("checkConnection: database is not connected");
119             return false;
120         }
121     }
122
123     @Override
124     public boolean write(List<InfluxPoint> influxPoints) {
125         final WriteApi currentWriteAPI = writeAPI;
126         if (currentWriteAPI == null) {
127             return false;
128         }
129         try {
130             List<Point> clientPoints = influxPoints.stream().map(this::convertPointToClientFormat)
131                     .filter(Optional::isPresent).map(Optional::get).toList();
132             currentWriteAPI.writePoints(clientPoints);
133         } catch (InfluxException e) {
134             logger.debug("Writing to database failed", e);
135             return false;
136         }
137         return true;
138     }
139
140     private Optional<Point> convertPointToClientFormat(InfluxPoint point) {
141         Point clientPoint = Point.measurement(point.getMeasurementName()).time(point.getTime(), WritePrecision.MS);
142         @Nullable
143         Object value = point.getValue();
144         if (value instanceof String) {
145             clientPoint.addField(FIELD_VALUE_NAME, (String) value);
146         } else if (value instanceof Number) {
147             clientPoint.addField(FIELD_VALUE_NAME, (Number) value);
148         } else if (value instanceof Boolean) {
149             clientPoint.addField(FIELD_VALUE_NAME, (Boolean) value);
150         } else if (value == null) {
151             clientPoint.addField(FIELD_VALUE_NAME, (String) null);
152         } else {
153             logger.warn("Could not convert {}, discarding this datapoint)", clientPoint);
154             return Optional.empty();
155         }
156         point.getTags().forEach(clientPoint::addTag);
157         return Optional.of(clientPoint);
158     }
159
160     @Override
161     public List<InfluxRow> query(String query) {
162         final QueryApi currentQueryAPI = queryAPI;
163         if (currentQueryAPI != null) {
164             List<FluxTable> clientResult = currentQueryAPI.query(query);
165             return clientResult.stream().flatMap(this::mapRawResultToHistoric).toList();
166         } else {
167             logger.warn("Returning empty list because queryAPI isn't present");
168             return List.of();
169         }
170     }
171
172     private Stream<InfluxRow> mapRawResultToHistoric(FluxTable rawRow) {
173         return rawRow.getRecords().stream().map(r -> {
174             String itemName = (String) r.getValueByKey(InfluxDBConstants.TAG_ITEM_NAME);
175             if (itemName == null) {
176                 itemName = r.getMeasurement();
177             }
178             Object value = r.getValueByKey(COLUMN_VALUE_NAME_V2);
179             Instant time = (Instant) r.getValueByKey(COLUMN_TIME_NAME_V2);
180             return new InfluxRow(time, itemName, value);
181         });
182     }
183
184     @Override
185     public Map<String, Integer> getStoredItemsCount() {
186         final QueryApi currentQueryAPI = queryAPI;
187
188         if (currentQueryAPI != null) {
189             Map<String, Integer> result = new LinkedHashMap<>();
190             // Query wrote by hand https://github.com/influxdata/influxdb-client-java/issues/75
191             String query = "from(bucket: \"" + configuration.getRetentionPolicy() + "\")\n"
192                     + "  |> range(start:-365d)\n" + "  |> filter(fn: (r) => exists r." + TAG_ITEM_NAME + " )\n"
193                     + "  |> group(columns: [\"" + TAG_ITEM_NAME + "\"], mode:\"by\")\n" + "  |> count()\n"
194                     + "  |> group()";
195
196             List<FluxTable> queryResult = currentQueryAPI.query(query);
197             Objects.requireNonNull(queryResult.stream().findFirst().orElse(new FluxTable())).getRecords()
198                     .forEach(row -> {
199                         result.put((String) row.getValueByKey(TAG_ITEM_NAME), ((Number) row.getValue()).intValue());
200                     });
201             return result;
202         } else {
203             logger.warn("Returning empty result  because queryAPI isn't present");
204             return Collections.emptyMap();
205         }
206     }
207
208     @Override
209     public FilterCriteriaQueryCreator createQueryCreator() {
210         return new InfluxDB2FilterCriteriaQueryCreatorImpl(configuration, influxDBMetadataService);
211     }
212 }