/**
 * Copyright (c) 2010-2023 Contributors to the openHAB project
 *
 * See the NOTICE file(s) distributed with this work for additional
 * information.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License 2.0 which is available at
 * http://www.eclipse.org/legal/epl-2.0
 *
 * SPDX-License-Identifier: EPL-2.0
 */
13 package org.openhab.persistence.influxdb;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.openhab.core.config.core.ConfigurableService;
import org.openhab.core.items.Item;
import org.openhab.core.items.ItemRegistry;
import org.openhab.core.items.MetadataRegistry;
import org.openhab.core.persistence.FilterCriteria;
import org.openhab.core.persistence.HistoricItem;
import org.openhab.core.persistence.PersistenceItemInfo;
import org.openhab.core.persistence.PersistenceService;
import org.openhab.core.persistence.QueryablePersistenceService;
import org.openhab.core.persistence.strategy.PersistenceStrategy;
import org.openhab.core.types.State;
import org.openhab.persistence.influxdb.internal.FilterCriteriaQueryCreator;
import org.openhab.persistence.influxdb.internal.InfluxDBConfiguration;
import org.openhab.persistence.influxdb.internal.InfluxDBHistoricItem;
import org.openhab.persistence.influxdb.internal.InfluxDBPersistentItemInfo;
import org.openhab.persistence.influxdb.internal.InfluxDBRepository;
import org.openhab.persistence.influxdb.internal.InfluxDBStateConvertUtils;
import org.openhab.persistence.influxdb.internal.InfluxPoint;
import org.openhab.persistence.influxdb.internal.InfluxRow;
import org.openhab.persistence.influxdb.internal.ItemToStorePointCreator;
import org.openhab.persistence.influxdb.internal.RepositoryFactory;
import org.osgi.framework.Constants;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Modified;
import org.osgi.service.component.annotations.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * This is the implementation of the InfluxDB {@link PersistenceService}. It
 * persists item values using the <a href="http://influxdb.org">InfluxDB</a> time
 * series database. The states ({@link State}) of an {@link Item} are persisted
 * by default in a time series with names equal to the name of the item.
 *
 * This addon supports the 1.X and 2.X versions. As the two versions are incompatible
 * and use different drivers, the specific code for each version is accessed through the
 * {@link InfluxDBRepository} and {@link FilterCriteriaQueryCreator} interfaces,
 * and the specific implementations reside in the
 * {@link org.openhab.persistence.influxdb.internal.influx1} and
 * {@link org.openhab.persistence.influxdb.internal.influx2} packages.
 *
 * @author Theo Weiss - Initial contribution, rewrite of
 *         org.openhab.persistence.influxdb
 * @author Joan Pujol Espinar - Addon rewrite refactoring code and adding
 *         support for InfluxDB 2.0. Some tag code is based from not integrated
 *         branch from Dominik Vorreiter
 */
75 @Component(service = { PersistenceService.class,
76 QueryablePersistenceService.class }, configurationPid = "org.openhab.influxdb", //
77 property = Constants.SERVICE_PID + "=org.openhab.influxdb")
78 @ConfigurableService(category = "persistence", label = "InfluxDB Persistence Service", description_uri = InfluxDBPersistenceService.CONFIG_URI)
79 public class InfluxDBPersistenceService implements QueryablePersistenceService {
80 public static final String SERVICE_NAME = "influxdb";
82 private final Logger logger = LoggerFactory.getLogger(InfluxDBPersistenceService.class);
84 protected static final String CONFIG_URI = "persistence:influxdb";
86 // External dependencies
87 private final ItemRegistry itemRegistry;
88 private final MetadataRegistry metadataRegistry;
90 // Internal dependencies/state
91 private InfluxDBConfiguration configuration = InfluxDBConfiguration.NO_CONFIGURATION;
// Relax nullness rules because these fields can only be null while the component is not active
94 private @NonNullByDefault({}) ItemToStorePointCreator itemToStorePointCreator;
95 private @NonNullByDefault({}) InfluxDBRepository influxDBRepository;
97 private boolean tryReconnection = false;
/**
 * Creates the service and keeps the injected registries for later use.
 *
 * @param itemRegistry registry handed to {@link InfluxDBStateConvertUtils} when stored values are
 *            converted back into item states
 * @param metadataRegistry registry handed to the {@link ItemToStorePointCreator} and to the query creator
 */
// @Activate is required so that OSGi Declarative Services injects the @Reference parameters
// through this constructor.
@Activate
public InfluxDBPersistenceService(final @Reference ItemRegistry itemRegistry,
        final @Reference MetadataRegistry metadataRegistry) {
    this.itemRegistry = itemRegistry;
    this.metadataRegistry = metadataRegistry;
}
107 * Connect to database when service is activated
110 public void activate(final @Nullable Map<String, Object> config) {
111 logger.debug("InfluxDB persistence service is being activated");
113 if (loadConfiguration(config)) {
114 itemToStorePointCreator = new ItemToStorePointCreator(configuration, metadataRegistry);
115 influxDBRepository = createInfluxDBRepository();
116 influxDBRepository.connect();
117 tryReconnection = true;
119 logger.error("Cannot load configuration, persistence service wont work");
120 tryReconnection = false;
123 logger.debug("InfluxDB persistence service is now activated");
126 // Visible for testing
127 protected InfluxDBRepository createInfluxDBRepository() {
128 return RepositoryFactory.createRepository(configuration);
132 * Disconnect from database when service is deactivated
135 public void deactivate() {
136 logger.debug("InfluxDB persistence service deactivated");
137 tryReconnection = false;
138 if (influxDBRepository != null) {
139 influxDBRepository.disconnect();
140 influxDBRepository = null;
142 if (itemToStorePointCreator != null) {
143 itemToStorePointCreator = null;
148 * Rerun deactivation/activation code each time configuration is changed
151 protected void modified(@Nullable Map<String, Object> config) {
152 if (config != null) {
153 logger.debug("Config has been modified will deactivate/activate with new config");
158 logger.warn("Null configuration, ignoring");
162 private boolean loadConfiguration(@Nullable Map<String, Object> config) {
163 boolean configurationIsValid;
164 if (config != null) {
165 configuration = new InfluxDBConfiguration(config);
166 configurationIsValid = configuration.isValid();
167 if (configurationIsValid) {
168 logger.debug("Loaded configuration {}", config);
170 logger.warn("Some configuration properties are not valid {}", config);
173 configuration = InfluxDBConfiguration.NO_CONFIGURATION;
174 configurationIsValid = false;
175 logger.warn("Ignoring configuration because it's null");
177 return configurationIsValid;
181 public String getId() {
186 public String getLabel(@Nullable Locale locale) {
187 return "InfluxDB persistence layer";
191 public Set<PersistenceItemInfo> getItemInfo() {
192 if (checkConnection()) {
193 return influxDBRepository.getStoredItemsCount().entrySet().stream()
194 .map(entry -> new InfluxDBPersistentItemInfo(entry.getKey(), entry.getValue()))
195 .collect(Collectors.toUnmodifiableSet());
197 logger.info("getItemInfo ignored, InfluxDB is not yet connected");
203 public void store(Item item) {
204 store(item, item.getName());
208 public void store(Item item, @Nullable String alias) {
209 if (checkConnection()) {
210 InfluxPoint point = itemToStorePointCreator.convert(item, alias);
212 logger.trace("Storing item {} in InfluxDB point {}", item, point);
213 influxDBRepository.write(point);
215 logger.trace("Ignoring item {} as is cannot be converted to an InfluxDB point", item);
218 logger.debug("store ignored, InfluxDB is not yet connected");
223 public Iterable<HistoricItem> query(FilterCriteria filter) {
224 logger.debug("Got a query for historic points!");
226 if (checkConnection()) {
228 "Filter: itemname: {}, ordering: {}, state: {}, operator: {}, getBeginDate: {}, getEndDate: {}, getPageSize: {}, getPageNumber: {}",
229 filter.getItemName(), filter.getOrdering().toString(), filter.getState(), filter.getOperator(),
230 filter.getBeginDate(), filter.getEndDate(), filter.getPageSize(), filter.getPageNumber());
232 String query = RepositoryFactory.createQueryCreator(configuration, metadataRegistry).createQuery(filter,
233 configuration.getRetentionPolicy());
234 logger.trace("Query {}", query);
235 List<InfluxRow> results = influxDBRepository.query(query);
236 return results.stream().map(this::mapRow2HistoricItem).collect(Collectors.toList());
238 logger.debug("query ignored, InfluxDB is not yet connected");
243 private HistoricItem mapRow2HistoricItem(InfluxRow row) {
244 State state = InfluxDBStateConvertUtils.objectToState(row.getValue(), row.getItemName(), itemRegistry);
245 return new InfluxDBHistoricItem(row.getItemName(), state,
246 ZonedDateTime.ofInstant(row.getTime(), ZoneId.systemDefault()));
250 public List<PersistenceStrategy> getDefaultStrategies() {
251 return List.of(PersistenceStrategy.Globals.RESTORE, PersistenceStrategy.Globals.CHANGE);
255 * check connection and try reconnect
257 * @return true if connected
259 private boolean checkConnection() {
260 if (influxDBRepository == null) {
262 } else if (influxDBRepository.isConnected()) {
264 } else if (tryReconnection) {
265 logger.debug("Connection lost, trying re-connection");
266 influxDBRepository.connect();
267 return influxDBRepository.isConnected();