2 * Copyright (c) 2010-2023 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.persistence.influxdb;
15 import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.*;
17 import java.time.Instant;
18 import java.time.ZoneId;
19 import java.time.ZonedDateTime;
20 import java.util.ArrayList;
21 import java.util.HashMap;
22 import java.util.HashSet;
23 import java.util.List;
24 import java.util.Locale;
26 import java.util.Objects;
28 import java.util.concurrent.BlockingQueue;
29 import java.util.concurrent.CompletableFuture;
30 import java.util.concurrent.LinkedBlockingQueue;
31 import java.util.concurrent.ScheduledFuture;
32 import java.util.concurrent.TimeUnit;
33 import java.util.stream.Collectors;
35 import org.eclipse.jdt.annotation.NonNullByDefault;
36 import org.eclipse.jdt.annotation.Nullable;
37 import org.openhab.core.common.ThreadPoolManager;
38 import org.openhab.core.config.core.ConfigurableService;
39 import org.openhab.core.items.Item;
40 import org.openhab.core.items.ItemFactory;
41 import org.openhab.core.items.ItemRegistry;
42 import org.openhab.core.items.ItemUtil;
43 import org.openhab.core.persistence.FilterCriteria;
44 import org.openhab.core.persistence.HistoricItem;
45 import org.openhab.core.persistence.PersistenceItemInfo;
46 import org.openhab.core.persistence.PersistenceService;
47 import org.openhab.core.persistence.QueryablePersistenceService;
48 import org.openhab.core.persistence.strategy.PersistenceStrategy;
49 import org.openhab.core.types.State;
50 import org.openhab.core.types.UnDefType;
51 import org.openhab.persistence.influxdb.internal.FilterCriteriaQueryCreator;
52 import org.openhab.persistence.influxdb.internal.InfluxDBConfiguration;
53 import org.openhab.persistence.influxdb.internal.InfluxDBHistoricItem;
54 import org.openhab.persistence.influxdb.internal.InfluxDBMetadataService;
55 import org.openhab.persistence.influxdb.internal.InfluxDBPersistentItemInfo;
56 import org.openhab.persistence.influxdb.internal.InfluxDBRepository;
57 import org.openhab.persistence.influxdb.internal.InfluxDBStateConvertUtils;
58 import org.openhab.persistence.influxdb.internal.InfluxPoint;
59 import org.openhab.persistence.influxdb.internal.influx1.InfluxDB1RepositoryImpl;
60 import org.openhab.persistence.influxdb.internal.influx2.InfluxDB2RepositoryImpl;
61 import org.osgi.framework.Constants;
62 import org.osgi.service.component.annotations.Activate;
63 import org.osgi.service.component.annotations.Component;
64 import org.osgi.service.component.annotations.Deactivate;
65 import org.osgi.service.component.annotations.Reference;
66 import org.osgi.service.component.annotations.ReferenceCardinality;
67 import org.osgi.service.component.annotations.ReferencePolicy;
68 import org.slf4j.Logger;
69 import org.slf4j.LoggerFactory;
72 * This is the implementation of the InfluxDB {@link PersistenceService}. It
73 * persists item values using the <a href="http://influxdb.org">InfluxDB time
74 * series database. The states ( {@link State}) of an {@link Item} are persisted
75 * by default in a time series with names equal to the name of the item.
77 * This addon supports 1.X and 2.X versions, as two versions are incompatible
78 * and use different drivers the specific code for each version is accessed by
79 * {@link InfluxDBRepository} and {@link FilterCriteriaQueryCreator} interfaces
80 * and specific implementation reside in
81 * {@link org.openhab.persistence.influxdb.internal.influx1} and
82 * {@link org.openhab.persistence.influxdb.internal.influx2} packages
84 * @author Theo Weiss - Initial contribution, rewrite of
85 * org.openhab.persistence.influxdb
86 * @author Joan Pujol Espinar - Addon rewrite refactoring code and adding
87 * support for InfluxDB 2.0. Some tag code is based from not integrated
88 * branch from Dominik Vorreiter
91 @Component(service = { PersistenceService.class,
92 QueryablePersistenceService.class }, configurationPid = "org.openhab.influxdb", //
93 property = Constants.SERVICE_PID + "=org.openhab.influxdb")
94 @ConfigurableService(category = "persistence", label = "InfluxDB Persistence Service", description_uri = InfluxDBPersistenceService.CONFIG_URI)
95 public class InfluxDBPersistenceService implements QueryablePersistenceService {
96 public static final String SERVICE_NAME = "influxdb";
98 private final Logger logger = LoggerFactory.getLogger(InfluxDBPersistenceService.class);
100 private static final int COMMIT_INTERVAL = 3; // in s
101 protected static final String CONFIG_URI = "persistence:influxdb";
103 // External dependencies
104 private final ItemRegistry itemRegistry;
105 private final InfluxDBMetadataService influxDBMetadataService;
107 private final InfluxDBConfiguration configuration;
108 private final InfluxDBRepository influxDBRepository;
109 private boolean serviceActivated;
112 private final ScheduledFuture<?> storeJob;
113 private final BlockingQueue<InfluxPoint> pointsQueue = new LinkedBlockingQueue<>();
116 private final Set<ItemFactory> itemFactories = new HashSet<>();
117 private Map<String, Class<? extends State>> desiredClasses = new HashMap<>();
120 public InfluxDBPersistenceService(final @Reference ItemRegistry itemRegistry,
121 final @Reference InfluxDBMetadataService influxDBMetadataService, Map<String, Object> config) {
122 this.itemRegistry = itemRegistry;
123 this.influxDBMetadataService = influxDBMetadataService;
124 this.configuration = new InfluxDBConfiguration(config);
125 if (configuration.isValid()) {
126 this.influxDBRepository = createInfluxDBRepository();
127 this.influxDBRepository.connect();
128 this.storeJob = ThreadPoolManager.getScheduledPool("org.openhab.influxdb")
129 .scheduleWithFixedDelay(this::commit, COMMIT_INTERVAL, COMMIT_INTERVAL, TimeUnit.SECONDS);
130 serviceActivated = true;
132 throw new IllegalArgumentException("Configuration invalid.");
135 logger.info("InfluxDB persistence service started.");
138 // Visible for testing
139 protected InfluxDBRepository createInfluxDBRepository() throws IllegalArgumentException {
140 return switch (configuration.getVersion()) {
141 case V1 -> new InfluxDB1RepositoryImpl(configuration, influxDBMetadataService);
142 case V2 -> new InfluxDB2RepositoryImpl(configuration, influxDBMetadataService);
143 default -> throw new IllegalArgumentException("Failed to instantiate repository.");
148 * Disconnect from database when service is deactivated
151 public void deactivate() {
152 serviceActivated = false;
154 storeJob.cancel(false);
155 commit(); // ensure we at least tried to store the data;
157 if (!pointsQueue.isEmpty()) {
158 logger.warn("InfluxDB failed to finally store {} points.", pointsQueue.size());
161 influxDBRepository.disconnect();
162 logger.info("InfluxDB persistence service stopped.");
166 public String getId() {
171 public String getLabel(@Nullable Locale locale) {
172 return "InfluxDB persistence layer";
176 public Set<PersistenceItemInfo> getItemInfo() {
177 if (checkConnection()) {
178 return influxDBRepository.getStoredItemsCount().entrySet().stream().map(InfluxDBPersistentItemInfo::new)
179 .collect(Collectors.toUnmodifiableSet());
181 logger.info("getItemInfo ignored, InfluxDB is not connected");
187 public void store(Item item) {
192 public void store(Item item, @Nullable String alias) {
193 if (!serviceActivated) {
194 logger.warn("InfluxDB service not ready. Storing {} rejected.", item);
197 convert(item, alias).thenAccept(point -> {
199 logger.trace("Ignoring item {}, conversion to an InfluxDB point failed.", item.getName());
202 if (pointsQueue.offer(point)) {
203 logger.trace("Queued {} for item {}", point, item);
205 logger.warn("Failed to queue {} for item {}", point, item);
211 public Iterable<HistoricItem> query(FilterCriteria filter) {
212 if (serviceActivated && checkConnection()) {
214 "Query-Filter: itemname: {}, ordering: {}, state: {}, operator: {}, getBeginDate: {}, getEndDate: {}, getPageSize: {}, getPageNumber: {}",
215 filter.getItemName(), filter.getOrdering().toString(), filter.getState(), filter.getOperator(),
216 filter.getBeginDate(), filter.getEndDate(), filter.getPageSize(), filter.getPageNumber());
217 if (filter.getItemName() == null) {
218 logger.warn("Item name is missing in filter {}", filter);
221 String query = influxDBRepository.createQueryCreator().createQuery(filter,
222 configuration.getRetentionPolicy());
223 logger.trace("Query {}", query);
224 List<InfluxDBRepository.InfluxRow> results = influxDBRepository.query(query);
225 return results.stream().map(this::mapRowToHistoricItem).collect(Collectors.toList());
227 logger.debug("Query for persisted data ignored, InfluxDB is not connected");
232 private HistoricItem mapRowToHistoricItem(InfluxDBRepository.InfluxRow row) {
233 State state = InfluxDBStateConvertUtils.objectToState(row.value(), row.itemName(), itemRegistry);
234 return new InfluxDBHistoricItem(row.itemName(), state,
235 ZonedDateTime.ofInstant(row.time(), ZoneId.systemDefault()));
239 public List<PersistenceStrategy> getDefaultStrategies() {
240 return List.of(PersistenceStrategy.Globals.RESTORE, PersistenceStrategy.Globals.CHANGE);
244 * check connection and try reconnect
246 * @return true if connected
248 private boolean checkConnection() {
249 if (influxDBRepository.isConnected()) {
251 } else if (serviceActivated) {
252 logger.debug("Connection lost, trying re-connection");
253 return influxDBRepository.connect();
258 private void commit() {
259 if (!pointsQueue.isEmpty() && checkConnection()) {
260 List<InfluxPoint> points = new ArrayList<>();
261 pointsQueue.drainTo(points);
262 if (!influxDBRepository.write(points)) {
263 logger.warn("Re-queuing {} elements, failed to write batch.", points.size());
264 pointsQueue.addAll(points);
266 logger.trace("Wrote {} elements to database", points.size());
272 * Convert incoming data to an {@link InfluxPoint} for further processing. This is needed because storage is
273 * asynchronous and the item data may have changed.
275 * The method is package-private for testing.
277 * @param item the {@link Item} that needs conversion
278 * @param storeAlias an (optional) alias for the item
279 * @return a {@link CompletableFuture} that contains either <code>null</code> for item states that cannot be
280 * converted or the corresponding {@link InfluxPoint}
282 CompletableFuture<@Nullable InfluxPoint> convert(Item item, @Nullable String storeAlias) {
283 String itemName = item.getName();
284 String itemLabel = item.getLabel();
285 String category = item.getCategory();
286 State state = item.getState();
287 String itemType = item.getType();
288 Instant timeStamp = Instant.now();
290 if (state instanceof UnDefType) {
291 return CompletableFuture.completedFuture(null);
294 return CompletableFuture.supplyAsync(() -> {
295 String measurementName = storeAlias != null && !storeAlias.isBlank() ? storeAlias : itemName;
296 measurementName = influxDBMetadataService.getMeasurementNameOrDefault(itemName, measurementName);
298 if (configuration.isReplaceUnderscore()) {
299 measurementName = measurementName.replace('_', '.');
302 State storeState = Objects
303 .requireNonNullElse(state.as(desiredClasses.get(ItemUtil.getMainItemType(itemType))), state);
304 Object value = InfluxDBStateConvertUtils.stateToObject(storeState);
306 InfluxPoint.Builder pointBuilder = InfluxPoint.newBuilder(measurementName).withTime(timeStamp)
307 .withValue(value).withTag(TAG_ITEM_NAME, itemName);
309 if (configuration.isAddCategoryTag()) {
310 String categoryName = Objects.requireNonNullElse(category, "n/a");
311 pointBuilder.withTag(TAG_CATEGORY_NAME, categoryName);
314 if (configuration.isAddTypeTag()) {
315 pointBuilder.withTag(TAG_TYPE_NAME, itemType);
318 if (configuration.isAddLabelTag()) {
319 String labelName = Objects.requireNonNullElse(itemLabel, "n/a");
320 pointBuilder.withTag(TAG_LABEL_NAME, labelName);
323 influxDBMetadataService.getMetaData(itemName)
324 .ifPresent(metadata -> metadata.getConfiguration().forEach(pointBuilder::withTag));
326 return pointBuilder.build();
330 @Reference(cardinality = ReferenceCardinality.AT_LEAST_ONE, policy = ReferencePolicy.DYNAMIC)
331 public void setItemFactory(ItemFactory itemFactory) {
332 itemFactories.add(itemFactory);
333 calculateItemTypeClasses();
336 public void unsetItemFactory(ItemFactory itemFactory) {
337 itemFactories.remove(itemFactory);
338 calculateItemTypeClasses();
341 private synchronized void calculateItemTypeClasses() {
342 Map<String, Class<? extends State>> desiredClasses = new HashMap<>();
343 itemFactories.forEach(factory -> {
344 for (String itemType : factory.getSupportedItemTypes()) {
345 Item item = factory.createItem(itemType, "influxItem");
347 item.getAcceptedCommandTypes().stream()
348 .filter(commandType -> commandType.isAssignableFrom(State.class)).findFirst()
349 .map(commandType -> (Class<? extends State>) commandType.asSubclass(State.class))
350 .ifPresent(desiredClass -> desiredClasses.put(itemType, desiredClass));
354 this.desiredClasses = desiredClasses;