2 * Copyright (c) 2010-2020 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.persistence.rrd4j.internal;
16 import java.io.IOException;
17 import java.time.Instant;
18 import java.time.ZoneId;
19 import java.time.ZonedDateTime;
20 import java.util.ArrayList;
21 import java.util.Collections;
22 import java.util.HashMap;
23 import java.util.Iterator;
24 import java.util.List;
25 import java.util.Locale;
28 import java.util.concurrent.ConcurrentHashMap;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.RejectedExecutionException;
31 import java.util.concurrent.ScheduledExecutorService;
32 import java.util.concurrent.ScheduledFuture;
33 import java.util.concurrent.TimeUnit;
35 import javax.measure.Quantity;
36 import javax.measure.Unit;
38 import org.eclipse.jdt.annotation.NonNullByDefault;
39 import org.eclipse.jdt.annotation.Nullable;
40 import org.openhab.core.OpenHAB;
41 import org.openhab.core.common.NamedThreadFactory;
42 import org.openhab.core.items.Item;
43 import org.openhab.core.items.ItemNotFoundException;
44 import org.openhab.core.items.ItemRegistry;
45 import org.openhab.core.items.ItemUtil;
46 import org.openhab.core.library.CoreItemFactory;
47 import org.openhab.core.library.items.ContactItem;
48 import org.openhab.core.library.items.DimmerItem;
49 import org.openhab.core.library.items.NumberItem;
50 import org.openhab.core.library.items.RollershutterItem;
51 import org.openhab.core.library.items.SwitchItem;
52 import org.openhab.core.library.types.DecimalType;
53 import org.openhab.core.library.types.OnOffType;
54 import org.openhab.core.library.types.OpenClosedType;
55 import org.openhab.core.library.types.PercentType;
56 import org.openhab.core.library.types.QuantityType;
57 import org.openhab.core.persistence.FilterCriteria;
58 import org.openhab.core.persistence.FilterCriteria.Ordering;
59 import org.openhab.core.persistence.HistoricItem;
60 import org.openhab.core.persistence.PersistenceItemInfo;
61 import org.openhab.core.persistence.PersistenceService;
62 import org.openhab.core.persistence.QueryablePersistenceService;
63 import org.openhab.core.persistence.strategy.PersistenceCronStrategy;
64 import org.openhab.core.persistence.strategy.PersistenceStrategy;
65 import org.openhab.core.types.State;
66 import org.osgi.service.component.annotations.Activate;
67 import org.osgi.service.component.annotations.Component;
68 import org.osgi.service.component.annotations.ConfigurationPolicy;
69 import org.osgi.service.component.annotations.Modified;
70 import org.osgi.service.component.annotations.Reference;
71 import org.rrd4j.ConsolFun;
72 import org.rrd4j.DsType;
73 import org.rrd4j.core.FetchData;
74 import org.rrd4j.core.FetchRequest;
75 import org.rrd4j.core.RrdDb;
76 import org.rrd4j.core.RrdDef;
77 import org.rrd4j.core.Sample;
78 import org.slf4j.Logger;
79 import org.slf4j.LoggerFactory;
82 * This is the implementation of the RRD4j {@link PersistenceService}. To learn
83 * more about RRD4j please visit their
84 * <a href="https://github.com/rrd4j/rrd4j">website</a>.
86 * @author Kai Kreuzer - Initial contribution
87 * @author Jan N. Klug - some improvements
88 * @author Karel Goderis - remove TimerThread dependency
91 @Component(service = { PersistenceService.class,
92 QueryablePersistenceService.class }, configurationPid = "org.openhab.rrd4j", configurationPolicy = ConfigurationPolicy.OPTIONAL)
93 public class RRD4jPersistenceService implements QueryablePersistenceService {
// Names under which modified() registers the three built-in fallback definitions.
95 private static final String DEFAULT_OTHER = "default_other";
96 private static final String DEFAULT_NUMERIC = "default_numeric";
97 private static final String DEFAULT_QUANTIFIABLE = "default_quantifiable";
// Main item types accepted by this service; consulted by isSupportedItemType().
99 private static final Set<String> SUPPORTED_TYPES = Set.of(CoreItemFactory.SWITCH, CoreItemFactory.CONTACT,
100 CoreItemFactory.DIMMER, CoreItemFactory.NUMBER, CoreItemFactory.ROLLERSHUTTER);
// Executor on which store() schedules a delayed retry of a write.
102 private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(3,
103 new NamedThreadFactory("RRD4j"));
// Database definitions keyed by definition name (built-in defaults plus configured ones).
105 private final Map<String, @Nullable RrdDefConfig> rrdDefs = new ConcurrentHashMap<>();
// Name of the datasource that getRrdDef() adds to a definition and that store()/query() access.
107 private static final String DATASOURCE_STATE = "state";
// Folder containing the .rrd files: the "rrd4j" sub-folder of the user persistence data folder.
109 public static final String DB_FOLDER = getUserPersistenceDataFolder() + File.separator + "rrd4j";
111 private final Logger logger = LoggerFactory.getLogger(RRD4jPersistenceService.class);
// Delayed store() retries that have been scheduled, keyed by database name.
113 private final Map<String, @Nullable ScheduledFuture<?>> scheduledJobs = new HashMap<>();
// Registry used to resolve items from their names.
115 protected final ItemRegistry itemRegistry;
/**
 * Creates the service.
 *
 * @param itemRegistry registry kept for later look-ups of items by name
 */
118 public RRD4jPersistenceService(final @Reference ItemRegistry itemRegistry) {
119 this.itemRegistry = itemRegistry;
/**
 * @return the identifier of this persistence service
 */
123 public String getId() {
/**
 * @param locale the locale the label is requested for; may be {@code null}
 * @return the label of this persistence service
 */
128 public String getLabel(@Nullable Locale locale) {
/**
 * Stores the current state of an item in its rrd4j database.
 * <p>
 * Items whose type is not supported are ignored. The database is looked up under the alias, or
 * under the item name when no alias is given. Unless the database consolidates with
 * {@code AVERAGE}, the previously stored value is written again one second before "now" so that
 * the change is not interpolated. A {@link QuantityType} state of a {@link NumberItem} is
 * converted to the unit of the item; any other state is read as a {@link DecimalType}. For a
 * {@code COUNTER} datasource the value is multiplied by the step size of the database.
 * <p>
 * If rrd4j rejects the sample because at least one second must lie between two updates, the store
 * is scheduled again one second later; other failures are logged.
 *
 * @param item the item whose state is persisted
 * @param alias the database name to use instead of the item name, or {@code null}
 */
133 public synchronized void store(final Item item, @Nullable final String alias) {
134 if (!isSupportedItemType(item)) {
135 logger.trace("Ignoring item '{}' since its type {} is not supported", item.getName(), item.getType());
138 final String name = alias == null ? item.getName() : alias;
139 RrdDb db = getDB(name);
141 ConsolFun function = getConsolidationFunction(db);
// rrd4j timestamps are handled in seconds, not milliseconds
142 long now = System.currentTimeMillis() / 1000;
143 if (function != ConsolFun.AVERAGE) {
145 // we store the last value again, so that the value change
146 // in the database is not interpolated, but
147 // happens right at this spot
148 if (now - 1 > db.getLastUpdateTime()) {
149 // only do it if there is not already a value
150 double lastValue = db.getLastDatasourceValue(DATASOURCE_STATE);
151 if (!Double.isNaN(lastValue)) {
152 Sample sample = db.createSample();
153 sample.setTime(now - 1);
154 sample.setValue(DATASOURCE_STATE, lastValue);
156 logger.debug("Stored '{}' with state '{}' in rrd4j database (again)", name,
157 mapToState(lastValue, item.getName()));
160 } catch (IOException e) {
161 logger.debug("Error storing last value (again): {}", e.getMessage());
165 Sample sample = db.createSample();
// quantity states are converted to the unit of the item before their numeric value is taken
170 if (item instanceof NumberItem && item.getState() instanceof QuantityType) {
171 NumberItem nItem = (NumberItem) item;
172 QuantityType<?> qState = (QuantityType<?>) item.getState();
173 Unit<? extends Quantity<?>> unit = nItem.getUnit();
175 QuantityType<?> convertedState = qState.toUnit(unit);
176 if (convertedState != null) {
177 value = convertedState.doubleValue();
180 "Failed to convert state '{}' to unit '{}'. Please check your item definition for correctness.",
// presumably the fallback for items without a unit: the quantity value is used unconverted - TODO confirm
184 value = qState.doubleValue();
// every other state is stored through its DecimalType representation
187 DecimalType state = item.getStateAs(DecimalType.class);
189 value = state.toBigDecimal().doubleValue();
193 if (db.getDatasource(DATASOURCE_STATE).getType() == DsType.COUNTER) { // counter values must be
194 // adjusted by stepsize
195 value = value * db.getRrdDef().getStep();
197 sample.setValue(DATASOURCE_STATE, value);
199 logger.debug("Stored '{}' with state '{}' in rrd4j database", name, value);
201 } catch (IllegalArgumentException e) {
// NOTE(review): getMessage() may return null for an IllegalArgumentException, in which case
// contains() would throw a NullPointerException here - confirm and guard if needed
202 if (e.getMessage().contains("at least one second step is required")) {
203 // we try to store the value one second later
204 ScheduledFuture<?> job = scheduledJobs.get(name);
207 scheduledJobs.remove(name);
// the retry passes the already resolved name as alias, so it targets the same database
209 job = scheduler.schedule(() -> store(item, name), 1, TimeUnit.SECONDS);
210 scheduledJobs.put(name, job);
212 logger.warn("Could not persist '{}' to rrd4j database: {}", name, e.getMessage());
214 } catch (Exception e) {
215 logger.warn("Could not persist '{}' to rrd4j database: {}", name, e.getMessage());
219 } catch (IOException e) {
220 logger.debug("Error closing rrd4j database: {}", e.getMessage());
/**
 * Stores the given item.
 *
 * @param item the item to persist
 */
226 public void store(Item item) {
/**
 * Queries the rrd4j database of the item named in the filter.
 * <p>
 * The end of the queried range is the end date of the filter, or the current time if none is set.
 * Without a begin date only one kind of request is accepted: descending order, page size 1 and
 * page number 0. If such a request has no end date either, the last datasource value is returned
 * as a single item (or an empty list if that value is NaN). With a begin date, the data is fetched
 * using the consolidation function of the database, and NaN values as well as values whose
 * timestamp lies outside the requested range are left out.
 *
 * @param filter the criteria of the query; the item name selects the database
 * @return the matching historic items; an empty list is returned when nothing can be delivered
 * @throws UnsupportedOperationException if no begin date is given and the request is not for a
 *             single value in descending order
 */
231 public Iterable<HistoricItem> query(FilterCriteria filter) {
232 String itemName = filter.getItemName();
233 RrdDb db = getDB(itemName);
// NOTE(review): no db.close() is visible in this method, whereas store() closes its database -
// confirm that the handle is released on every path
235 ConsolFun consolidationFunction = getConsolidationFunction(db);
// range boundaries are epoch seconds
237 long end = filter.getEndDate() == null ? System.currentTimeMillis() / 1000
238 : filter.getEndDate().toInstant().getEpochSecond();
241 if (filter.getBeginDate() == null) {
242 // as rrd goes back for years and gets more and more
243 // inaccurate, we only support descending order
244 // and a single return value
245 // if there is no begin date is given - this case is
246 // required specifically for the historicState()
247 // query, which we want to support
248 if (filter.getOrdering() == Ordering.DESCENDING && filter.getPageSize() == 1
249 && filter.getPageNumber() == 0) {
250 if (filter.getEndDate() == null) {
251 // we are asked only for the most recent value!
252 double lastValue = db.getLastDatasourceValue(DATASOURCE_STATE);
253 if (!Double.isNaN(lastValue)) {
// the item is stamped with the last archive update time, converted from seconds to milliseconds
254 HistoricItem rrd4jItem = new RRD4jItem(itemName, mapToState(lastValue, itemName),
255 ZonedDateTime.ofInstant(
256 Instant.ofEpochMilli(db.getLastArchiveUpdateTime() * 1000),
257 ZoneId.systemDefault()));
258 return Collections.singletonList(rrd4jItem);
260 return Collections.emptyList();
266 throw new UnsupportedOperationException("rrd4j does not allow querys without a begin date, "
267 + "unless order is descending and a single value is requested");
270 start = filter.getBeginDate().toInstant().getEpochSecond();
272 FetchRequest request = db.createFetchRequest(consolidationFunction, start, end, 1);
274 List<HistoricItem> items = new ArrayList<>();
275 FetchData result = request.fetchData();
276 long ts = result.getFirstTimestamp();
277 long step = result.getRowCount() > 1 ? result.getStep() : 0;
278 for (double value : result.getValues(DATASOURCE_STATE)) {
// keep only real values inside [start, end]; the range check is waived when start equals end
279 if (!Double.isNaN(value) && (((ts >= start) && (ts <= end)) || (start == end))) {
280 RRD4jItem rrd4jItem = new RRD4jItem(itemName, mapToState(value, itemName),
281 ZonedDateTime.ofInstant(Instant.ofEpochMilli(ts * 1000), ZoneId.systemDefault()));
282 items.add(rrd4jItem);
287 } catch (IOException e) {
288 logger.warn("Could not query rrd4j database for item '{}': {}", itemName, e.getMessage());
291 return Collections.emptyList();
/**
 * Item information is not provided by this service.
 *
 * @return always an empty set
 */
295 public Set<PersistenceItemInfo> getItemInfo() {
296 return Collections.emptySet();
/**
 * Provides the rrd4j database stored in the file {@code <alias>.rrd} inside {@link #DB_FOLDER}.
 * <p>
 * The database is either re-opened from its file or newly created from the definition that
 * {@code getRrdDef} determines for the alias. Failures are logged and not propagated to the caller.
 *
 * @param alias the name of the database, used as the file name without extension
 * @return the database; declared nullable, presumably {@code null} when it could neither be opened
 *         nor created (TODO confirm)
 */
299 protected @Nullable synchronized RrdDb getDB(String alias) {
301 File file = new File(DB_FOLDER + File.separator + alias + ".rrd");
304 // recreate the RrdDb instance from the file
305 db = new RrdDb(file.getAbsolutePath());
307 File folder = new File(DB_FOLDER);
308 if (!folder.exists()) {
311 RrdDef rrdDef = getRrdDef(alias, file);
312 if (rrdDef != null) {
313 // create a new database file
314 db = new RrdDb(rrdDef);
317 "Did not create rrd4j database for item '{}' since no rrd definition could be determined. This is likely due to an unsupported item type.",
321 } catch (IOException e) {
322 logger.error("Could not create rrd4j database file '{}': {}", file.getAbsolutePath(), e.getMessage());
323 } catch (RejectedExecutionException e) {
324 // this happens if the system is shut down
325 logger.debug("Could not create rrd4j database file '{}': {}", file.getAbsolutePath(), e.getMessage());
/**
 * Determines the database definition to use for an item.
 * <p>
 * A definition that lists the item explicitly takes precedence. Otherwise the item is looked up
 * in the registry and one of the default definitions is chosen: {@code default_quantifiable} for
 * a {@link NumberItem} with a dimension, {@code default_numeric} for a {@link NumberItem} without
 * one, and {@code default_other} for everything else.
 *
 * @param itemName the name of the item
 * @return the definition to use; declared nullable
 */
330 private @Nullable RrdDefConfig getRrdDefConfig(String itemName) {
331 RrdDefConfig useRdc = null;
332 for (Map.Entry<String, @Nullable RrdDefConfig> e : rrdDefs.entrySet()) {
333 // try to find special config
334 RrdDefConfig rdc = e.getValue();
335 if (rdc != null && rdc.appliesTo(itemName)) {
340 if (useRdc == null) { // not defined, use defaults
342 Item item = itemRegistry.getItem(itemName);
343 if (!isSupportedItemType(item)) {
346 if (item instanceof NumberItem) {
347 NumberItem numberItem = (NumberItem) item;
// a number item with a dimension gets the quantifiable defaults, a plain one the numeric defaults
348 useRdc = numberItem.getDimension() != null ? rrdDefs.get(DEFAULT_QUANTIFIABLE)
349 : rrdDefs.get(DEFAULT_NUMERIC);
351 useRdc = rrdDefs.get(DEFAULT_OTHER);
353 } catch (ItemNotFoundException e) {
354 logger.debug("Could not find item '{}' in registry", itemName);
358 logger.trace("Using rrd definition '{}' for item '{}'.", useRdc, itemName);
/**
 * Builds the rrd4j definition for a new database file from the configuration that applies to the
 * item: step size, the {@code state} datasource and all configured archives.
 *
 * @param itemName the name of the item the database is created for
 * @param file the database file; its absolute path becomes the path of the definition
 * @return the definition; declared nullable
 */
362 private @Nullable RrdDef getRrdDef(String itemName, File file) {
363 RrdDef rrdDef = new RrdDef(file.getAbsolutePath());
364 RrdDefConfig useRdc = getRrdDefConfig(itemName);
365 if (useRdc != null) {
366 rrdDef.setStep(useRdc.step);
// the database starts one second in the past (time in epoch seconds)
367 rrdDef.setStartTime(System.currentTimeMillis() / 1000 - 1);
368 rrdDef.addDatasource(DATASOURCE_STATE, useRdc.dsType, useRdc.heartbeat, useRdc.min, useRdc.max);
369 for (RrdArchiveDef rad : useRdc.archives) {
370 rrdDef.addArchive(rad.fcn, rad.xff, rad.steps, rad.rows);
/**
 * Returns the consolidation function of the first archive defined for the database.
 *
 * @param db the database to inspect
 * @return the consolidation function of the first archive, or {@link ConsolFun#MAX} if the
 *         definition cannot be read because of an {@link IOException}
 */
378 public ConsolFun getConsolidationFunction(RrdDb db) {
380 return db.getRrdDef().getArcDefs()[0].getConsolFun();
381 } catch (IOException e) {
382 return ConsolFun.MAX;
/**
 * Converts a stored numeric value into a state matching the type of the item.
 * <p>
 * Switches map 0 to {@code OFF} and anything else to {@code ON}; contacts map 0 to {@code CLOSED}
 * and anything else to {@code OPEN}; dimmers and rollershutters get a {@link PercentType} of the
 * value multiplied by 100; number items get a {@link QuantityType} in the unit of the item or a
 * {@link DecimalType}.
 *
 * @param value the value read from the database
 * @param itemName the name of the item the value belongs to
 * @return the mapped state; a {@link DecimalType} is used as fallback, e.g. when the item is not
 *         found in the registry
 */
386 @SuppressWarnings({ "rawtypes", "unchecked" })
387 private State mapToState(double value, String itemName) {
389 Item item = itemRegistry.getItem(itemName);
// dimmers are excluded from the switch branch; they are handled as percentages further down
390 if (item instanceof SwitchItem && !(item instanceof DimmerItem)) {
391 return value == 0.0d ? OnOffType.OFF : OnOffType.ON;
392 } else if (item instanceof ContactItem) {
393 return value == 0.0d ? OpenClosedType.CLOSED : OpenClosedType.OPEN;
394 } else if (item instanceof DimmerItem || item instanceof RollershutterItem) {
395 // make sure Items that need PercentTypes instead of DecimalTypes do receive the right information
396 return new PercentType((int) Math.round(value * 100));
397 } else if (item instanceof NumberItem) {
398 Unit<? extends Quantity<?>> unit = ((NumberItem) item).getUnit();
400 return new QuantityType(value, unit);
402 return new DecimalType(value);
405 } catch (ItemNotFoundException e) {
406 logger.debug("Could not find item '{}' in registry", itemName);
408 // just return a DecimalType as a fallback
409 return new DecimalType(value);
/**
 * Checks whether the main item type of the item is one of {@code SUPPORTED_TYPES}.
 *
 * @param item the item to check
 * @return {@code true} if the main item type is supported
 */
412 private boolean isSupportedItemType(Item item) {
413 return SUPPORTED_TYPES.contains(ItemUtil.getMainItemType(item.getType()));
/**
 * @return the path of the {@code persistence} sub-folder of the openHAB user data folder
 */
416 private static String getUserPersistenceDataFolder() {
417 return OpenHAB.getUserDataFolder() + File.separator + "persistence";
/**
 * Activates the service with the given configuration.
 *
 * @param config the configuration properties of the component
 */
421 protected void activate(final Map<String, Object> config) {
/**
 * Rebuilds the database definitions from the given configuration.
 * <p>
 * The three default definitions ({@code default_numeric}, {@code default_quantifiable} and
 * {@code default_other}) are registered first. Configuration keys are expected in the form
 * {@code name.configkey}, where the config key is one of {@code def}, {@code archives} or
 * {@code items}; the keys {@code service.pid} and {@code component.name} are ignored. Definitions
 * that turn out to be invalid are removed again at the end.
 *
 * @param config the configuration properties; String values are processed
 */
426 protected void modified(final Map<String, Object> config) {
427 // clean existing definitions
430 // add default configurations
432 RrdDefConfig defaultNumeric = new RrdDefConfig(DEFAULT_NUMERIC);
433 // use 10 seconds as a step size for numeric values and allow a 10 minute silence between updates
434 defaultNumeric.setDef("GAUGE,600,U,U,10");
435 // define 5 different boxes:
436 // 1. granularity of 10s for the last hour
437 // 2. granularity of 1m for the last week
438 // 3. granularity of 15m for the last year
439 // 4. granularity of 1h for the last 5 years
440 // 5. granularity of 1d for the last 10 years
442 .addArchives("LAST,0.5,1,360:LAST,0.5,6,10080:LAST,0.5,90,36500:LAST,0.5,360,43800:LAST,0.5,8640,3650");
443 rrdDefs.put(DEFAULT_NUMERIC, defaultNumeric);
445 RrdDefConfig defaultQuantifiable = new RrdDefConfig(DEFAULT_QUANTIFIABLE);
446 // use 10 seconds as a step size for numeric values and allow a 10 minute silence between updates
447 defaultQuantifiable.setDef("GAUGE,600,U,U,10");
448 // define 5 different boxes:
449 // 1. granularity of 10s for the last hour
450 // 2. granularity of 1m for the last week
451 // 3. granularity of 15m for the last year
452 // 4. granularity of 1h for the last 5 years
453 // 5. granularity of 1d for the last 10 years
454 defaultQuantifiable.addArchives(
455 "AVERAGE,0.5,1,360:AVERAGE,0.5,6,10080:AVERAGE,0.5,90,36500:AVERAGE,0.5,360,43800:AVERAGE,0.5,8640,3650");
456 rrdDefs.put(DEFAULT_QUANTIFIABLE, defaultQuantifiable);
458 RrdDefConfig defaultOther = new RrdDefConfig(DEFAULT_OTHER);
459 // use 5 seconds as a step size for discrete values and allow a 1h silence between updates
460 defaultOther.setDef("GAUGE,3600,U,U,5");
461 // define 4 different boxes:
462 // 1. granularity of 5s for the last hour
463 // 2. granularity of 1m for the last week
464 // 3. granularity of 15m for the last year
465 // 4. granularity of 4h for the last 10 years
466 defaultOther.addArchives("LAST,0.5,1,720:LAST,0.5,12,10080:LAST,0.5,180,35040:LAST,0.5,2880,21900");
467 rrdDefs.put(DEFAULT_OTHER, defaultOther);
469 if (config.isEmpty()) {
470 logger.debug("using default configuration only");
474 Iterator<String> keys = config.keySet().iterator();
475 while (keys.hasNext()) {
476 String key = keys.next();
478 if (key.equals("service.pid") || key.equals("component.name")) {
479 // ignore service.pid and name
// a key consists of the definition name and the property, separated by a single dot
483 String[] subkeys = key.split("\\.");
484 if (subkeys.length != 2) {
485 logger.debug("config '{}' should have the format 'name.configkey'", key);
489 Object v = config.get(key);
490 if (v instanceof String) {
491 String value = (String) v;
// NOTE(review): toLowerCase() without an explicit Locale depends on the default locale - confirm
492 String name = subkeys[0].toLowerCase();
493 String property = subkeys[1].toLowerCase();
495 if (value.isBlank()) {
496 logger.trace("Config is empty: {}", property);
499 logger.trace("Processing config: {} = {}", property, value);
// re-use the definition with this name if it is already known, otherwise register a new one
502 RrdDefConfig rrdDef = rrdDefs.get(name);
503 if (rrdDef == null) {
504 rrdDef = new RrdDefConfig(name);
505 rrdDefs.put(name, rrdDef);
509 if (property.equals("def")) {
510 rrdDef.setDef(value);
511 } else if (property.equals("archives")) {
512 rrdDef.addArchives(value);
513 } else if (property.equals("items")) {
514 rrdDef.addItems(value);
516 logger.debug("Unknown property {} : {}", property, value);
518 } catch (IllegalArgumentException e) {
519 logger.warn("Ignoring illegal configuration: {}", e.getMessage());
// rrdDefs is a ConcurrentHashMap, so entries can be removed while its values are being iterated
524 for (RrdDefConfig rrdDef : rrdDefs.values()) {
525 if (rrdDef != null) {
526 if (rrdDef.isValid()) {
527 logger.debug("Created {}", rrdDef);
529 logger.info("Removing invalid definition {}", rrdDef);
530 rrdDefs.remove(rrdDef.name);
/**
 * Holds the parameters of one archive of a database definition: the consolidation function, the
 * xff value, the number of steps and the number of rows. The values are handed to
 * {@code RrdDef.addArchive} when a database is created.
 */
536 private class RrdArchiveDef {
537 public @Nullable ConsolFun fcn;
539 public int steps, rows;
/**
 * @return a textual form listing the function, xff, steps and rows of this archive
 */
542 public String toString() {
543 StringBuilder sb = new StringBuilder(" " + fcn);
544 sb.append(" xff = ").append(xff);
545 sb.append(" steps = ").append(steps);
546 sb.append(" rows = ").append(rows);
547 return sb.toString();
/**
 * A named database definition assembled from configuration strings: the datasource settings
 * (type, heartbeat, min, max, step), the list of archives and the names of the items it applies to.
 */
551 private class RrdDefConfig {
553 public @Nullable DsType dsType;
554 public int heartbeat, step;
555 public double min, max;
556 public List<RrdArchiveDef> archives;
557 public List<String> itemNames;
// set by setDef() once the datasource settings have been parsed successfully
559 private boolean isInitialized;
/**
 * Creates an empty definition without archives and items that is not yet initialized.
 *
 * @param name the name of the definition
 */
561 public RrdDefConfig(String name) {
563 archives = new ArrayList<>();
564 itemNames = new ArrayList<>();
565 isInitialized = false;
/**
 * Parses the datasource settings from a string with five comma separated fields, in the order
 * datasource type, heartbeat, min, max and step. Supported datasource types are
 * {@code ABSOLUTE}, {@code COUNTER}, {@code DERIVE} and {@code GAUGE}. For min and max the
 * value {@code U} is handled separately from numeric values (presumably meaning "unknown" -
 * TODO confirm). A wrong number of fields or an unsupported type is logged as a warning.
 *
 * @param defString the settings, e.g. {@code GAUGE,600,U,U,10}
 */
568 public void setDef(String defString) {
569 String[] opts = defString.split(",");
570 if (opts.length != 5) { // check if correct number of parameters
571 logger.warn("invalid number of parameters {}: {}", name, defString);
575 if (opts[0].equals("ABSOLUTE")) { // dsType
576 dsType = DsType.ABSOLUTE;
577 } else if (opts[0].equals("COUNTER")) {
578 dsType = DsType.COUNTER;
579 } else if (opts[0].equals("DERIVE")) {
580 dsType = DsType.DERIVE;
581 } else if (opts[0].equals("GAUGE")) {
582 dsType = DsType.GAUGE;
584 logger.warn("{}: dsType {} not supported", name, opts[0]);
587 heartbeat = Integer.parseInt(opts[1]);
589 if (opts[2].equals("U")) {
592 min = Double.parseDouble(opts[2]);
595 if (opts[3].equals("U")) {
598 max = Double.parseDouble(opts[3]);
601 step = Integer.parseInt(opts[4]);
603 isInitialized = true; // successfully initialized
/**
 * Parses archive definitions. Archives are separated by a colon; each archive has four comma
 * separated fields, in the order consolidation function, xff, steps and rows. Supported
 * functions are {@code AVERAGE}, {@code MIN}, {@code MAX}, {@code LAST}, {@code FIRST} and
 * {@code TOTAL}. A wrong number of fields or an unsupported function is logged as a warning.
 *
 * @param archivesString the archive definitions, e.g. {@code LAST,0.5,1,360:LAST,0.5,6,10080}
 */
608 public void addArchives(String archivesString) {
609 String splitArchives[] = archivesString.split(":");
610 for (String archiveString : splitArchives) {
611 String[] opts = archiveString.split(",");
612 if (opts.length != 4) { // check if correct number of parameters
613 logger.warn("invalid number of parameters {}: {}", name, archiveString);
616 RrdArchiveDef arc = new RrdArchiveDef();
618 if (opts[0].equals("AVERAGE")) {
619 arc.fcn = ConsolFun.AVERAGE;
620 } else if (opts[0].equals("MIN")) {
621 arc.fcn = ConsolFun.MIN;
622 } else if (opts[0].equals("MAX")) {
623 arc.fcn = ConsolFun.MAX;
624 } else if (opts[0].equals("LAST")) {
625 arc.fcn = ConsolFun.LAST;
626 } else if (opts[0].equals("FIRST")) {
627 arc.fcn = ConsolFun.FIRST;
628 } else if (opts[0].equals("TOTAL")) {
629 arc.fcn = ConsolFun.TOTAL;
631 logger.warn("{}: consolidation function {} not supported", name, opts[0]);
633 arc.xff = Double.parseDouble(opts[1]);
634 arc.steps = Integer.parseInt(opts[2]);
635 arc.rows = Integer.parseInt(opts[3]);
/**
 * Processes a comma separated list of item names (presumably adding each of them to
 * {@code itemNames} - TODO confirm).
 *
 * @param itemsString the item names separated by commas
 */
640 public void addItems(String itemsString) {
641 String splitItems[] = itemsString.split(",");
642 for (String item : splitItems) {
/**
 * @param item the name of an item
 * @return {@code true} if the item name is listed in {@code itemNames}
 */
647 public boolean appliesTo(String item) {
648 return itemNames.contains(item);
/**
 * @return {@code true} if the datasource settings were parsed successfully and at least one
 *         archive is defined
 */
651 public boolean isValid() { // a valid configuration must be initialized
652 // and contain at least one function
653 return (isInitialized && (archives.size() > 0));
/**
 * @return a textual form listing the datasource settings, the archives and the item names
 */
657 public String toString() {
658 StringBuilder sb = new StringBuilder(name);
659 sb.append(" = ").append(dsType);
660 sb.append(" heartbeat = ").append(heartbeat);
661 sb.append(" min/max = ").append(min).append("/").append(max);
662 sb.append(" step = ").append(step);
663 sb.append(" ").append(archives.size()).append(" archives(s) = [");
664 for (RrdArchiveDef arc : archives) {
665 sb.append(arc.toString());
668 sb.append(itemNames.size()).append(" items(s) = [");
669 for (String item : itemNames) {
670 sb.append(item).append(" ");
673 return sb.toString();
/**
 * @return the default persistence strategies of this service: restore, change and a cron strategy
 *         named {@code everyMinute} with the expression {@code 0 * * * * ?}
 */
678 public List<PersistenceStrategy> getDefaultStrategies() {
679 return List.of(PersistenceStrategy.Globals.RESTORE, PersistenceStrategy.Globals.CHANGE,
680 new PersistenceCronStrategy("everyMinute", "0 * * * * ?"));