2 * Copyright (c) 2010-2023 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.persistence.influxdb;
15 import java.time.ZoneId;
16 import java.time.ZonedDateTime;
17 import java.util.List;
18 import java.util.Locale;
21 import java.util.stream.Collectors;
23 import org.eclipse.jdt.annotation.NonNullByDefault;
24 import org.eclipse.jdt.annotation.Nullable;
25 import org.openhab.core.config.core.ConfigurableService;
26 import org.openhab.core.items.Item;
27 import org.openhab.core.items.ItemRegistry;
28 import org.openhab.core.persistence.FilterCriteria;
29 import org.openhab.core.persistence.HistoricItem;
30 import org.openhab.core.persistence.PersistenceItemInfo;
31 import org.openhab.core.persistence.PersistenceService;
32 import org.openhab.core.persistence.QueryablePersistenceService;
33 import org.openhab.core.persistence.strategy.PersistenceStrategy;
34 import org.openhab.core.types.State;
35 import org.openhab.persistence.influxdb.internal.FilterCriteriaQueryCreator;
36 import org.openhab.persistence.influxdb.internal.InfluxDBConfiguration;
37 import org.openhab.persistence.influxdb.internal.InfluxDBHistoricItem;
38 import org.openhab.persistence.influxdb.internal.InfluxDBMetadataService;
39 import org.openhab.persistence.influxdb.internal.InfluxDBPersistentItemInfo;
40 import org.openhab.persistence.influxdb.internal.InfluxDBRepository;
41 import org.openhab.persistence.influxdb.internal.InfluxDBStateConvertUtils;
42 import org.openhab.persistence.influxdb.internal.InfluxPoint;
43 import org.openhab.persistence.influxdb.internal.ItemToStorePointCreator;
44 import org.openhab.persistence.influxdb.internal.UnexpectedConditionException;
45 import org.openhab.persistence.influxdb.internal.influx1.InfluxDB1RepositoryImpl;
46 import org.openhab.persistence.influxdb.internal.influx2.InfluxDB2RepositoryImpl;
47 import org.osgi.framework.Constants;
48 import org.osgi.service.component.annotations.Activate;
49 import org.osgi.service.component.annotations.Component;
50 import org.osgi.service.component.annotations.Deactivate;
51 import org.osgi.service.component.annotations.Reference;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
56 * This is the implementation of the InfluxDB {@link PersistenceService}. It
57 * persists item values using the <a href="http://influxdb.org">InfluxDB time
58 * series database. The states ( {@link State}) of an {@link Item} are persisted
59 * by default in a time series with names equal to the name of the item.
61 * This addon supports 1.X and 2.X versions, as two versions are incompatible
62 * and use different drivers the specific code for each version is accessed by
63 * {@link InfluxDBRepository} and {@link FilterCriteriaQueryCreator} interfaces
64 * and specific implementation reside in
65 * {@link org.openhab.persistence.influxdb.internal.influx1} and
66 * {@link org.openhab.persistence.influxdb.internal.influx2} packages
68 * @author Theo Weiss - Initial contribution, rewrite of
69 * org.openhab.persistence.influxdb
70 * @author Joan Pujol Espinar - Addon rewrite refactoring code and adding
71 * support for InfluxDB 2.0. Some tag code is based from not integrated
72 * branch from Dominik Vorreiter
75 @Component(service = { PersistenceService.class,
76 QueryablePersistenceService.class }, configurationPid = "org.openhab.influxdb", //
77 property = Constants.SERVICE_PID + "=org.openhab.influxdb")
78 @ConfigurableService(category = "persistence", label = "InfluxDB Persistence Service", description_uri = InfluxDBPersistenceService.CONFIG_URI)
79 public class InfluxDBPersistenceService implements QueryablePersistenceService {
80 public static final String SERVICE_NAME = "influxdb";
82 private final Logger logger = LoggerFactory.getLogger(InfluxDBPersistenceService.class);
84 protected static final String CONFIG_URI = "persistence:influxdb";
86 // External dependencies
87 private final ItemRegistry itemRegistry;
88 private final InfluxDBMetadataService influxDBMetadataService;
90 private final InfluxDBConfiguration configuration;
91 private final ItemToStorePointCreator itemToStorePointCreator;
92 private final InfluxDBRepository influxDBRepository;
93 private boolean tryReconnection;
96 public InfluxDBPersistenceService(final @Reference ItemRegistry itemRegistry,
97 final @Reference InfluxDBMetadataService influxDBMetadataService, Map<String, Object> config) {
98 this.itemRegistry = itemRegistry;
99 this.influxDBMetadataService = influxDBMetadataService;
100 this.configuration = new InfluxDBConfiguration(config);
101 if (configuration.isValid()) {
102 this.influxDBRepository = createInfluxDBRepository();
103 this.influxDBRepository.connect();
104 this.itemToStorePointCreator = new ItemToStorePointCreator(configuration, influxDBMetadataService);
105 tryReconnection = true;
107 throw new IllegalArgumentException("Configuration invalid.");
110 logger.info("InfluxDB persistence service started.");
113 // Visible for testing
114 protected InfluxDBRepository createInfluxDBRepository() throws IllegalArgumentException {
115 return switch (configuration.getVersion()) {
116 case V1 -> new InfluxDB1RepositoryImpl(configuration, influxDBMetadataService);
117 case V2 -> new InfluxDB2RepositoryImpl(configuration, influxDBMetadataService);
118 default -> throw new IllegalArgumentException("Failed to instantiate repository.");
123 * Disconnect from database when service is deactivated
126 public void deactivate() {
127 tryReconnection = false;
128 influxDBRepository.disconnect();
129 logger.info("InfluxDB persistence service stopped.");
133 public String getId() {
138 public String getLabel(@Nullable Locale locale) {
139 return "InfluxDB persistence layer";
143 public Set<PersistenceItemInfo> getItemInfo() {
144 if (checkConnection()) {
145 return influxDBRepository.getStoredItemsCount().entrySet().stream().map(InfluxDBPersistentItemInfo::new)
146 .collect(Collectors.toUnmodifiableSet());
148 logger.info("getItemInfo ignored, InfluxDB is not connected");
154 public void store(Item item) {
159 public void store(Item item, @Nullable String alias) {
160 if (checkConnection()) {
161 InfluxPoint point = itemToStorePointCreator.convert(item, alias);
164 influxDBRepository.write(point);
165 logger.trace("Stored item {} in InfluxDB point {}", item, point);
166 } catch (UnexpectedConditionException e) {
167 logger.warn("Failed to store item {} in InfluxDB point {}", point, item);
170 logger.trace("Ignoring item {}, conversion to an InfluxDB point failed.", item);
173 logger.debug("store ignored, InfluxDB is not connected");
178 public Iterable<HistoricItem> query(FilterCriteria filter) {
179 if (checkConnection()) {
181 "Query-Filter: itemname: {}, ordering: {}, state: {}, operator: {}, getBeginDate: {}, getEndDate: {}, getPageSize: {}, getPageNumber: {}",
182 filter.getItemName(), filter.getOrdering().toString(), filter.getState(), filter.getOperator(),
183 filter.getBeginDate(), filter.getEndDate(), filter.getPageSize(), filter.getPageNumber());
184 String query = influxDBRepository.createQueryCreator().createQuery(filter,
185 configuration.getRetentionPolicy());
186 logger.trace("Query {}", query);
187 List<InfluxDBRepository.InfluxRow> results = influxDBRepository.query(query);
188 return results.stream().map(this::mapRowToHistoricItem).collect(Collectors.toList());
190 logger.debug("Query for persisted data ignored, InfluxDB is not connected");
195 private HistoricItem mapRowToHistoricItem(InfluxDBRepository.InfluxRow row) {
196 State state = InfluxDBStateConvertUtils.objectToState(row.value(), row.itemName(), itemRegistry);
197 return new InfluxDBHistoricItem(row.itemName(), state,
198 ZonedDateTime.ofInstant(row.time(), ZoneId.systemDefault()));
202 public List<PersistenceStrategy> getDefaultStrategies() {
203 return List.of(PersistenceStrategy.Globals.RESTORE, PersistenceStrategy.Globals.CHANGE);
207 * check connection and try reconnect
209 * @return true if connected
211 private boolean checkConnection() {
212 if (influxDBRepository.isConnected()) {
214 } else if (tryReconnection) {
215 logger.debug("Connection lost, trying re-connection");
216 return influxDBRepository.connect();