]> git.basschouten.com Git - openhab-addons.git/blob
6edaff7a859b5a5345ad2d9e17821cb5e3459447
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2021 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.persistence.influxdb;
14
15 import java.time.ZoneId;
16 import java.time.ZonedDateTime;
17 import java.util.Collections;
18 import java.util.List;
19 import java.util.Locale;
20 import java.util.Map;
21 import java.util.Set;
22 import java.util.stream.Collectors;
23
24 import org.eclipse.jdt.annotation.NonNullByDefault;
25 import org.eclipse.jdt.annotation.Nullable;
26 import org.openhab.core.config.core.ConfigurableService;
27 import org.openhab.core.items.Item;
28 import org.openhab.core.items.ItemRegistry;
29 import org.openhab.core.items.MetadataRegistry;
30 import org.openhab.core.persistence.FilterCriteria;
31 import org.openhab.core.persistence.HistoricItem;
32 import org.openhab.core.persistence.PersistenceItemInfo;
33 import org.openhab.core.persistence.PersistenceService;
34 import org.openhab.core.persistence.QueryablePersistenceService;
35 import org.openhab.core.persistence.strategy.PersistenceStrategy;
36 import org.openhab.core.types.State;
37 import org.openhab.persistence.influxdb.internal.FilterCriteriaQueryCreator;
38 import org.openhab.persistence.influxdb.internal.InfluxDBConfiguration;
39 import org.openhab.persistence.influxdb.internal.InfluxDBHistoricItem;
40 import org.openhab.persistence.influxdb.internal.InfluxDBPersistentItemInfo;
41 import org.openhab.persistence.influxdb.internal.InfluxDBRepository;
42 import org.openhab.persistence.influxdb.internal.InfluxDBStateConvertUtils;
43 import org.openhab.persistence.influxdb.internal.InfluxPoint;
44 import org.openhab.persistence.influxdb.internal.InfluxRow;
45 import org.openhab.persistence.influxdb.internal.ItemToStorePointCreator;
46 import org.openhab.persistence.influxdb.internal.RepositoryFactory;
47 import org.osgi.framework.Constants;
48 import org.osgi.service.component.annotations.Activate;
49 import org.osgi.service.component.annotations.Component;
50 import org.osgi.service.component.annotations.Deactivate;
51 import org.osgi.service.component.annotations.Modified;
52 import org.osgi.service.component.annotations.Reference;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55
56 /**
57  * This is the implementation of the InfluxDB {@link PersistenceService}. It persists item values
58  * using the <a href="http://influxdb.org">InfluxDB time series database. The states (
59  * {@link State}) of an {@link Item} are persisted by default in a time series with names equal to the name of
60  * the item.
61  *
62  * This addon supports 1.X and 2.X versions, as two versions are incompatible and use different drivers the
63  * specific code for each version is accessed by {@link InfluxDBRepository} and {@link FilterCriteriaQueryCreator}
64  * interfaces and specific implementation reside in {@link org.openhab.persistence.influxdb.internal.influx1} and
65  * {@link org.openhab.persistence.influxdb.internal.influx2} packages
66  *
67  * @author Theo Weiss - Initial contribution, rewrite of org.openhab.persistence.influxdb
68  * @author Joan Pujol Espinar - Addon rewrite refactoring code and adding support for InfluxDB 2.0. Some tag code is
69  *         based
70  *         from not integrated branch from Dominik Vorreiter
71  */
72 @NonNullByDefault
73 @Component(service = { PersistenceService.class,
74         QueryablePersistenceService.class }, configurationPid = "org.openhab.influxdb", //
75         property = Constants.SERVICE_PID + "=org.openhab.influxdb")
76 @ConfigurableService(category = "persistence", label = "InfluxDB Persistence Service", description_uri = InfluxDBPersistenceService.CONFIG_URI)
77 public class InfluxDBPersistenceService implements QueryablePersistenceService {
78     public static final String SERVICE_NAME = "influxdb";
79
80     private final Logger logger = LoggerFactory.getLogger(InfluxDBPersistenceService.class);
81
82     protected static final String CONFIG_URI = "persistence:influxdb";
83
84     // External dependencies
85     private final ItemRegistry itemRegistry;
86     private final MetadataRegistry metadataRegistry;
87
88     // Internal dependencies/state
89     private InfluxDBConfiguration configuration = InfluxDBConfiguration.NO_CONFIGURATION;
90
91     // Relax rules because can only be null if component is not active
92     private @NonNullByDefault({}) ItemToStorePointCreator itemToStorePointCreator;
93     private @NonNullByDefault({}) InfluxDBRepository influxDBRepository;
94
95     @Activate
96     public InfluxDBPersistenceService(final @Reference ItemRegistry itemRegistry,
97             final @Reference MetadataRegistry metadataRegistry) {
98         this.itemRegistry = itemRegistry;
99         this.metadataRegistry = metadataRegistry;
100     }
101
102     /**
103      * Connect to database when service is activated
104      */
105     @Activate
106     public void activate(final @Nullable Map<String, Object> config) {
107         logger.debug("InfluxDB persistence service is being activated");
108
109         if (loadConfiguration(config)) {
110             itemToStorePointCreator = new ItemToStorePointCreator(configuration, metadataRegistry);
111             influxDBRepository = createInfluxDBRepository();
112             influxDBRepository.connect();
113         } else {
114             logger.error("Cannot load configuration, persistence service wont work");
115         }
116
117         logger.debug("InfluxDB persistence service is now activated");
118     }
119
120     // Visible for testing
121     protected InfluxDBRepository createInfluxDBRepository() {
122         return RepositoryFactory.createRepository(configuration);
123     }
124
125     /**
126      * Disconnect from database when service is deactivated
127      */
128     @Deactivate
129     public void deactivate() {
130         logger.debug("InfluxDB persistence service deactivated");
131         if (influxDBRepository != null) {
132             influxDBRepository.disconnect();
133             influxDBRepository = null;
134         }
135         if (itemToStorePointCreator != null) {
136             itemToStorePointCreator = null;
137         }
138     }
139
140     /**
141      * Rerun deactivation/activation code each time configuration is changed
142      */
143     @Modified
144     protected void modified(@Nullable Map<String, Object> config) {
145         if (config != null) {
146             logger.debug("Config has been modified will deactivate/activate with new config");
147
148             deactivate();
149             activate(config);
150         } else {
151             logger.warn("Null configuration, ignoring");
152         }
153     }
154
155     private boolean loadConfiguration(@Nullable Map<String, Object> config) {
156         boolean configurationIsValid;
157         if (config != null) {
158             configuration = new InfluxDBConfiguration(config);
159             configurationIsValid = configuration.isValid();
160             if (configurationIsValid) {
161                 logger.debug("Loaded configuration {}", config);
162             } else {
163                 logger.warn("Some configuration properties are not valid {}", config);
164             }
165         } else {
166             configuration = InfluxDBConfiguration.NO_CONFIGURATION;
167             configurationIsValid = false;
168             logger.warn("Ignoring configuration because it's null");
169         }
170         return configurationIsValid;
171     }
172
173     @Override
174     public String getId() {
175         return SERVICE_NAME;
176     }
177
178     @Override
179     public String getLabel(@Nullable Locale locale) {
180         return "InfluxDB persistence layer";
181     }
182
183     @Override
184     public Set<PersistenceItemInfo> getItemInfo() {
185         if (influxDBRepository != null && influxDBRepository.isConnected()) {
186             return influxDBRepository.getStoredItemsCount().entrySet().stream()
187                     .map(entry -> new InfluxDBPersistentItemInfo(entry.getKey(), entry.getValue()))
188                     .collect(Collectors.toUnmodifiableSet());
189         } else {
190             logger.info("getItemInfo ignored, InfluxDB is not yet connected");
191             return Collections.emptySet();
192         }
193     }
194
195     @Override
196     public void store(Item item) {
197         store(item, item.getName());
198     }
199
200     @Override
201     public void store(Item item, @Nullable String alias) {
202         if (influxDBRepository != null && influxDBRepository.isConnected()) {
203             InfluxPoint point = itemToStorePointCreator.convert(item, alias);
204             if (point != null) {
205                 logger.trace("Storing item {} in InfluxDB point {}", item, point);
206                 influxDBRepository.write(point);
207             } else {
208                 logger.trace("Ignoring item {} as is cannot be converted to a InfluxDB point", item);
209             }
210         } else {
211             logger.debug("store ignored, InfluxDB is not yet connected");
212         }
213     }
214
215     @Override
216     public Iterable<HistoricItem> query(FilterCriteria filter) {
217         logger.debug("Got a query for historic points!");
218
219         if (influxDBRepository != null && influxDBRepository.isConnected()) {
220             logger.trace(
221                     "Filter: itemname: {}, ordering: {}, state: {},  operator: {}, getBeginDate: {}, getEndDate: {}, getPageSize: {}, getPageNumber: {}",
222                     filter.getItemName(), filter.getOrdering().toString(), filter.getState(), filter.getOperator(),
223                     filter.getBeginDate(), filter.getEndDate(), filter.getPageSize(), filter.getPageNumber());
224
225             String query = RepositoryFactory.createQueryCreator(configuration).createQuery(filter,
226                     configuration.getRetentionPolicy());
227             logger.trace("Query {}", query);
228             List<InfluxRow> results = influxDBRepository.query(query);
229             return results.stream().map(this::mapRow2HistoricItem).collect(Collectors.toList());
230         } else {
231             logger.debug("query ignored, InfluxDB is not yet connected");
232             return Collections.emptyList();
233         }
234     }
235
236     private HistoricItem mapRow2HistoricItem(InfluxRow row) {
237         State state = InfluxDBStateConvertUtils.objectToState(row.getValue(), row.getItemName(), itemRegistry);
238         return new InfluxDBHistoricItem(row.getItemName(), state,
239                 ZonedDateTime.ofInstant(row.getTime(), ZoneId.systemDefault()));
240     }
241
242     @Override
243     public List<PersistenceStrategy> getDefaultStrategies() {
244         return List.of(PersistenceStrategy.Globals.RESTORE, PersistenceStrategy.Globals.CHANGE);
245     }
246 }