]> git.basschouten.com Git - openhab-addons.git/blob
31dff022172b6c3b60419921992e6fde63bd344b
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.persistence.influxdb;
14
15 import java.time.ZoneId;
16 import java.time.ZonedDateTime;
17 import java.util.List;
18 import java.util.Locale;
19 import java.util.Map;
20 import java.util.Set;
21 import java.util.stream.Collectors;
22
23 import org.eclipse.jdt.annotation.NonNullByDefault;
24 import org.eclipse.jdt.annotation.Nullable;
25 import org.openhab.core.config.core.ConfigurableService;
26 import org.openhab.core.items.Item;
27 import org.openhab.core.items.ItemRegistry;
28 import org.openhab.core.persistence.FilterCriteria;
29 import org.openhab.core.persistence.HistoricItem;
30 import org.openhab.core.persistence.PersistenceItemInfo;
31 import org.openhab.core.persistence.PersistenceService;
32 import org.openhab.core.persistence.QueryablePersistenceService;
33 import org.openhab.core.persistence.strategy.PersistenceStrategy;
34 import org.openhab.core.types.State;
35 import org.openhab.persistence.influxdb.internal.FilterCriteriaQueryCreator;
36 import org.openhab.persistence.influxdb.internal.InfluxDBConfiguration;
37 import org.openhab.persistence.influxdb.internal.InfluxDBHistoricItem;
38 import org.openhab.persistence.influxdb.internal.InfluxDBMetadataService;
39 import org.openhab.persistence.influxdb.internal.InfluxDBPersistentItemInfo;
40 import org.openhab.persistence.influxdb.internal.InfluxDBRepository;
41 import org.openhab.persistence.influxdb.internal.InfluxDBStateConvertUtils;
42 import org.openhab.persistence.influxdb.internal.InfluxPoint;
43 import org.openhab.persistence.influxdb.internal.ItemToStorePointCreator;
44 import org.openhab.persistence.influxdb.internal.UnexpectedConditionException;
45 import org.openhab.persistence.influxdb.internal.influx1.InfluxDB1RepositoryImpl;
46 import org.openhab.persistence.influxdb.internal.influx2.InfluxDB2RepositoryImpl;
47 import org.osgi.framework.Constants;
48 import org.osgi.service.component.annotations.Activate;
49 import org.osgi.service.component.annotations.Component;
50 import org.osgi.service.component.annotations.Deactivate;
51 import org.osgi.service.component.annotations.Reference;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
54
55 /**
56  * This is the implementation of the InfluxDB {@link PersistenceService}. It
57  * persists item values using the <a href="http://influxdb.org">InfluxDB time
58  * series database. The states ( {@link State}) of an {@link Item} are persisted
59  * by default in a time series with names equal to the name of the item.
60  *
61  * This addon supports 1.X and 2.X versions, as two versions are incompatible
62  * and use different drivers the specific code for each version is accessed by
63  * {@link InfluxDBRepository} and {@link FilterCriteriaQueryCreator} interfaces
64  * and specific implementation reside in
65  * {@link org.openhab.persistence.influxdb.internal.influx1} and
66  * {@link org.openhab.persistence.influxdb.internal.influx2} packages
67  *
68  * @author Theo Weiss - Initial contribution, rewrite of
69  *         org.openhab.persistence.influxdb
70  * @author Joan Pujol Espinar - Addon rewrite refactoring code and adding
71  *         support for InfluxDB 2.0. Some tag code is based from not integrated
72  *         branch from Dominik Vorreiter
73  */
74 @NonNullByDefault
75 @Component(service = { PersistenceService.class,
76         QueryablePersistenceService.class }, configurationPid = "org.openhab.influxdb", //
77         property = Constants.SERVICE_PID + "=org.openhab.influxdb")
78 @ConfigurableService(category = "persistence", label = "InfluxDB Persistence Service", description_uri = InfluxDBPersistenceService.CONFIG_URI)
79 public class InfluxDBPersistenceService implements QueryablePersistenceService {
80     public static final String SERVICE_NAME = "influxdb";
81
82     private final Logger logger = LoggerFactory.getLogger(InfluxDBPersistenceService.class);
83
84     protected static final String CONFIG_URI = "persistence:influxdb";
85
86     // External dependencies
87     private final ItemRegistry itemRegistry;
88     private final InfluxDBMetadataService influxDBMetadataService;
89
90     private final InfluxDBConfiguration configuration;
91     private final ItemToStorePointCreator itemToStorePointCreator;
92     private final InfluxDBRepository influxDBRepository;
93     private boolean tryReconnection;
94
95     @Activate
96     public InfluxDBPersistenceService(final @Reference ItemRegistry itemRegistry,
97             final @Reference InfluxDBMetadataService influxDBMetadataService, Map<String, Object> config) {
98         this.itemRegistry = itemRegistry;
99         this.influxDBMetadataService = influxDBMetadataService;
100         this.configuration = new InfluxDBConfiguration(config);
101         if (configuration.isValid()) {
102             this.influxDBRepository = createInfluxDBRepository();
103             this.influxDBRepository.connect();
104             this.itemToStorePointCreator = new ItemToStorePointCreator(configuration, influxDBMetadataService);
105             tryReconnection = true;
106         } else {
107             throw new IllegalArgumentException("Configuration invalid.");
108         }
109
110         logger.info("InfluxDB persistence service started.");
111     }
112
113     // Visible for testing
114     protected InfluxDBRepository createInfluxDBRepository() throws IllegalArgumentException {
115         return switch (configuration.getVersion()) {
116             case V1 -> new InfluxDB1RepositoryImpl(configuration, influxDBMetadataService);
117             case V2 -> new InfluxDB2RepositoryImpl(configuration, influxDBMetadataService);
118             default -> throw new IllegalArgumentException("Failed to instantiate repository.");
119         };
120     }
121
122     /**
123      * Disconnect from database when service is deactivated
124      */
125     @Deactivate
126     public void deactivate() {
127         tryReconnection = false;
128         influxDBRepository.disconnect();
129         logger.info("InfluxDB persistence service stopped.");
130     }
131
132     @Override
133     public String getId() {
134         return SERVICE_NAME;
135     }
136
137     @Override
138     public String getLabel(@Nullable Locale locale) {
139         return "InfluxDB persistence layer";
140     }
141
142     @Override
143     public Set<PersistenceItemInfo> getItemInfo() {
144         if (checkConnection()) {
145             return influxDBRepository.getStoredItemsCount().entrySet().stream().map(InfluxDBPersistentItemInfo::new)
146                     .collect(Collectors.toUnmodifiableSet());
147         } else {
148             logger.info("getItemInfo ignored, InfluxDB is not connected");
149             return Set.of();
150         }
151     }
152
153     @Override
154     public void store(Item item) {
155         store(item, null);
156     }
157
158     @Override
159     public void store(Item item, @Nullable String alias) {
160         if (checkConnection()) {
161             InfluxPoint point = itemToStorePointCreator.convert(item, alias);
162             if (point != null) {
163                 try {
164                     influxDBRepository.write(point);
165                     logger.trace("Stored item {} in InfluxDB point {}", item, point);
166                 } catch (UnexpectedConditionException e) {
167                     logger.warn("Failed to store item {} in InfluxDB point {}", point, item);
168                 }
169             } else {
170                 logger.trace("Ignoring item {}, conversion to an InfluxDB point failed.", item);
171             }
172         } else {
173             logger.debug("store ignored, InfluxDB is not connected");
174         }
175     }
176
177     @Override
178     public Iterable<HistoricItem> query(FilterCriteria filter) {
179         if (checkConnection()) {
180             logger.trace(
181                     "Query-Filter: itemname: {}, ordering: {}, state: {},  operator: {}, getBeginDate: {}, getEndDate: {}, getPageSize: {}, getPageNumber: {}",
182                     filter.getItemName(), filter.getOrdering().toString(), filter.getState(), filter.getOperator(),
183                     filter.getBeginDate(), filter.getEndDate(), filter.getPageSize(), filter.getPageNumber());
184             String query = influxDBRepository.createQueryCreator().createQuery(filter,
185                     configuration.getRetentionPolicy());
186             logger.trace("Query {}", query);
187             List<InfluxDBRepository.InfluxRow> results = influxDBRepository.query(query);
188             return results.stream().map(this::mapRowToHistoricItem).collect(Collectors.toList());
189         } else {
190             logger.debug("Query for persisted data ignored, InfluxDB is not connected");
191             return List.of();
192         }
193     }
194
195     private HistoricItem mapRowToHistoricItem(InfluxDBRepository.InfluxRow row) {
196         State state = InfluxDBStateConvertUtils.objectToState(row.value(), row.itemName(), itemRegistry);
197         return new InfluxDBHistoricItem(row.itemName(), state,
198                 ZonedDateTime.ofInstant(row.time(), ZoneId.systemDefault()));
199     }
200
201     @Override
202     public List<PersistenceStrategy> getDefaultStrategies() {
203         return List.of(PersistenceStrategy.Globals.RESTORE, PersistenceStrategy.Globals.CHANGE);
204     }
205
206     /**
207      * check connection and try reconnect
208      *
209      * @return true if connected
210      */
211     private boolean checkConnection() {
212         if (influxDBRepository.isConnected()) {
213             return true;
214         } else if (tryReconnection) {
215             logger.debug("Connection lost, trying re-connection");
216             return influxDBRepository.connect();
217         }
218         return false;
219     }
220 }