]> git.basschouten.com Git - openhab-addons.git/blob
783624651f480b1cb9d9e4fbb7e3bba051057fa8
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.persistence.influxdb;
14
15 import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.*;
16
17 import java.time.Instant;
18 import java.time.ZoneId;
19 import java.time.ZonedDateTime;
20 import java.util.ArrayList;
21 import java.util.HashMap;
22 import java.util.HashSet;
23 import java.util.List;
24 import java.util.Locale;
25 import java.util.Map;
26 import java.util.Objects;
27 import java.util.Set;
28 import java.util.concurrent.BlockingQueue;
29 import java.util.concurrent.CompletableFuture;
30 import java.util.concurrent.LinkedBlockingQueue;
31 import java.util.concurrent.ScheduledFuture;
32 import java.util.concurrent.TimeUnit;
33 import java.util.stream.Collectors;
34
35 import org.eclipse.jdt.annotation.NonNullByDefault;
36 import org.eclipse.jdt.annotation.Nullable;
37 import org.openhab.core.common.ThreadPoolManager;
38 import org.openhab.core.config.core.ConfigurableService;
39 import org.openhab.core.items.Item;
40 import org.openhab.core.items.ItemFactory;
41 import org.openhab.core.items.ItemRegistry;
42 import org.openhab.core.items.ItemUtil;
43 import org.openhab.core.persistence.FilterCriteria;
44 import org.openhab.core.persistence.HistoricItem;
45 import org.openhab.core.persistence.PersistenceItemInfo;
46 import org.openhab.core.persistence.PersistenceService;
47 import org.openhab.core.persistence.QueryablePersistenceService;
48 import org.openhab.core.persistence.strategy.PersistenceStrategy;
49 import org.openhab.core.types.State;
50 import org.openhab.core.types.UnDefType;
51 import org.openhab.persistence.influxdb.internal.FilterCriteriaQueryCreator;
52 import org.openhab.persistence.influxdb.internal.InfluxDBConfiguration;
53 import org.openhab.persistence.influxdb.internal.InfluxDBHistoricItem;
54 import org.openhab.persistence.influxdb.internal.InfluxDBMetadataService;
55 import org.openhab.persistence.influxdb.internal.InfluxDBPersistentItemInfo;
56 import org.openhab.persistence.influxdb.internal.InfluxDBRepository;
57 import org.openhab.persistence.influxdb.internal.InfluxDBStateConvertUtils;
58 import org.openhab.persistence.influxdb.internal.InfluxPoint;
59 import org.openhab.persistence.influxdb.internal.influx1.InfluxDB1RepositoryImpl;
60 import org.openhab.persistence.influxdb.internal.influx2.InfluxDB2RepositoryImpl;
61 import org.osgi.framework.Constants;
62 import org.osgi.service.component.annotations.Activate;
63 import org.osgi.service.component.annotations.Component;
64 import org.osgi.service.component.annotations.Deactivate;
65 import org.osgi.service.component.annotations.Reference;
66 import org.osgi.service.component.annotations.ReferenceCardinality;
67 import org.osgi.service.component.annotations.ReferencePolicy;
68 import org.slf4j.Logger;
69 import org.slf4j.LoggerFactory;
70
71 /**
72  * This is the implementation of the InfluxDB {@link PersistenceService}. It
73  * persists item values using the <a href="http://influxdb.org">InfluxDB time
74  * series database. The states ( {@link State}) of an {@link Item} are persisted
75  * by default in a time series with names equal to the name of the item.
76  *
77  * This addon supports 1.X and 2.X versions, as two versions are incompatible
78  * and use different drivers the specific code for each version is accessed by
79  * {@link InfluxDBRepository} and {@link FilterCriteriaQueryCreator} interfaces
80  * and specific implementation reside in
81  * {@link org.openhab.persistence.influxdb.internal.influx1} and
82  * {@link org.openhab.persistence.influxdb.internal.influx2} packages
83  *
84  * @author Theo Weiss - Initial contribution, rewrite of
85  *         org.openhab.persistence.influxdb
86  * @author Joan Pujol Espinar - Addon rewrite refactoring code and adding
87  *         support for InfluxDB 2.0. Some tag code is based from not integrated
88  *         branch from Dominik Vorreiter
89  */
90 @NonNullByDefault
91 @Component(service = { PersistenceService.class,
92         QueryablePersistenceService.class }, configurationPid = "org.openhab.influxdb", //
93         property = Constants.SERVICE_PID + "=org.openhab.influxdb")
94 @ConfigurableService(category = "persistence", label = "InfluxDB Persistence Service", description_uri = InfluxDBPersistenceService.CONFIG_URI)
95 public class InfluxDBPersistenceService implements QueryablePersistenceService {
96     public static final String SERVICE_NAME = "influxdb";
97
98     private final Logger logger = LoggerFactory.getLogger(InfluxDBPersistenceService.class);
99
100     private static final int COMMIT_INTERVAL = 3; // in s
101     protected static final String CONFIG_URI = "persistence:influxdb";
102
103     // External dependencies
104     private final ItemRegistry itemRegistry;
105     private final InfluxDBMetadataService influxDBMetadataService;
106
107     private final InfluxDBConfiguration configuration;
108     private final InfluxDBRepository influxDBRepository;
109     private boolean serviceActivated;
110
111     // storage
112     private final ScheduledFuture<?> storeJob;
113     private final BlockingQueue<InfluxPoint> pointsQueue = new LinkedBlockingQueue<>();
114
115     // conversion
116     private final Set<ItemFactory> itemFactories = new HashSet<>();
117     private Map<String, Class<? extends State>> desiredClasses = new HashMap<>();
118
119     @Activate
120     public InfluxDBPersistenceService(final @Reference ItemRegistry itemRegistry,
121             final @Reference InfluxDBMetadataService influxDBMetadataService, Map<String, Object> config) {
122         this.itemRegistry = itemRegistry;
123         this.influxDBMetadataService = influxDBMetadataService;
124         this.configuration = new InfluxDBConfiguration(config);
125         if (configuration.isValid()) {
126             this.influxDBRepository = createInfluxDBRepository();
127             this.influxDBRepository.connect();
128             this.storeJob = ThreadPoolManager.getScheduledPool("org.openhab.influxdb")
129                     .scheduleWithFixedDelay(this::commit, COMMIT_INTERVAL, COMMIT_INTERVAL, TimeUnit.SECONDS);
130             serviceActivated = true;
131         } else {
132             throw new IllegalArgumentException("Configuration invalid.");
133         }
134
135         logger.info("InfluxDB persistence service started.");
136     }
137
138     // Visible for testing
139     protected InfluxDBRepository createInfluxDBRepository() throws IllegalArgumentException {
140         return switch (configuration.getVersion()) {
141             case V1 -> new InfluxDB1RepositoryImpl(configuration, influxDBMetadataService);
142             case V2 -> new InfluxDB2RepositoryImpl(configuration, influxDBMetadataService);
143             default -> throw new IllegalArgumentException("Failed to instantiate repository.");
144         };
145     }
146
147     /**
148      * Disconnect from database when service is deactivated
149      */
150     @Deactivate
151     public void deactivate() {
152         serviceActivated = false;
153
154         storeJob.cancel(false);
155         commit(); // ensure we at least tried to store the data;
156
157         if (!pointsQueue.isEmpty()) {
158             logger.warn("InfluxDB failed to finally store {} points.", pointsQueue.size());
159         }
160
161         influxDBRepository.disconnect();
162         logger.info("InfluxDB persistence service stopped.");
163     }
164
165     @Override
166     public String getId() {
167         return SERVICE_NAME;
168     }
169
170     @Override
171     public String getLabel(@Nullable Locale locale) {
172         return "InfluxDB persistence layer";
173     }
174
175     @Override
176     public Set<PersistenceItemInfo> getItemInfo() {
177         if (checkConnection()) {
178             return influxDBRepository.getStoredItemsCount().entrySet().stream().map(InfluxDBPersistentItemInfo::new)
179                     .collect(Collectors.toUnmodifiableSet());
180         } else {
181             logger.info("getItemInfo ignored, InfluxDB is not connected");
182             return Set.of();
183         }
184     }
185
186     @Override
187     public void store(Item item) {
188         store(item, null);
189     }
190
191     @Override
192     public void store(Item item, @Nullable String alias) {
193         if (!serviceActivated) {
194             logger.warn("InfluxDB service not ready. Storing {} rejected.", item);
195             return;
196         }
197         convert(item, alias).thenAccept(point -> {
198             if (point == null) {
199                 logger.trace("Ignoring item {}, conversion to an InfluxDB point failed.", item.getName());
200                 return;
201             }
202             if (pointsQueue.offer(point)) {
203                 logger.trace("Queued {} for item {}", point, item);
204             } else {
205                 logger.warn("Failed to queue {} for item {}", point, item);
206             }
207         });
208     }
209
210     @Override
211     public Iterable<HistoricItem> query(FilterCriteria filter) {
212         if (serviceActivated && checkConnection()) {
213             logger.trace(
214                     "Query-Filter: itemname: {}, ordering: {}, state: {},  operator: {}, getBeginDate: {}, getEndDate: {}, getPageSize: {}, getPageNumber: {}",
215                     filter.getItemName(), filter.getOrdering().toString(), filter.getState(), filter.getOperator(),
216                     filter.getBeginDate(), filter.getEndDate(), filter.getPageSize(), filter.getPageNumber());
217             if (filter.getItemName() == null) {
218                 logger.warn("Item name is missing in filter {}", filter);
219                 return List.of();
220             }
221             String query = influxDBRepository.createQueryCreator().createQuery(filter,
222                     configuration.getRetentionPolicy());
223             logger.trace("Query {}", query);
224             List<InfluxDBRepository.InfluxRow> results = influxDBRepository.query(query);
225             return results.stream().map(this::mapRowToHistoricItem).collect(Collectors.toList());
226         } else {
227             logger.debug("Query for persisted data ignored, InfluxDB is not connected");
228             return List.of();
229         }
230     }
231
232     private HistoricItem mapRowToHistoricItem(InfluxDBRepository.InfluxRow row) {
233         State state = InfluxDBStateConvertUtils.objectToState(row.value(), row.itemName(), itemRegistry);
234         return new InfluxDBHistoricItem(row.itemName(), state,
235                 ZonedDateTime.ofInstant(row.time(), ZoneId.systemDefault()));
236     }
237
238     @Override
239     public List<PersistenceStrategy> getDefaultStrategies() {
240         return List.of(PersistenceStrategy.Globals.RESTORE, PersistenceStrategy.Globals.CHANGE);
241     }
242
243     /**
244      * check connection and try reconnect
245      *
246      * @return true if connected
247      */
248     private boolean checkConnection() {
249         if (influxDBRepository.isConnected()) {
250             return true;
251         } else if (serviceActivated) {
252             logger.debug("Connection lost, trying re-connection");
253             return influxDBRepository.connect();
254         }
255         return false;
256     }
257
258     private void commit() {
259         if (!pointsQueue.isEmpty() && checkConnection()) {
260             List<InfluxPoint> points = new ArrayList<>();
261             pointsQueue.drainTo(points);
262             if (!influxDBRepository.write(points)) {
263                 logger.warn("Re-queuing {} elements, failed to write batch.", points.size());
264                 pointsQueue.addAll(points);
265             } else {
266                 logger.trace("Wrote {} elements to database", points.size());
267             }
268         }
269     }
270
271     /**
272      * Convert incoming data to an {@link InfluxPoint} for further processing. This is needed because storage is
273      * asynchronous and the item data may have changed.
274      * <p />
275      * The method is package-private for testing.
276      *
277      * @param item the {@link Item} that needs conversion
278      * @param storeAlias an (optional) alias for the item
279      * @return a {@link CompletableFuture} that contains either <code>null</code> for item states that cannot be
280      *         converted or the corresponding {@link InfluxPoint}
281      */
282     CompletableFuture<@Nullable InfluxPoint> convert(Item item, @Nullable String storeAlias) {
283         String itemName = item.getName();
284         String itemLabel = item.getLabel();
285         String category = item.getCategory();
286         State state = item.getState();
287         String itemType = item.getType();
288         Instant timeStamp = Instant.now();
289
290         if (state instanceof UnDefType) {
291             return CompletableFuture.completedFuture(null);
292         }
293
294         return CompletableFuture.supplyAsync(() -> {
295             String measurementName = storeAlias != null && !storeAlias.isBlank() ? storeAlias : itemName;
296             measurementName = influxDBMetadataService.getMeasurementNameOrDefault(itemName, measurementName);
297
298             if (configuration.isReplaceUnderscore()) {
299                 measurementName = measurementName.replace('_', '.');
300             }
301
302             State storeState = Objects
303                     .requireNonNullElse(state.as(desiredClasses.get(ItemUtil.getMainItemType(itemType))), state);
304             Object value = InfluxDBStateConvertUtils.stateToObject(storeState);
305
306             InfluxPoint.Builder pointBuilder = InfluxPoint.newBuilder(measurementName).withTime(timeStamp)
307                     .withValue(value).withTag(TAG_ITEM_NAME, itemName);
308
309             if (configuration.isAddCategoryTag()) {
310                 String categoryName = Objects.requireNonNullElse(category, "n/a");
311                 pointBuilder.withTag(TAG_CATEGORY_NAME, categoryName);
312             }
313
314             if (configuration.isAddTypeTag()) {
315                 pointBuilder.withTag(TAG_TYPE_NAME, itemType);
316             }
317
318             if (configuration.isAddLabelTag()) {
319                 String labelName = Objects.requireNonNullElse(itemLabel, "n/a");
320                 pointBuilder.withTag(TAG_LABEL_NAME, labelName);
321             }
322
323             influxDBMetadataService.getMetaData(itemName)
324                     .ifPresent(metadata -> metadata.getConfiguration().forEach(pointBuilder::withTag));
325
326             return pointBuilder.build();
327         });
328     }
329
330     @Reference(cardinality = ReferenceCardinality.AT_LEAST_ONE, policy = ReferencePolicy.DYNAMIC)
331     public void setItemFactory(ItemFactory itemFactory) {
332         itemFactories.add(itemFactory);
333         calculateItemTypeClasses();
334     }
335
336     public void unsetItemFactory(ItemFactory itemFactory) {
337         itemFactories.remove(itemFactory);
338         calculateItemTypeClasses();
339     }
340
341     private synchronized void calculateItemTypeClasses() {
342         Map<String, Class<? extends State>> desiredClasses = new HashMap<>();
343         itemFactories.forEach(factory -> {
344             for (String itemType : factory.getSupportedItemTypes()) {
345                 Item item = factory.createItem(itemType, "influxItem");
346                 if (item != null) {
347                     item.getAcceptedCommandTypes().stream()
348                             .filter(commandType -> commandType.isAssignableFrom(State.class)).findFirst()
349                             .map(commandType -> (Class<? extends State>) commandType.asSubclass(State.class))
350                             .ifPresent(desiredClass -> desiredClasses.put(itemType, desiredClass));
351                 }
352             }
353         });
354         this.desiredClasses = desiredClasses;
355     }
356 }