2 * Copyright (c) 2010-2022 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.persistence.rrd4j.internal;
16 import java.io.IOException;
17 import java.time.Instant;
18 import java.time.ZoneId;
19 import java.time.ZonedDateTime;
20 import java.util.ArrayList;
21 import java.util.HashMap;
22 import java.util.Iterator;
23 import java.util.List;
24 import java.util.Locale;
27 import java.util.concurrent.ConcurrentHashMap;
28 import java.util.concurrent.Executors;
29 import java.util.concurrent.RejectedExecutionException;
30 import java.util.concurrent.ScheduledExecutorService;
31 import java.util.concurrent.ScheduledFuture;
32 import java.util.concurrent.TimeUnit;
34 import javax.measure.Quantity;
35 import javax.measure.Unit;
37 import org.eclipse.jdt.annotation.NonNullByDefault;
38 import org.eclipse.jdt.annotation.Nullable;
39 import org.openhab.core.OpenHAB;
40 import org.openhab.core.common.NamedThreadFactory;
41 import org.openhab.core.items.GroupItem;
42 import org.openhab.core.items.Item;
43 import org.openhab.core.items.ItemNotFoundException;
44 import org.openhab.core.items.ItemRegistry;
45 import org.openhab.core.items.ItemUtil;
46 import org.openhab.core.library.CoreItemFactory;
47 import org.openhab.core.library.items.ColorItem;
48 import org.openhab.core.library.items.ContactItem;
49 import org.openhab.core.library.items.DimmerItem;
50 import org.openhab.core.library.items.NumberItem;
51 import org.openhab.core.library.items.RollershutterItem;
52 import org.openhab.core.library.items.SwitchItem;
53 import org.openhab.core.library.types.DecimalType;
54 import org.openhab.core.library.types.OnOffType;
55 import org.openhab.core.library.types.OpenClosedType;
56 import org.openhab.core.library.types.PercentType;
57 import org.openhab.core.library.types.QuantityType;
58 import org.openhab.core.persistence.FilterCriteria;
59 import org.openhab.core.persistence.FilterCriteria.Ordering;
60 import org.openhab.core.persistence.HistoricItem;
61 import org.openhab.core.persistence.PersistenceItemInfo;
62 import org.openhab.core.persistence.PersistenceService;
63 import org.openhab.core.persistence.QueryablePersistenceService;
64 import org.openhab.core.persistence.strategy.PersistenceCronStrategy;
65 import org.openhab.core.persistence.strategy.PersistenceStrategy;
66 import org.openhab.core.types.State;
67 import org.osgi.service.component.annotations.Activate;
68 import org.osgi.service.component.annotations.Component;
69 import org.osgi.service.component.annotations.ConfigurationPolicy;
70 import org.osgi.service.component.annotations.Modified;
71 import org.osgi.service.component.annotations.Reference;
72 import org.rrd4j.ConsolFun;
73 import org.rrd4j.DsType;
74 import org.rrd4j.core.FetchData;
75 import org.rrd4j.core.FetchRequest;
76 import org.rrd4j.core.RrdDb;
77 import org.rrd4j.core.RrdDef;
78 import org.rrd4j.core.Sample;
79 import org.slf4j.Logger;
80 import org.slf4j.LoggerFactory;
83 * This is the implementation of the RRD4j {@link PersistenceService}. To learn
84 * more about RRD4j please visit their
85 * <a href="https://github.com/rrd4j/rrd4j">website</a>.
87 * @author Kai Kreuzer - Initial contribution
88 * @author Jan N. Klug - some improvements
89 * @author Karel Goderis - remove TimerThread dependency
92 @Component(service = { PersistenceService.class,
93 QueryablePersistenceService.class }, configurationPid = "org.openhab.rrd4j", configurationPolicy = ConfigurationPolicy.OPTIONAL)
94 public class RRD4jPersistenceService implements QueryablePersistenceService {
96 private static final String DEFAULT_OTHER = "default_other";
97 private static final String DEFAULT_NUMERIC = "default_numeric";
98 private static final String DEFAULT_QUANTIFIABLE = "default_quantifiable";
100 private static final Set<String> SUPPORTED_TYPES = Set.of(CoreItemFactory.SWITCH, CoreItemFactory.CONTACT,
101 CoreItemFactory.DIMMER, CoreItemFactory.NUMBER, CoreItemFactory.ROLLERSHUTTER, CoreItemFactory.COLOR);
103 private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(3,
104 new NamedThreadFactory("RRD4j"));
106 private final Map<String, RrdDefConfig> rrdDefs = new ConcurrentHashMap<>();
108 private static final String DATASOURCE_STATE = "state";
110 public static final String DB_FOLDER = getUserPersistenceDataFolder() + File.separator + "rrd4j";
112 private final Logger logger = LoggerFactory.getLogger(RRD4jPersistenceService.class);
114 private final Map<String, ScheduledFuture<?>> scheduledJobs = new HashMap<>();
116 protected final ItemRegistry itemRegistry;
119 public RRD4jPersistenceService(final @Reference ItemRegistry itemRegistry) {
120 this.itemRegistry = itemRegistry;
124 public String getId() {
129 public String getLabel(@Nullable Locale locale) {
134 public synchronized void store(final Item item, @Nullable final String alias) {
135 if (!isSupportedItemType(item)) {
136 logger.trace("Ignoring item '{}' since its type {} is not supported", item.getName(), item.getType());
139 final String name = alias == null ? item.getName() : alias;
140 RrdDb db = getDB(name);
142 ConsolFun function = getConsolidationFunction(db);
143 long now = System.currentTimeMillis() / 1000;
144 if (function != ConsolFun.AVERAGE) {
146 // we store the last value again, so that the value change
147 // in the database is not interpolated, but
148 // happens right at this spot
149 if (now - 1 > db.getLastUpdateTime()) {
150 // only do it if there is not already a value
151 double lastValue = db.getLastDatasourceValue(DATASOURCE_STATE);
152 if (!Double.isNaN(lastValue)) {
153 Sample sample = db.createSample();
154 sample.setTime(now - 1);
155 sample.setValue(DATASOURCE_STATE, lastValue);
157 logger.debug("Stored '{}' as value '{}' in rrd4j database (again)", name, lastValue);
160 } catch (IOException e) {
161 logger.debug("Error storing last value (again): {}", e.getMessage());
165 Sample sample = db.createSample();
170 if (item instanceof NumberItem && item.getState() instanceof QuantityType) {
171 NumberItem nItem = (NumberItem) item;
172 QuantityType<?> qState = (QuantityType<?>) item.getState();
173 Unit<? extends Quantity<?>> unit = nItem.getUnit();
175 QuantityType<?> convertedState = qState.toUnit(unit);
176 if (convertedState != null) {
177 value = convertedState.doubleValue();
180 "Failed to convert state '{}' to unit '{}'. Please check your item definition for correctness.",
184 value = qState.doubleValue();
187 DecimalType state = item.getStateAs(DecimalType.class);
189 value = state.toBigDecimal().doubleValue();
193 if (db.getDatasource(DATASOURCE_STATE).getType() == DsType.COUNTER) { // counter values must be
194 // adjusted by stepsize
195 value = value * db.getRrdDef().getStep();
197 sample.setValue(DATASOURCE_STATE, value);
199 logger.debug("Stored '{}' as value '{}' in rrd4j database", name, value);
201 } catch (IllegalArgumentException e) {
202 String message = e.getMessage();
203 if (message != null && message.contains("at least one second step is required")) {
204 // we try to store the value one second later
205 ScheduledFuture<?> job = scheduledJobs.get(name);
208 scheduledJobs.remove(name);
210 job = scheduler.schedule(() -> store(item, name), 1, TimeUnit.SECONDS);
211 scheduledJobs.put(name, job);
213 logger.warn("Could not persist '{}' to rrd4j database: {}", name, e.getMessage());
215 } catch (Exception e) {
216 logger.warn("Could not persist '{}' to rrd4j database: {}", name, e.getMessage());
220 } catch (IOException e) {
221 logger.debug("Error closing rrd4j database: {}", e.getMessage());
227 public void store(Item item) {
232 public Iterable<HistoricItem> query(FilterCriteria filter) {
233 String itemName = filter.getItemName();
235 RrdDb db = getDB(itemName);
237 logger.debug("Could not find item '{}' in rrd4j database", itemName);
244 item = itemRegistry.getItem(itemName);
245 if (item instanceof NumberItem) {
246 // we already retrieve the unit here once as it is a very costly operation,
247 // see https://github.com/openhab/openhab-addons/issues/8928
248 unit = ((NumberItem) item).getUnit();
250 } catch (ItemNotFoundException e) {
251 logger.debug("Could not find item '{}' in registry", itemName);
255 long end = filter.getEndDate() == null ? System.currentTimeMillis() / 1000
256 : filter.getEndDate().toInstant().getEpochSecond();
259 if (filter.getBeginDate() == null) {
260 // as rrd goes back for years and gets more and more
261 // inaccurate, we only support descending order
262 // and a single return value
263 // if there is no begin date is given - this case is
264 // required specifically for the historicState()
265 // query, which we want to support
266 if (filter.getOrdering() == Ordering.DESCENDING && filter.getPageSize() == 1
267 && filter.getPageNumber() == 0) {
268 if (filter.getEndDate() == null) {
269 // we are asked only for the most recent value!
270 double lastValue = db.getLastDatasourceValue(DATASOURCE_STATE);
271 if (!Double.isNaN(lastValue)) {
272 HistoricItem rrd4jItem = new RRD4jItem(itemName, mapToState(lastValue, item, unit),
273 ZonedDateTime.ofInstant(Instant.ofEpochMilli(db.getLastArchiveUpdateTime() * 1000),
274 ZoneId.systemDefault()));
275 return List.of(rrd4jItem);
283 throw new UnsupportedOperationException("rrd4j does not allow querys without a begin date, "
284 + "unless order is descending and a single value is requested");
287 start = filter.getBeginDate().toInstant().getEpochSecond();
290 FetchRequest request = db.createFetchRequest(getConsolidationFunction(db), start, end, 1);
291 FetchData result = request.fetchData();
293 List<HistoricItem> items = new ArrayList<>();
294 long ts = result.getFirstTimestamp();
295 long step = result.getRowCount() > 1 ? result.getStep() : 0;
296 for (double value : result.getValues(DATASOURCE_STATE)) {
297 if (!Double.isNaN(value) && (((ts >= start) && (ts <= end)) || (start == end))) {
298 RRD4jItem rrd4jItem = new RRD4jItem(itemName, mapToState(value, item, unit),
299 ZonedDateTime.ofInstant(Instant.ofEpochSecond(ts), ZoneId.systemDefault()));
300 items.add(rrd4jItem);
305 } catch (IOException e) {
306 logger.warn("Could not query rrd4j database for item '{}': {}", itemName, e.getMessage());
312 public Set<PersistenceItemInfo> getItemInfo() {
316 protected synchronized @Nullable RrdDb getDB(String alias) {
318 File file = new File(DB_FOLDER + File.separator + alias + ".rrd");
321 // recreate the RrdDb instance from the file
322 db = RrdDb.of(file.getAbsolutePath());
324 File folder = new File(DB_FOLDER);
325 if (!folder.exists()) {
328 RrdDef rrdDef = getRrdDef(alias, file);
329 if (rrdDef != null) {
330 // create a new database file
331 db = RrdDb.of(rrdDef);
334 "Did not create rrd4j database for item '{}' since no rrd definition could be determined. This is likely due to an unsupported item type.",
338 } catch (IOException e) {
339 logger.error("Could not create rrd4j database file '{}': {}", file.getAbsolutePath(), e.getMessage());
340 } catch (RejectedExecutionException e) {
341 // this happens if the system is shut down
342 logger.debug("Could not create rrd4j database file '{}': {}", file.getAbsolutePath(), e.getMessage());
347 private @Nullable RrdDefConfig getRrdDefConfig(String itemName) {
348 RrdDefConfig useRdc = null;
349 for (Map.Entry<String, RrdDefConfig> e : rrdDefs.entrySet()) {
350 // try to find special config
351 RrdDefConfig rdc = e.getValue();
352 if (rdc.appliesTo(itemName)) {
357 if (useRdc == null) { // not defined, use defaults
359 Item item = itemRegistry.getItem(itemName);
360 if (!isSupportedItemType(item)) {
363 if (item instanceof NumberItem) {
364 NumberItem numberItem = (NumberItem) item;
365 useRdc = numberItem.getDimension() != null ? rrdDefs.get(DEFAULT_QUANTIFIABLE)
366 : rrdDefs.get(DEFAULT_NUMERIC);
368 useRdc = rrdDefs.get(DEFAULT_OTHER);
370 } catch (ItemNotFoundException e) {
371 logger.debug("Could not find item '{}' in registry", itemName);
375 logger.trace("Using rrd definition '{}' for item '{}'.", useRdc, itemName);
379 private @Nullable RrdDef getRrdDef(String itemName, File file) {
380 RrdDef rrdDef = new RrdDef(file.getAbsolutePath());
381 RrdDefConfig useRdc = getRrdDefConfig(itemName);
382 if (useRdc != null) {
383 rrdDef.setStep(useRdc.step);
384 rrdDef.setStartTime(System.currentTimeMillis() / 1000 - 1);
385 rrdDef.addDatasource(DATASOURCE_STATE, useRdc.dsType, useRdc.heartbeat, useRdc.min, useRdc.max);
386 for (RrdArchiveDef rad : useRdc.archives) {
387 rrdDef.addArchive(rad.fcn, rad.xff, rad.steps, rad.rows);
395 public ConsolFun getConsolidationFunction(RrdDb db) {
397 return db.getRrdDef().getArcDefs()[0].getConsolFun();
398 } catch (IOException e) {
399 return ConsolFun.MAX;
403 @SuppressWarnings({ "unchecked", "rawtypes" })
404 private State mapToState(double value, @Nullable Item item, @Nullable Unit unit) {
405 if (item instanceof GroupItem) {
406 item = ((GroupItem) item).getBaseItem();
409 if (item instanceof SwitchItem && !(item instanceof DimmerItem)) {
410 return value == 0.0d ? OnOffType.OFF : OnOffType.ON;
411 } else if (item instanceof ContactItem) {
412 return value == 0.0d ? OpenClosedType.CLOSED : OpenClosedType.OPEN;
413 } else if (item instanceof DimmerItem || item instanceof RollershutterItem || item instanceof ColorItem) {
414 // make sure Items that need PercentTypes instead of DecimalTypes do receive the right information
415 return new PercentType((int) Math.round(value * 100));
416 } else if (item instanceof NumberItem) {
418 return new QuantityType(value, unit);
420 return new DecimalType(value);
423 return new DecimalType(value);
426 private boolean isSupportedItemType(Item item) {
427 if (item instanceof GroupItem) {
428 final Item baseItem = ((GroupItem) item).getBaseItem();
429 if (baseItem != null) {
434 return SUPPORTED_TYPES.contains(ItemUtil.getMainItemType(item.getType()));
437 private static String getUserPersistenceDataFolder() {
438 return OpenHAB.getUserDataFolder() + File.separator + "persistence";
442 protected void activate(final Map<String, Object> config) {
447 protected void modified(final Map<String, Object> config) {
448 // clean existing definitions
451 // add default configurations
453 RrdDefConfig defaultNumeric = new RrdDefConfig(DEFAULT_NUMERIC);
454 // use 10 seconds as a step size for numeric values and allow a 10 minute silence between updates
455 defaultNumeric.setDef("GAUGE,600,U,U,10");
456 // define 5 different boxes:
457 // 1. granularity of 10s for the last hour
458 // 2. granularity of 1m for the last week
459 // 3. granularity of 15m for the last year
460 // 4. granularity of 1h for the last 5 years
461 // 5. granularity of 1d for the last 10 years
463 .addArchives("LAST,0.5,1,360:LAST,0.5,6,10080:LAST,0.5,90,36500:LAST,0.5,360,43800:LAST,0.5,8640,3650");
464 rrdDefs.put(DEFAULT_NUMERIC, defaultNumeric);
466 RrdDefConfig defaultQuantifiable = new RrdDefConfig(DEFAULT_QUANTIFIABLE);
467 // use 10 seconds as a step size for numeric values and allow a 10 minute silence between updates
468 defaultQuantifiable.setDef("GAUGE,600,U,U,10");
469 // define 5 different boxes:
470 // 1. granularity of 10s for the last hour
471 // 2. granularity of 1m for the last week
472 // 3. granularity of 15m for the last year
473 // 4. granularity of 1h for the last 5 years
474 // 5. granularity of 1d for the last 10 years
475 defaultQuantifiable.addArchives(
476 "AVERAGE,0.5,1,360:AVERAGE,0.5,6,10080:AVERAGE,0.5,90,36500:AVERAGE,0.5,360,43800:AVERAGE,0.5,8640,3650");
477 rrdDefs.put(DEFAULT_QUANTIFIABLE, defaultQuantifiable);
479 RrdDefConfig defaultOther = new RrdDefConfig(DEFAULT_OTHER);
480 // use 5 seconds as a step size for discrete values and allow a 1h silence between updates
481 defaultOther.setDef("GAUGE,3600,U,U,5");
482 // define 4 different boxes:
483 // 1. granularity of 5s for the last hour
484 // 2. granularity of 1m for the last week
485 // 3. granularity of 15m for the last year
486 // 4. granularity of 4h for the last 10 years
487 defaultOther.addArchives("LAST,0.5,1,720:LAST,0.5,12,10080:LAST,0.5,180,35040:LAST,0.5,2880,21900");
488 rrdDefs.put(DEFAULT_OTHER, defaultOther);
490 if (config.isEmpty()) {
491 logger.debug("using default configuration only");
495 Iterator<String> keys = config.keySet().iterator();
496 while (keys.hasNext()) {
497 String key = keys.next();
499 if (key.equals("service.pid") || key.equals("component.name")) {
500 // ignore service.pid and name
504 String[] subkeys = key.split("\\.");
505 if (subkeys.length != 2) {
506 logger.debug("config '{}' should have the format 'name.configkey'", key);
510 Object v = config.get(key);
511 if (v instanceof String) {
512 String value = (String) v;
513 String name = subkeys[0].toLowerCase();
514 String property = subkeys[1].toLowerCase();
516 if (value.isBlank()) {
517 logger.trace("Config is empty: {}", property);
520 logger.trace("Processing config: {} = {}", property, value);
523 RrdDefConfig rrdDef = rrdDefs.get(name);
524 if (rrdDef == null) {
525 rrdDef = new RrdDefConfig(name);
526 rrdDefs.put(name, rrdDef);
530 if (property.equals("def")) {
531 rrdDef.setDef(value);
532 } else if (property.equals("archives")) {
533 rrdDef.addArchives(value);
534 } else if (property.equals("items")) {
535 rrdDef.addItems(value);
537 logger.debug("Unknown property {} : {}", property, value);
539 } catch (IllegalArgumentException e) {
540 logger.warn("Ignoring illegal configuration: {}", e.getMessage());
545 for (RrdDefConfig rrdDef : rrdDefs.values()) {
546 if (rrdDef.isValid()) {
547 logger.debug("Created {}", rrdDef);
549 logger.info("Removing invalid definition {}", rrdDef);
550 rrdDefs.remove(rrdDef.name);
555 private class RrdArchiveDef {
556 public @Nullable ConsolFun fcn;
558 public int steps, rows;
561 public String toString() {
562 StringBuilder sb = new StringBuilder(" " + fcn);
563 sb.append(" xff = ").append(xff);
564 sb.append(" steps = ").append(steps);
565 sb.append(" rows = ").append(rows);
566 return sb.toString();
570 private class RrdDefConfig {
572 public @Nullable DsType dsType;
573 public int heartbeat, step;
574 public double min, max;
575 public List<RrdArchiveDef> archives;
576 public List<String> itemNames;
578 private boolean isInitialized;
580 public RrdDefConfig(String name) {
582 archives = new ArrayList<>();
583 itemNames = new ArrayList<>();
584 isInitialized = false;
587 public void setDef(String defString) {
588 String[] opts = defString.split(",");
589 if (opts.length != 5) { // check if correct number of parameters
590 logger.warn("invalid number of parameters {}: {}", name, defString);
594 if (opts[0].equals("ABSOLUTE")) { // dsType
595 dsType = DsType.ABSOLUTE;
596 } else if (opts[0].equals("COUNTER")) {
597 dsType = DsType.COUNTER;
598 } else if (opts[0].equals("DERIVE")) {
599 dsType = DsType.DERIVE;
600 } else if (opts[0].equals("GAUGE")) {
601 dsType = DsType.GAUGE;
603 logger.warn("{}: dsType {} not supported", name, opts[0]);
606 heartbeat = Integer.parseInt(opts[1]);
608 if (opts[2].equals("U")) {
611 min = Double.parseDouble(opts[2]);
614 if (opts[3].equals("U")) {
617 max = Double.parseDouble(opts[3]);
620 step = Integer.parseInt(opts[4]);
622 isInitialized = true; // successfully initialized
627 public void addArchives(String archivesString) {
628 String splitArchives[] = archivesString.split(":");
629 for (String archiveString : splitArchives) {
630 String[] opts = archiveString.split(",");
631 if (opts.length != 4) { // check if correct number of parameters
632 logger.warn("invalid number of parameters {}: {}", name, archiveString);
635 RrdArchiveDef arc = new RrdArchiveDef();
637 if (opts[0].equals("AVERAGE")) {
638 arc.fcn = ConsolFun.AVERAGE;
639 } else if (opts[0].equals("MIN")) {
640 arc.fcn = ConsolFun.MIN;
641 } else if (opts[0].equals("MAX")) {
642 arc.fcn = ConsolFun.MAX;
643 } else if (opts[0].equals("LAST")) {
644 arc.fcn = ConsolFun.LAST;
645 } else if (opts[0].equals("FIRST")) {
646 arc.fcn = ConsolFun.FIRST;
647 } else if (opts[0].equals("TOTAL")) {
648 arc.fcn = ConsolFun.TOTAL;
650 logger.warn("{}: consolidation function {} not supported", name, opts[0]);
652 arc.xff = Double.parseDouble(opts[1]);
653 arc.steps = Integer.parseInt(opts[2]);
654 arc.rows = Integer.parseInt(opts[3]);
659 public void addItems(String itemsString) {
660 String splitItems[] = itemsString.split(",");
661 for (String item : splitItems) {
666 public boolean appliesTo(String item) {
667 return itemNames.contains(item);
670 public boolean isValid() { // a valid configuration must be initialized
671 // and contain at least one function
672 return isInitialized && !archives.isEmpty();
676 public String toString() {
677 StringBuilder sb = new StringBuilder(name);
678 sb.append(" = ").append(dsType);
679 sb.append(" heartbeat = ").append(heartbeat);
680 sb.append(" min/max = ").append(min).append("/").append(max);
681 sb.append(" step = ").append(step);
682 sb.append(" ").append(archives.size()).append(" archives(s) = [");
683 for (RrdArchiveDef arc : archives) {
684 sb.append(arc.toString());
687 sb.append(itemNames.size()).append(" items(s) = [");
688 for (String item : itemNames) {
689 sb.append(item).append(" ");
692 return sb.toString();
697 public List<PersistenceStrategy> getDefaultStrategies() {
698 return List.of(PersistenceStrategy.Globals.RESTORE, PersistenceStrategy.Globals.CHANGE,
699 new PersistenceCronStrategy("everyMinute", "0 * * * * ?"));