]> git.basschouten.com Git - openhab-addons.git/blob
358bd778a2eb2b3a6c14362c09ae44869e083c07
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.persistence.influxdb;
14
15 import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.*;
16
17 import java.time.Instant;
18 import java.time.ZoneId;
19 import java.time.ZonedDateTime;
20 import java.util.ArrayList;
21 import java.util.HashMap;
22 import java.util.HashSet;
23 import java.util.List;
24 import java.util.Locale;
25 import java.util.Map;
26 import java.util.Objects;
27 import java.util.Set;
28 import java.util.concurrent.BlockingQueue;
29 import java.util.concurrent.CompletableFuture;
30 import java.util.concurrent.LinkedBlockingQueue;
31 import java.util.concurrent.ScheduledFuture;
32 import java.util.concurrent.TimeUnit;
33 import java.util.stream.Collectors;
34
35 import org.eclipse.jdt.annotation.NonNullByDefault;
36 import org.eclipse.jdt.annotation.Nullable;
37 import org.openhab.core.common.ThreadPoolManager;
38 import org.openhab.core.config.core.ConfigurableService;
39 import org.openhab.core.items.Item;
40 import org.openhab.core.items.ItemFactory;
41 import org.openhab.core.items.ItemRegistry;
42 import org.openhab.core.items.ItemUtil;
43 import org.openhab.core.persistence.FilterCriteria;
44 import org.openhab.core.persistence.HistoricItem;
45 import org.openhab.core.persistence.ModifiablePersistenceService;
46 import org.openhab.core.persistence.PersistenceItemInfo;
47 import org.openhab.core.persistence.PersistenceService;
48 import org.openhab.core.persistence.QueryablePersistenceService;
49 import org.openhab.core.persistence.strategy.PersistenceStrategy;
50 import org.openhab.core.types.State;
51 import org.openhab.core.types.UnDefType;
52 import org.openhab.persistence.influxdb.internal.FilterCriteriaQueryCreator;
53 import org.openhab.persistence.influxdb.internal.InfluxDBConfiguration;
54 import org.openhab.persistence.influxdb.internal.InfluxDBHistoricItem;
55 import org.openhab.persistence.influxdb.internal.InfluxDBMetadataService;
56 import org.openhab.persistence.influxdb.internal.InfluxDBPersistentItemInfo;
57 import org.openhab.persistence.influxdb.internal.InfluxDBRepository;
58 import org.openhab.persistence.influxdb.internal.InfluxDBStateConvertUtils;
59 import org.openhab.persistence.influxdb.internal.InfluxPoint;
60 import org.openhab.persistence.influxdb.internal.influx1.InfluxDB1RepositoryImpl;
61 import org.openhab.persistence.influxdb.internal.influx2.InfluxDB2RepositoryImpl;
62 import org.osgi.framework.Constants;
63 import org.osgi.service.component.annotations.Activate;
64 import org.osgi.service.component.annotations.Component;
65 import org.osgi.service.component.annotations.Deactivate;
66 import org.osgi.service.component.annotations.Reference;
67 import org.osgi.service.component.annotations.ReferenceCardinality;
68 import org.osgi.service.component.annotations.ReferencePolicy;
69 import org.slf4j.Logger;
70 import org.slf4j.LoggerFactory;
71
72 /**
73  * This is the implementation of the InfluxDB {@link PersistenceService}. It
74  * persists item values using the <a href="http://influxdb.org">InfluxDB</a> time
75  * series database. The states ({@link State}) of an {@link Item} are persisted
76  * by default in a time series with names equal to the name of the item.
77  *
78  * This addon supports 1.X and 2.X versions, as two versions are incompatible
79  * and use different drivers the specific code for each version is accessed by
80  * {@link InfluxDBRepository} and {@link FilterCriteriaQueryCreator} interfaces
81  * and specific implementation reside in
82  * {@link org.openhab.persistence.influxdb.internal.influx1} and
83  * {@link org.openhab.persistence.influxdb.internal.influx2} packages
84  *
85  * @author Theo Weiss - Initial contribution, rewrite of
86  *         org.openhab.persistence.influxdb
87  * @author Joan Pujol Espinar - Addon rewrite refactoring code and adding
88  *         support for InfluxDB 2.0. Some tag code is based from not integrated
89  *         branch from Dominik Vorreiter
90  */
91 @NonNullByDefault
92 @Component(service = { PersistenceService.class,
93         QueryablePersistenceService.class }, configurationPid = "org.openhab.influxdb", //
94         property = Constants.SERVICE_PID + "=org.openhab.influxdb")
95 @ConfigurableService(category = "persistence", label = "InfluxDB Persistence Service", description_uri = InfluxDBPersistenceService.CONFIG_URI)
96 public class InfluxDBPersistenceService implements ModifiablePersistenceService {
97     public static final String SERVICE_NAME = "influxdb";
98
99     private final Logger logger = LoggerFactory.getLogger(InfluxDBPersistenceService.class);
100
101     private static final int COMMIT_INTERVAL = 3; // in s
102     protected static final String CONFIG_URI = "persistence:influxdb";
103
104     // External dependencies
105     private final ItemRegistry itemRegistry;
106     private final InfluxDBMetadataService influxDBMetadataService;
107
108     private final InfluxDBConfiguration configuration;
109     private final InfluxDBRepository influxDBRepository;
110     private boolean serviceActivated;
111
112     // storage
113     private final ScheduledFuture<?> storeJob;
114     private final BlockingQueue<InfluxPoint> pointsQueue = new LinkedBlockingQueue<>();
115
116     // conversion
117     private final Set<ItemFactory> itemFactories = new HashSet<>();
118     private Map<String, Class<? extends State>> desiredClasses = new HashMap<>();
119
120     @Activate
121     public InfluxDBPersistenceService(final @Reference ItemRegistry itemRegistry,
122             final @Reference InfluxDBMetadataService influxDBMetadataService, Map<String, Object> config) {
123         this.itemRegistry = itemRegistry;
124         this.influxDBMetadataService = influxDBMetadataService;
125         this.configuration = new InfluxDBConfiguration(config);
126         if (configuration.isValid()) {
127             this.influxDBRepository = createInfluxDBRepository();
128             this.influxDBRepository.connect();
129             this.storeJob = ThreadPoolManager.getScheduledPool("org.openhab.influxdb")
130                     .scheduleWithFixedDelay(this::commit, COMMIT_INTERVAL, COMMIT_INTERVAL, TimeUnit.SECONDS);
131             serviceActivated = true;
132         } else {
133             throw new IllegalArgumentException("Configuration invalid.");
134         }
135
136         logger.info("InfluxDB persistence service started.");
137     }
138
139     // Visible for testing
140     protected InfluxDBRepository createInfluxDBRepository() throws IllegalArgumentException {
141         return switch (configuration.getVersion()) {
142             case V1 -> new InfluxDB1RepositoryImpl(configuration, influxDBMetadataService);
143             case V2 -> new InfluxDB2RepositoryImpl(configuration, influxDBMetadataService);
144             default -> throw new IllegalArgumentException("Failed to instantiate repository.");
145         };
146     }
147
148     /**
149      * Disconnect from database when service is deactivated
150      */
151     @Deactivate
152     public void deactivate() {
153         serviceActivated = false;
154
155         storeJob.cancel(false);
156         commit(); // ensure we at least tried to store the data;
157
158         if (!pointsQueue.isEmpty()) {
159             logger.warn("InfluxDB failed to finally store {} points.", pointsQueue.size());
160         }
161
162         influxDBRepository.disconnect();
163         logger.info("InfluxDB persistence service stopped.");
164     }
165
166     @Override
167     public String getId() {
168         return SERVICE_NAME;
169     }
170
171     @Override
172     public String getLabel(@Nullable Locale locale) {
173         return "InfluxDB persistence layer";
174     }
175
176     @Override
177     public Set<PersistenceItemInfo> getItemInfo() {
178         if (checkConnection()) {
179             return influxDBRepository.getStoredItemsCount().entrySet().stream().map(InfluxDBPersistentItemInfo::new)
180                     .collect(Collectors.toUnmodifiableSet());
181         } else {
182             logger.info("getItemInfo ignored, InfluxDB is not connected");
183             return Set.of();
184         }
185     }
186
187     @Override
188     public void store(Item item) {
189         store(item, null);
190     }
191
192     @Override
193     public void store(Item item, @Nullable String alias) {
194         store(item, ZonedDateTime.now(), item.getState(), alias);
195     }
196
197     @Override
198     public void store(Item item, ZonedDateTime date, State state) {
199         store(item, date, state, null);
200     }
201
202     @Override
203     public void store(Item item, ZonedDateTime date, State state, @Nullable String alias) {
204         if (!serviceActivated) {
205             logger.warn("InfluxDB service not ready. Storing {} rejected.", item);
206             return;
207         }
208         convert(item, state, date.toInstant(), null).thenAccept(point -> {
209             if (point == null) {
210                 logger.trace("Ignoring item {}, conversion to an InfluxDB point failed.", item.getName());
211                 return;
212             }
213             if (pointsQueue.offer(point)) {
214                 logger.trace("Queued {} for item {}", point, item);
215             } else {
216                 logger.warn("Failed to queue {} for item {}", point, item);
217             }
218         });
219     }
220
221     @Override
222     public boolean remove(FilterCriteria filter) throws IllegalArgumentException {
223         if (serviceActivated && checkConnection()) {
224             if (filter.getItemName() == null) {
225                 logger.warn("Item name is missing in filter {} when trying to remove data.", filter);
226                 return false;
227             }
228             return influxDBRepository.remove(filter);
229         } else {
230             logger.debug("Remove query {} ignored, InfluxDB is not connected.", filter);
231             return false;
232         }
233     }
234
235     @Override
236     public Iterable<HistoricItem> query(FilterCriteria filter) {
237         if (serviceActivated && checkConnection()) {
238             logger.trace(
239                     "Query-Filter: itemname: {}, ordering: {}, state: {},  operator: {}, getBeginDate: {}, getEndDate: {}, getPageSize: {}, getPageNumber: {}",
240                     filter.getItemName(), filter.getOrdering().toString(), filter.getState(), filter.getOperator(),
241                     filter.getBeginDate(), filter.getEndDate(), filter.getPageSize(), filter.getPageNumber());
242             if (filter.getItemName() == null) {
243                 logger.warn("Item name is missing in filter {} when querying data.", filter);
244                 return List.of();
245             }
246
247             List<InfluxDBRepository.InfluxRow> results = influxDBRepository.query(filter,
248                     configuration.getRetentionPolicy());
249             return results.stream().map(this::mapRowToHistoricItem).collect(Collectors.toList());
250         } else {
251             logger.debug("Query for persisted data ignored, InfluxDB is not connected");
252             return List.of();
253         }
254     }
255
256     private HistoricItem mapRowToHistoricItem(InfluxDBRepository.InfluxRow row) {
257         State state = InfluxDBStateConvertUtils.objectToState(row.value(), row.itemName(), itemRegistry);
258         return new InfluxDBHistoricItem(row.itemName(), state,
259                 ZonedDateTime.ofInstant(row.time(), ZoneId.systemDefault()));
260     }
261
262     @Override
263     public List<PersistenceStrategy> getDefaultStrategies() {
264         return List.of(PersistenceStrategy.Globals.RESTORE, PersistenceStrategy.Globals.CHANGE);
265     }
266
267     /**
268      * check connection and try reconnect
269      *
270      * @return true if connected
271      */
272     private boolean checkConnection() {
273         if (influxDBRepository.isConnected()) {
274             return true;
275         } else if (serviceActivated) {
276             logger.debug("Connection lost, trying re-connection");
277             return influxDBRepository.connect();
278         }
279         return false;
280     }
281
282     private void commit() {
283         if (!pointsQueue.isEmpty() && checkConnection()) {
284             List<InfluxPoint> points = new ArrayList<>();
285             pointsQueue.drainTo(points);
286             if (!influxDBRepository.write(points)) {
287                 logger.warn("Re-queuing {} elements, failed to write batch.", points.size());
288                 pointsQueue.addAll(points);
289             } else {
290                 logger.trace("Wrote {} elements to database", points.size());
291             }
292         }
293     }
294
295     /**
296      * Convert incoming data to an {@link InfluxPoint} for further processing. This is needed because storage is
297      * asynchronous and the item data may have changed.
298      * <p />
299      * The method is package-private for testing.
300      *
301      * @param item the {@link Item} that needs conversion
302      * @param storeAlias an (optional) alias for the item
303      * @return a {@link CompletableFuture} that contains either <code>null</code> for item states that cannot be
304      *         converted or the corresponding {@link InfluxPoint}
305      */
306     CompletableFuture<@Nullable InfluxPoint> convert(Item item, State state, Instant timeStamp,
307             @Nullable String storeAlias) {
308         String itemName = item.getName();
309         String itemLabel = item.getLabel();
310         String category = item.getCategory();
311         String itemType = item.getType();
312
313         if (state instanceof UnDefType) {
314             return CompletableFuture.completedFuture(null);
315         }
316
317         return CompletableFuture.supplyAsync(() -> {
318             String measurementName = storeAlias != null && !storeAlias.isBlank() ? storeAlias : itemName;
319             measurementName = influxDBMetadataService.getMeasurementNameOrDefault(itemName, measurementName);
320
321             if (configuration.isReplaceUnderscore()) {
322                 measurementName = measurementName.replace('_', '.');
323             }
324
325             State storeState = Objects
326                     .requireNonNullElse(state.as(desiredClasses.get(ItemUtil.getMainItemType(itemType))), state);
327             Object value = InfluxDBStateConvertUtils.stateToObject(storeState);
328
329             InfluxPoint.Builder pointBuilder = InfluxPoint.newBuilder(measurementName).withTime(timeStamp)
330                     .withValue(value).withTag(TAG_ITEM_NAME, itemName);
331
332             if (configuration.isAddCategoryTag()) {
333                 String categoryName = Objects.requireNonNullElse(category, "n/a");
334                 pointBuilder.withTag(TAG_CATEGORY_NAME, categoryName);
335             }
336
337             if (configuration.isAddTypeTag()) {
338                 pointBuilder.withTag(TAG_TYPE_NAME, itemType);
339             }
340
341             if (configuration.isAddLabelTag()) {
342                 String labelName = Objects.requireNonNullElse(itemLabel, "n/a");
343                 pointBuilder.withTag(TAG_LABEL_NAME, labelName);
344             }
345
346             influxDBMetadataService.getMetaData(itemName)
347                     .ifPresent(metadata -> metadata.getConfiguration().forEach(pointBuilder::withTag));
348
349             return pointBuilder.build();
350         });
351     }
352
353     @Reference(cardinality = ReferenceCardinality.AT_LEAST_ONE, policy = ReferencePolicy.DYNAMIC)
354     public void setItemFactory(ItemFactory itemFactory) {
355         itemFactories.add(itemFactory);
356         calculateItemTypeClasses();
357     }
358
359     public void unsetItemFactory(ItemFactory itemFactory) {
360         itemFactories.remove(itemFactory);
361         calculateItemTypeClasses();
362     }
363
364     private synchronized void calculateItemTypeClasses() {
365         Map<String, Class<? extends State>> desiredClasses = new HashMap<>();
366         itemFactories.forEach(factory -> {
367             for (String itemType : factory.getSupportedItemTypes()) {
368                 Item item = factory.createItem(itemType, "influxItem");
369                 if (item != null) {
370                     item.getAcceptedCommandTypes().stream()
371                             .filter(commandType -> commandType.isAssignableFrom(State.class)).findFirst()
372                             .map(commandType -> (Class<? extends State>) commandType.asSubclass(State.class))
373                             .ifPresent(desiredClass -> desiredClasses.put(itemType, desiredClass));
374                 }
375             }
376         });
377         this.desiredClasses = desiredClasses;
378     }
379 }