]> git.basschouten.com Git - openhab-addons.git/blob
2888cb8fdb00fadb2a19353a78fae43ebf037a11
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.persistence.influxdb;
14
15 import java.time.ZoneId;
16 import java.time.ZonedDateTime;
17 import java.util.List;
18 import java.util.Locale;
19 import java.util.Map;
20 import java.util.Set;
21 import java.util.stream.Collectors;
22
23 import org.eclipse.jdt.annotation.NonNullByDefault;
24 import org.eclipse.jdt.annotation.Nullable;
25 import org.openhab.core.config.core.ConfigurableService;
26 import org.openhab.core.items.Item;
27 import org.openhab.core.items.ItemRegistry;
28 import org.openhab.core.items.MetadataRegistry;
29 import org.openhab.core.persistence.FilterCriteria;
30 import org.openhab.core.persistence.HistoricItem;
31 import org.openhab.core.persistence.PersistenceItemInfo;
32 import org.openhab.core.persistence.PersistenceService;
33 import org.openhab.core.persistence.QueryablePersistenceService;
34 import org.openhab.core.persistence.strategy.PersistenceStrategy;
35 import org.openhab.core.types.State;
36 import org.openhab.persistence.influxdb.internal.FilterCriteriaQueryCreator;
37 import org.openhab.persistence.influxdb.internal.InfluxDBConfiguration;
38 import org.openhab.persistence.influxdb.internal.InfluxDBHistoricItem;
39 import org.openhab.persistence.influxdb.internal.InfluxDBPersistentItemInfo;
40 import org.openhab.persistence.influxdb.internal.InfluxDBRepository;
41 import org.openhab.persistence.influxdb.internal.InfluxDBStateConvertUtils;
42 import org.openhab.persistence.influxdb.internal.InfluxPoint;
43 import org.openhab.persistence.influxdb.internal.InfluxRow;
44 import org.openhab.persistence.influxdb.internal.ItemToStorePointCreator;
45 import org.openhab.persistence.influxdb.internal.RepositoryFactory;
46 import org.osgi.framework.Constants;
47 import org.osgi.service.component.annotations.Activate;
48 import org.osgi.service.component.annotations.Component;
49 import org.osgi.service.component.annotations.Deactivate;
50 import org.osgi.service.component.annotations.Modified;
51 import org.osgi.service.component.annotations.Reference;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
54
55 /**
56  * This is the implementation of the InfluxDB {@link PersistenceService}. It
57  * persists item values using the <a href="http://influxdb.org">InfluxDB time
58  * series database. The states ( {@link State}) of an {@link Item} are persisted
59  * by default in a time series with names equal to the name of the item.
60  *
61  * This addon supports 1.X and 2.X versions, as two versions are incompatible
62  * and use different drivers the specific code for each version is accessed by
63  * {@link InfluxDBRepository} and {@link FilterCriteriaQueryCreator} interfaces
64  * and specific implementation reside in
65  * {@link org.openhab.persistence.influxdb.internal.influx1} and
66  * {@link org.openhab.persistence.influxdb.internal.influx2} packages
67  *
68  * @author Theo Weiss - Initial contribution, rewrite of
69  *         org.openhab.persistence.influxdb
70  * @author Joan Pujol Espinar - Addon rewrite refactoring code and adding
71  *         support for InfluxDB 2.0. Some tag code is based from not integrated
72  *         branch from Dominik Vorreiter
73  */
74 @NonNullByDefault
75 @Component(service = { PersistenceService.class,
76         QueryablePersistenceService.class }, configurationPid = "org.openhab.influxdb", //
77         property = Constants.SERVICE_PID + "=org.openhab.influxdb")
78 @ConfigurableService(category = "persistence", label = "InfluxDB Persistence Service", description_uri = InfluxDBPersistenceService.CONFIG_URI)
79 public class InfluxDBPersistenceService implements QueryablePersistenceService {
80     public static final String SERVICE_NAME = "influxdb";
81
82     private final Logger logger = LoggerFactory.getLogger(InfluxDBPersistenceService.class);
83
84     protected static final String CONFIG_URI = "persistence:influxdb";
85
86     // External dependencies
87     private final ItemRegistry itemRegistry;
88     private final MetadataRegistry metadataRegistry;
89
90     // Internal dependencies/state
91     private InfluxDBConfiguration configuration = InfluxDBConfiguration.NO_CONFIGURATION;
92
93     // Relax rules because can only be null if component is not active
94     private @NonNullByDefault({}) ItemToStorePointCreator itemToStorePointCreator;
95     private @NonNullByDefault({}) InfluxDBRepository influxDBRepository;
96
97     private boolean tryReconnection = false;
98
99     @Activate
100     public InfluxDBPersistenceService(final @Reference ItemRegistry itemRegistry,
101             final @Reference MetadataRegistry metadataRegistry) {
102         this.itemRegistry = itemRegistry;
103         this.metadataRegistry = metadataRegistry;
104     }
105
106     /**
107      * Connect to database when service is activated
108      */
109     @Activate
110     public void activate(final @Nullable Map<String, Object> config) {
111         logger.debug("InfluxDB persistence service is being activated");
112
113         if (loadConfiguration(config)) {
114             itemToStorePointCreator = new ItemToStorePointCreator(configuration, metadataRegistry);
115             influxDBRepository = createInfluxDBRepository();
116             influxDBRepository.connect();
117             tryReconnection = true;
118         } else {
119             logger.error("Cannot load configuration, persistence service wont work");
120             tryReconnection = false;
121         }
122
123         logger.debug("InfluxDB persistence service is now activated");
124     }
125
126     // Visible for testing
127     protected InfluxDBRepository createInfluxDBRepository() {
128         return RepositoryFactory.createRepository(configuration);
129     }
130
131     /**
132      * Disconnect from database when service is deactivated
133      */
134     @Deactivate
135     public void deactivate() {
136         logger.debug("InfluxDB persistence service deactivated");
137         tryReconnection = false;
138         if (influxDBRepository != null) {
139             influxDBRepository.disconnect();
140             influxDBRepository = null;
141         }
142         if (itemToStorePointCreator != null) {
143             itemToStorePointCreator = null;
144         }
145     }
146
147     /**
148      * Rerun deactivation/activation code each time configuration is changed
149      */
150     @Modified
151     protected void modified(@Nullable Map<String, Object> config) {
152         if (config != null) {
153             logger.debug("Config has been modified will deactivate/activate with new config");
154
155             deactivate();
156             activate(config);
157         } else {
158             logger.warn("Null configuration, ignoring");
159         }
160     }
161
162     private boolean loadConfiguration(@Nullable Map<String, Object> config) {
163         boolean configurationIsValid;
164         if (config != null) {
165             configuration = new InfluxDBConfiguration(config);
166             configurationIsValid = configuration.isValid();
167             if (configurationIsValid) {
168                 logger.debug("Loaded configuration {}", config);
169             } else {
170                 logger.warn("Some configuration properties are not valid {}", config);
171             }
172         } else {
173             configuration = InfluxDBConfiguration.NO_CONFIGURATION;
174             configurationIsValid = false;
175             logger.warn("Ignoring configuration because it's null");
176         }
177         return configurationIsValid;
178     }
179
180     @Override
181     public String getId() {
182         return SERVICE_NAME;
183     }
184
185     @Override
186     public String getLabel(@Nullable Locale locale) {
187         return "InfluxDB persistence layer";
188     }
189
190     @Override
191     public Set<PersistenceItemInfo> getItemInfo() {
192         if (checkConnection()) {
193             return influxDBRepository.getStoredItemsCount().entrySet().stream()
194                     .map(entry -> new InfluxDBPersistentItemInfo(entry.getKey(), entry.getValue()))
195                     .collect(Collectors.toUnmodifiableSet());
196         } else {
197             logger.info("getItemInfo ignored, InfluxDB is not yet connected");
198             return Set.of();
199         }
200     }
201
202     @Override
203     public void store(Item item) {
204         store(item, item.getName());
205     }
206
207     @Override
208     public void store(Item item, @Nullable String alias) {
209         if (checkConnection()) {
210             InfluxPoint point = itemToStorePointCreator.convert(item, alias);
211             if (point != null) {
212                 logger.trace("Storing item {} in InfluxDB point {}", item, point);
213                 influxDBRepository.write(point);
214             } else {
215                 logger.trace("Ignoring item {} as is cannot be converted to an InfluxDB point", item);
216             }
217         } else {
218             logger.debug("store ignored, InfluxDB is not yet connected");
219         }
220     }
221
222     @Override
223     public Iterable<HistoricItem> query(FilterCriteria filter) {
224         logger.debug("Got a query for historic points!");
225
226         if (checkConnection()) {
227             logger.trace(
228                     "Filter: itemname: {}, ordering: {}, state: {},  operator: {}, getBeginDate: {}, getEndDate: {}, getPageSize: {}, getPageNumber: {}",
229                     filter.getItemName(), filter.getOrdering().toString(), filter.getState(), filter.getOperator(),
230                     filter.getBeginDate(), filter.getEndDate(), filter.getPageSize(), filter.getPageNumber());
231
232             String query = RepositoryFactory.createQueryCreator(configuration, metadataRegistry).createQuery(filter,
233                     configuration.getRetentionPolicy());
234             logger.trace("Query {}", query);
235             List<InfluxRow> results = influxDBRepository.query(query);
236             return results.stream().map(this::mapRow2HistoricItem).collect(Collectors.toList());
237         } else {
238             logger.debug("query ignored, InfluxDB is not yet connected");
239             return List.of();
240         }
241     }
242
243     private HistoricItem mapRow2HistoricItem(InfluxRow row) {
244         State state = InfluxDBStateConvertUtils.objectToState(row.getValue(), row.getItemName(), itemRegistry);
245         return new InfluxDBHistoricItem(row.getItemName(), state,
246                 ZonedDateTime.ofInstant(row.getTime(), ZoneId.systemDefault()));
247     }
248
249     @Override
250     public List<PersistenceStrategy> getDefaultStrategies() {
251         return List.of(PersistenceStrategy.Globals.RESTORE, PersistenceStrategy.Globals.CHANGE);
252     }
253
254     /**
255      * check connection and try reconnect
256      *
257      * @return true if connected
258      */
259     private boolean checkConnection() {
260         if (influxDBRepository == null) {
261             return false;
262         } else if (influxDBRepository.isConnected()) {
263             return true;
264         } else if (tryReconnection) {
265             logger.debug("Connection lost, trying re-connection");
266             influxDBRepository.connect();
267             return influxDBRepository.isConnected();
268         }
269         return false;
270     }
271 }