import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.ConfigurationPolicy;
+import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Modified;
import org.osgi.service.component.annotations.Reference;
import org.rrd4j.ConsolFun;
private static final Set<String> SUPPORTED_TYPES = Set.of(CoreItemFactory.SWITCH, CoreItemFactory.CONTACT,
CoreItemFactory.DIMMER, CoreItemFactory.NUMBER, CoreItemFactory.ROLLERSHUTTER, CoreItemFactory.COLOR);
- private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(3,
+ private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1,
new NamedThreadFactory("RRD4j"));
private final Map<String, RrdDefConfig> rrdDefs = new ConcurrentHashMap<>();
+ private final ConcurrentSkipListMap<Long, Map<String, Double>> storageMap = new ConcurrentSkipListMap<>();
+
private static final String DATASOURCE_STATE = "state";
private static final Path DB_FOLDER = Path.of(OpenHAB.getUserDataFolder(), "persistence", "rrd4j").toAbsolutePath();
private static final RrdDbPool DATABASE_POOL = new RrdDbPool();
private final Logger logger = LoggerFactory.getLogger(RRD4jPersistenceService.class);
-
- private final Map<String, ScheduledFuture<?>> scheduledJobs = new HashMap<>();
-
private final ItemRegistry itemRegistry;
+ private boolean active = false;
public static Path getDatabasePath(String name) {
return DB_FOLDER.resolve(name + ".rrd");
return DATABASE_POOL;
}
+ private final ScheduledFuture<?> storeJob;
+
@Activate
- public RRD4jPersistenceService(final @Reference ItemRegistry itemRegistry) {
+ public RRD4jPersistenceService(final @Reference ItemRegistry itemRegistry, Map<String, Object> config) {
this.itemRegistry = itemRegistry;
+ storeJob = scheduler.scheduleWithFixedDelay(() -> doStore(false), 1, 1, TimeUnit.SECONDS);
+ modified(config);
+ active = true;
+ }
+
+ @Modified
+ protected void modified(final Map<String, Object> config) {
+ // clean existing definitions
+ rrdDefs.clear();
+
+ // add default configurations
+
+ RrdDefConfig defaultNumeric = new RrdDefConfig(DEFAULT_NUMERIC);
+ // use 10 seconds as a step size for numeric values and allow a 10 minute silence between updates
+ defaultNumeric.setDef("GAUGE,600,U,U,10");
+ // define 5 different boxes:
+ // 1. granularity of 10s for the last hour
+ // 2. granularity of 1m for the last week
+ // 3. granularity of 15m for the last year
+ // 4. granularity of 1h for the last 5 years
+ // 5. granularity of 1d for the last 10 years
+ defaultNumeric
+ .addArchives("LAST,0.5,1,360:LAST,0.5,6,10080:LAST,0.5,90,36500:LAST,0.5,360,43800:LAST,0.5,8640,3650");
+ rrdDefs.put(DEFAULT_NUMERIC, defaultNumeric);
+
+ RrdDefConfig defaultQuantifiable = new RrdDefConfig(DEFAULT_QUANTIFIABLE);
+ // use 10 seconds as a step size for numeric values and allow a 10 minute silence between updates
+ defaultQuantifiable.setDef("GAUGE,600,U,U,10");
+ // define 5 different boxes:
+ // 1. granularity of 10s for the last hour
+ // 2. granularity of 1m for the last week
+ // 3. granularity of 15m for the last year
+ // 4. granularity of 1h for the last 5 years
+ // 5. granularity of 1d for the last 10 years
+ defaultQuantifiable.addArchives(
+ "AVERAGE,0.5,1,360:AVERAGE,0.5,6,10080:AVERAGE,0.5,90,36500:AVERAGE,0.5,360,43800:AVERAGE,0.5,8640,3650");
+ rrdDefs.put(DEFAULT_QUANTIFIABLE, defaultQuantifiable);
+
+ RrdDefConfig defaultOther = new RrdDefConfig(DEFAULT_OTHER);
+ // use 5 seconds as a step size for discrete values and allow a 1h silence between updates
+ defaultOther.setDef("GAUGE,3600,U,U,5");
+ // define 4 different boxes:
+ // 1. granularity of 5s for the last hour
+ // 2. granularity of 1m for the last week
+ // 3. granularity of 15m for the last year
+ // 4. granularity of 4h for the last 10 years
+ defaultOther.addArchives("LAST,0.5,1,720:LAST,0.5,12,10080:LAST,0.5,180,35040:LAST,0.5,2880,21900");
+ rrdDefs.put(DEFAULT_OTHER, defaultOther);
+
+ if (config.isEmpty()) {
+ logger.debug("using default configuration only");
+ return;
+ }
+
+ Iterator<String> keys = config.keySet().iterator();
+ while (keys.hasNext()) {
+ String key = keys.next();
+
+ if ("service.pid".equals(key) || "component.name".equals(key)) {
+ // ignore service.pid and name
+ continue;
+ }
+
+ String[] subkeys = key.split("\\.");
+ if (subkeys.length != 2) {
+ logger.debug("config '{}' should have the format 'name.configkey'", key);
+ continue;
+ }
+
+ Object v = config.get(key);
+ if (v instanceof String) {
+ String value = (String) v;
+ String name = subkeys[0].toLowerCase();
+ String property = subkeys[1].toLowerCase();
+
+ if (value.isBlank()) {
+ logger.trace("Config is empty: {}", property);
+ continue;
+ } else {
+ logger.trace("Processing config: {} = {}", property, value);
+ }
+
+ RrdDefConfig rrdDef = rrdDefs.get(name);
+ if (rrdDef == null) {
+ rrdDef = new RrdDefConfig(name);
+ rrdDefs.put(name, rrdDef);
+ }
+
+ try {
+ if ("def".equals(property)) {
+ rrdDef.setDef(value);
+ } else if ("archives".equals(property)) {
+ rrdDef.addArchives(value);
+ } else if ("items".equals(property)) {
+ rrdDef.addItems(value);
+ } else {
+ logger.debug("Unknown property {} : {}", property, value);
+ }
+ } catch (IllegalArgumentException e) {
+ logger.warn("Ignoring illegal configuration: {}", e.getMessage());
+ }
+ }
+ }
+
+ for (RrdDefConfig rrdDef : rrdDefs.values()) {
+ if (rrdDef.isValid()) {
+ logger.debug("Created {}", rrdDef);
+ } else {
+ logger.info("Removing invalid definition {}", rrdDef);
+ rrdDefs.remove(rrdDef.name);
+ }
+ }
+ }
+
+ @Deactivate
+ protected void deactivate() {
+ active = false;
+ storeJob.cancel(false);
+
+ // make sure we really store everything
+ doStore(true);
}
@Override
@Override
public void store(final Item item, @Nullable final String alias) {
+ if (!active) {
+ logger.warn("Tried to store {} but service is not yet ready (or shutting down).", item);
+ return;
+ }
+
if (!isSupportedItemType(item)) {
logger.trace("Ignoring item '{}' since its type {} is not supported", item.getName(), item.getType());
return;
Double value;
- if (item instanceof NumberItem && item.getState() instanceof QuantityType) {
- NumberItem nItem = (NumberItem) item;
- QuantityType<?> qState = (QuantityType<?>) item.getState();
+ if (item instanceof NumberItem nItem && item.getState() instanceof QuantityType<?> qState) {
Unit<? extends Quantity<?>> unit = nItem.getUnit();
if (unit != null) {
QuantityType<?> convertedState = qState.toUnit(unit);
}
long now = System.currentTimeMillis() / 1000;
+ Double oldValue = storageMap.computeIfAbsent(now, t -> new ConcurrentHashMap<>()).put(name, value);
+ if (oldValue != null && !oldValue.equals(value)) {
+ logger.debug(
+ "Discarding value {} for item {} with timestamp {} because a new value ({}) arrived with the same timestamp.",
+ oldValue, name, now, value);
+ }
+ }
- scheduler.schedule(() -> internalStore(name, value, now, true), 0, TimeUnit.SECONDS);
+ private void doStore(boolean force) {
+ while (!storageMap.isEmpty()) {
+ long timestamp = storageMap.firstKey();
+ long now = System.currentTimeMillis() / 1000;
+ if (now > timestamp || force) {
+ // no new elements can be added for this timestamp because we are already past that time or the service
+ // requires forced storing
+ Map<String, Double> values = storageMap.pollFirstEntry().getValue();
+ values.forEach((name, value) -> writePointToDatabase(name, value, timestamp));
+ } else {
+ return;
+ }
+ }
}
- private synchronized void internalStore(String name, double value, long now, boolean retry) {
+ private synchronized void writePointToDatabase(String name, double value, long timestamp) {
RrdDb db = null;
try {
db = getDB(name, true);
return;
}
- try {
- if (now < db.getLastUpdateTime()) {
- logger.warn("RRD4J does not support adding past value this={}, last update={}. Discarding {} - {}", now,
- db.getLastUpdateTime(), name, value);
- return;
- }
- } catch (IOException ignored) {
- // we can ignore that here, we'll fail again later.
- }
-
ConsolFun function = getConsolidationFunction(db);
if (function != ConsolFun.AVERAGE) {
try {
// we store the last value again, so that the value change
// in the database is not interpolated, but
// happens right at this spot
- if (now - 1 > db.getLastUpdateTime()) {
+ if (timestamp - 1 > db.getLastUpdateTime()) {
// only do it if there is not already a value
double lastValue = db.getLastDatasourceValue(DATASOURCE_STATE);
if (!Double.isNaN(lastValue)) {
Sample sample = db.createSample();
- sample.setTime(now - 1);
+ sample.setTime(timestamp - 1);
sample.setValue(DATASOURCE_STATE, lastValue);
sample.update();
logger.debug("Stored '{}' as value '{}' with timestamp {} in rrd4j database (again)", name,
- lastValue, now - 1);
+ lastValue, timestamp - 1);
}
}
} catch (IOException e) {
- logger.debug("Error storing last value (again): {}", e.getMessage());
+ logger.debug("Error storing last value (again) for {}: {}", e.getMessage(), name);
}
}
try {
Sample sample = db.createSample();
- sample.setTime(now);
+ sample.setTime(timestamp);
double storeValue = value;
if (db.getDatasource(DATASOURCE_STATE).getType() == DsType.COUNTER) {
// counter values must be adjusted by stepsize
}
sample.setValue(DATASOURCE_STATE, storeValue);
sample.update();
- logger.debug("Stored '{}' as value '{}' with timestamp {} in rrd4j database", name, storeValue, now);
- } catch (IllegalArgumentException e) {
- String message = e.getMessage();
- if (message != null && message.contains("at least one second step is required") && retry) {
- // we try to store the value one second later
- ScheduledFuture<?> job = scheduledJobs.get(name);
- if (job != null) {
- job.cancel(true);
- scheduledJobs.remove(name);
- }
- internalStore(name, value, now + 1, false);
- } else {
- logger.warn("Could not persist '{}' to rrd4j database: {}", name, e.getMessage());
- }
+ logger.debug("Stored '{}' as value '{}' with timestamp {} in rrd4j database", name, storeValue, timestamp);
} catch (Exception e) {
logger.warn("Could not persist '{}' to rrd4j database: {}", name, e.getMessage());
}
}
}
- @Activate
- protected void activate(final Map<String, Object> config) {
- modified(config);
- }
-
- @Modified
- protected void modified(final Map<String, Object> config) {
- // clean existing definitions
- rrdDefs.clear();
-
- // add default configurations
-
- RrdDefConfig defaultNumeric = new RrdDefConfig(DEFAULT_NUMERIC);
- // use 10 seconds as a step size for numeric values and allow a 10 minute silence between updates
- defaultNumeric.setDef("GAUGE,600,U,U,10");
- // define 5 different boxes:
- // 1. granularity of 10s for the last hour
- // 2. granularity of 1m for the last week
- // 3. granularity of 15m for the last year
- // 4. granularity of 1h for the last 5 years
- // 5. granularity of 1d for the last 10 years
- defaultNumeric
- .addArchives("LAST,0.5,1,360:LAST,0.5,6,10080:LAST,0.5,90,36500:LAST,0.5,360,43800:LAST,0.5,8640,3650");
- rrdDefs.put(DEFAULT_NUMERIC, defaultNumeric);
-
- RrdDefConfig defaultQuantifiable = new RrdDefConfig(DEFAULT_QUANTIFIABLE);
- // use 10 seconds as a step size for numeric values and allow a 10 minute silence between updates
- defaultQuantifiable.setDef("GAUGE,600,U,U,10");
- // define 5 different boxes:
- // 1. granularity of 10s for the last hour
- // 2. granularity of 1m for the last week
- // 3. granularity of 15m for the last year
- // 4. granularity of 1h for the last 5 years
- // 5. granularity of 1d for the last 10 years
- defaultQuantifiable.addArchives(
- "AVERAGE,0.5,1,360:AVERAGE,0.5,6,10080:AVERAGE,0.5,90,36500:AVERAGE,0.5,360,43800:AVERAGE,0.5,8640,3650");
- rrdDefs.put(DEFAULT_QUANTIFIABLE, defaultQuantifiable);
-
- RrdDefConfig defaultOther = new RrdDefConfig(DEFAULT_OTHER);
- // use 5 seconds as a step size for discrete values and allow a 1h silence between updates
- defaultOther.setDef("GAUGE,3600,U,U,5");
- // define 4 different boxes:
- // 1. granularity of 5s for the last hour
- // 2. granularity of 1m for the last week
- // 3. granularity of 15m for the last year
- // 4. granularity of 4h for the last 10 years
- defaultOther.addArchives("LAST,0.5,1,720:LAST,0.5,12,10080:LAST,0.5,180,35040:LAST,0.5,2880,21900");
- rrdDefs.put(DEFAULT_OTHER, defaultOther);
-
- if (config.isEmpty()) {
- logger.debug("using default configuration only");
- return;
- }
-
- Iterator<String> keys = config.keySet().iterator();
- while (keys.hasNext()) {
- String key = keys.next();
-
- if ("service.pid".equals(key) || "component.name".equals(key)) {
- // ignore service.pid and name
- continue;
- }
-
- String[] subkeys = key.split("\\.");
- if (subkeys.length != 2) {
- logger.debug("config '{}' should have the format 'name.configkey'", key);
- continue;
- }
-
- Object v = config.get(key);
- if (v instanceof String) {
- String value = (String) v;
- String name = subkeys[0].toLowerCase();
- String property = subkeys[1].toLowerCase();
-
- if (value.isBlank()) {
- logger.trace("Config is empty: {}", property);
- continue;
- } else {
- logger.trace("Processing config: {} = {}", property, value);
- }
-
- RrdDefConfig rrdDef = rrdDefs.get(name);
- if (rrdDef == null) {
- rrdDef = new RrdDefConfig(name);
- rrdDefs.put(name, rrdDef);
- }
-
- try {
- if ("def".equals(property)) {
- rrdDef.setDef(value);
- } else if ("archives".equals(property)) {
- rrdDef.addArchives(value);
- } else if ("items".equals(property)) {
- rrdDef.addItems(value);
- } else {
- logger.debug("Unknown property {} : {}", property, value);
- }
- } catch (IllegalArgumentException e) {
- logger.warn("Ignoring illegal configuration: {}", e.getMessage());
- }
- }
- }
-
- for (RrdDefConfig rrdDef : rrdDefs.values()) {
- if (rrdDef.isValid()) {
- logger.debug("Created {}", rrdDef);
- } else {
- logger.info("Removing invalid definition {}", rrdDef);
- rrdDefs.remove(rrdDef.name);
- }
- }
- }
-
private static class RrdArchiveDef {
public @Nullable ConsolFun fcn;
public double xff;