]> git.basschouten.com Git - openhab-addons.git/blob
520376be6677fdb1c620fd481e61002f2fd8e5e1
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.persistence.influxdb;
14
15 import java.time.ZoneId;
16 import java.time.ZonedDateTime;
17 import java.util.Collections;
18 import java.util.List;
19 import java.util.Locale;
20 import java.util.Map;
21 import java.util.Set;
22 import java.util.stream.Collectors;
23
24 import org.eclipse.jdt.annotation.NonNullByDefault;
25 import org.eclipse.jdt.annotation.Nullable;
26 import org.openhab.core.config.core.ConfigurableService;
27 import org.openhab.core.items.Item;
28 import org.openhab.core.items.ItemRegistry;
29 import org.openhab.core.items.MetadataRegistry;
30 import org.openhab.core.persistence.FilterCriteria;
31 import org.openhab.core.persistence.HistoricItem;
32 import org.openhab.core.persistence.PersistenceItemInfo;
33 import org.openhab.core.persistence.PersistenceService;
34 import org.openhab.core.persistence.QueryablePersistenceService;
35 import org.openhab.core.persistence.strategy.PersistenceStrategy;
36 import org.openhab.core.types.State;
37 import org.openhab.persistence.influxdb.internal.FilterCriteriaQueryCreator;
38 import org.openhab.persistence.influxdb.internal.InfluxDBConfiguration;
39 import org.openhab.persistence.influxdb.internal.InfluxDBHistoricItem;
40 import org.openhab.persistence.influxdb.internal.InfluxDBPersistentItemInfo;
41 import org.openhab.persistence.influxdb.internal.InfluxDBRepository;
42 import org.openhab.persistence.influxdb.internal.InfluxDBStateConvertUtils;
43 import org.openhab.persistence.influxdb.internal.InfluxPoint;
44 import org.openhab.persistence.influxdb.internal.InfluxRow;
45 import org.openhab.persistence.influxdb.internal.ItemToStorePointCreator;
46 import org.openhab.persistence.influxdb.internal.RepositoryFactory;
47 import org.osgi.framework.Constants;
48 import org.osgi.service.component.annotations.Activate;
49 import org.osgi.service.component.annotations.Component;
50 import org.osgi.service.component.annotations.Deactivate;
51 import org.osgi.service.component.annotations.Modified;
52 import org.osgi.service.component.annotations.Reference;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55
56 /**
57  * This is the implementation of the InfluxDB {@link PersistenceService}. It
58  * persists item values using the <a href="http://influxdb.org">InfluxDB time
59  * series database. The states ( {@link State}) of an {@link Item} are persisted
60  * by default in a time series with names equal to the name of the item.
61  *
62  * This addon supports 1.X and 2.X versions, as two versions are incompatible
63  * and use different drivers the specific code for each version is accessed by
64  * {@link InfluxDBRepository} and {@link FilterCriteriaQueryCreator} interfaces
65  * and specific implementation reside in
66  * {@link org.openhab.persistence.influxdb.internal.influx1} and
67  * {@link org.openhab.persistence.influxdb.internal.influx2} packages
68  *
69  * @author Theo Weiss - Initial contribution, rewrite of
70  *         org.openhab.persistence.influxdb
71  * @author Joan Pujol Espinar - Addon rewrite refactoring code and adding
72  *         support for InfluxDB 2.0. Some tag code is based from not integrated
73  *         branch from Dominik Vorreiter
74  */
75 @NonNullByDefault
76 @Component(service = { PersistenceService.class,
77         QueryablePersistenceService.class }, configurationPid = "org.openhab.influxdb", //
78         property = Constants.SERVICE_PID + "=org.openhab.influxdb")
79 @ConfigurableService(category = "persistence", label = "InfluxDB Persistence Service", description_uri = InfluxDBPersistenceService.CONFIG_URI)
80 public class InfluxDBPersistenceService implements QueryablePersistenceService {
81     public static final String SERVICE_NAME = "influxdb";
82
83     private final Logger logger = LoggerFactory.getLogger(InfluxDBPersistenceService.class);
84
85     protected static final String CONFIG_URI = "persistence:influxdb";
86
87     // External dependencies
88     private final ItemRegistry itemRegistry;
89     private final MetadataRegistry metadataRegistry;
90
91     // Internal dependencies/state
92     private InfluxDBConfiguration configuration = InfluxDBConfiguration.NO_CONFIGURATION;
93
94     // Relax rules because can only be null if component is not active
95     private @NonNullByDefault({}) ItemToStorePointCreator itemToStorePointCreator;
96     private @NonNullByDefault({}) InfluxDBRepository influxDBRepository;
97
98     @Activate
99     public InfluxDBPersistenceService(final @Reference ItemRegistry itemRegistry,
100             final @Reference MetadataRegistry metadataRegistry) {
101         this.itemRegistry = itemRegistry;
102         this.metadataRegistry = metadataRegistry;
103     }
104
105     /**
106      * Connect to database when service is activated
107      */
108     @Activate
109     public void activate(final @Nullable Map<String, Object> config) {
110         logger.debug("InfluxDB persistence service is being activated");
111
112         if (loadConfiguration(config)) {
113             itemToStorePointCreator = new ItemToStorePointCreator(configuration, metadataRegistry);
114             influxDBRepository = createInfluxDBRepository();
115             influxDBRepository.connect();
116         } else {
117             logger.error("Cannot load configuration, persistence service wont work");
118         }
119
120         logger.debug("InfluxDB persistence service is now activated");
121     }
122
123     // Visible for testing
124     protected InfluxDBRepository createInfluxDBRepository() {
125         return RepositoryFactory.createRepository(configuration);
126     }
127
128     /**
129      * Disconnect from database when service is deactivated
130      */
131     @Deactivate
132     public void deactivate() {
133         logger.debug("InfluxDB persistence service deactivated");
134         if (influxDBRepository != null) {
135             influxDBRepository.disconnect();
136             influxDBRepository = null;
137         }
138         if (itemToStorePointCreator != null) {
139             itemToStorePointCreator = null;
140         }
141     }
142
143     /**
144      * Rerun deactivation/activation code each time configuration is changed
145      */
146     @Modified
147     protected void modified(@Nullable Map<String, Object> config) {
148         if (config != null) {
149             logger.debug("Config has been modified will deactivate/activate with new config");
150
151             deactivate();
152             activate(config);
153         } else {
154             logger.warn("Null configuration, ignoring");
155         }
156     }
157
158     private boolean loadConfiguration(@Nullable Map<String, Object> config) {
159         boolean configurationIsValid;
160         if (config != null) {
161             configuration = new InfluxDBConfiguration(config);
162             configurationIsValid = configuration.isValid();
163             if (configurationIsValid) {
164                 logger.debug("Loaded configuration {}", config);
165             } else {
166                 logger.warn("Some configuration properties are not valid {}", config);
167             }
168         } else {
169             configuration = InfluxDBConfiguration.NO_CONFIGURATION;
170             configurationIsValid = false;
171             logger.warn("Ignoring configuration because it's null");
172         }
173         return configurationIsValid;
174     }
175
176     @Override
177     public String getId() {
178         return SERVICE_NAME;
179     }
180
181     @Override
182     public String getLabel(@Nullable Locale locale) {
183         return "InfluxDB persistence layer";
184     }
185
186     @Override
187     public Set<PersistenceItemInfo> getItemInfo() {
188         if (influxDBRepository != null && influxDBRepository.isConnected()) {
189             return influxDBRepository.getStoredItemsCount().entrySet().stream()
190                     .map(entry -> new InfluxDBPersistentItemInfo(entry.getKey(), entry.getValue()))
191                     .collect(Collectors.toUnmodifiableSet());
192         } else {
193             logger.info("getItemInfo ignored, InfluxDB is not yet connected");
194             return Collections.emptySet();
195         }
196     }
197
198     @Override
199     public void store(Item item) {
200         store(item, item.getName());
201     }
202
203     @Override
204     public void store(Item item, @Nullable String alias) {
205         if (influxDBRepository != null && influxDBRepository.isConnected()) {
206             InfluxPoint point = itemToStorePointCreator.convert(item, alias);
207             if (point != null) {
208                 logger.trace("Storing item {} in InfluxDB point {}", item, point);
209                 influxDBRepository.write(point);
210             } else {
211                 logger.trace("Ignoring item {} as is cannot be converted to an InfluxDB point", item);
212             }
213         } else {
214             logger.debug("store ignored, InfluxDB is not yet connected");
215         }
216     }
217
218     @Override
219     public Iterable<HistoricItem> query(FilterCriteria filter) {
220         logger.debug("Got a query for historic points!");
221
222         if (influxDBRepository != null && influxDBRepository.isConnected()) {
223             logger.trace(
224                     "Filter: itemname: {}, ordering: {}, state: {},  operator: {}, getBeginDate: {}, getEndDate: {}, getPageSize: {}, getPageNumber: {}",
225                     filter.getItemName(), filter.getOrdering().toString(), filter.getState(), filter.getOperator(),
226                     filter.getBeginDate(), filter.getEndDate(), filter.getPageSize(), filter.getPageNumber());
227
228             String query = RepositoryFactory.createQueryCreator(configuration, metadataRegistry).createQuery(filter,
229                     configuration.getRetentionPolicy());
230             logger.trace("Query {}", query);
231             List<InfluxRow> results = influxDBRepository.query(query);
232             return results.stream().map(this::mapRow2HistoricItem).collect(Collectors.toList());
233         } else {
234             logger.debug("query ignored, InfluxDB is not yet connected");
235             return Collections.emptyList();
236         }
237     }
238
239     private HistoricItem mapRow2HistoricItem(InfluxRow row) {
240         State state = InfluxDBStateConvertUtils.objectToState(row.getValue(), row.getItemName(), itemRegistry);
241         return new InfluxDBHistoricItem(row.getItemName(), state,
242                 ZonedDateTime.ofInstant(row.getTime(), ZoneId.systemDefault()));
243     }
244
245     @Override
246     public List<PersistenceStrategy> getDefaultStrategies() {
247         return List.of(PersistenceStrategy.Globals.RESTORE, PersistenceStrategy.Globals.CHANGE);
248     }
249 }