]> git.basschouten.com Git - openhab-addons.git/commitdiff
[rrd4j] Improve event handling (#15223)
authorJ-N-K <github@klug.nrw>
Thu, 13 Jul 2023 20:11:14 +0000 (22:11 +0200)
committerGitHub <noreply@github.com>
Thu, 13 Jul 2023 20:11:14 +0000 (22:11 +0200)
* [rrd4j] Improve event handling

Signed-off-by: Jan N. Klug <github@klug.nrw>
bundles/org.openhab.persistence.rrd4j/src/main/java/org/openhab/persistence/rrd4j/internal/RRD4jPersistenceService.java

index b0d309560490a8653cec674dd9f87ed2ebcb89c3..9e8ef713650254b0a36df72e7af12647d21eb4cc 100644 (file)
@@ -20,13 +20,13 @@ import java.time.ZoneId;
 import java.time.ZonedDateTime;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
@@ -71,6 +71,7 @@ import org.openhab.core.types.State;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.ConfigurationPolicy;
+import org.osgi.service.component.annotations.Deactivate;
 import org.osgi.service.component.annotations.Modified;
 import org.osgi.service.component.annotations.Reference;
 import org.rrd4j.ConsolFun;
@@ -108,11 +109,13 @@ public class RRD4jPersistenceService implements QueryablePersistenceService {
     private static final Set<String> SUPPORTED_TYPES = Set.of(CoreItemFactory.SWITCH, CoreItemFactory.CONTACT,
             CoreItemFactory.DIMMER, CoreItemFactory.NUMBER, CoreItemFactory.ROLLERSHUTTER, CoreItemFactory.COLOR);
 
-    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(3,
+    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1,
             new NamedThreadFactory("RRD4j"));
 
     private final Map<String, RrdDefConfig> rrdDefs = new ConcurrentHashMap<>();
 
+    private final ConcurrentSkipListMap<Long, Map<String, Double>> storageMap = new ConcurrentSkipListMap<>();
+
     private static final String DATASOURCE_STATE = "state";
 
     private static final Path DB_FOLDER = Path.of(OpenHAB.getUserDataFolder(), "persistence", "rrd4j").toAbsolutePath();
@@ -120,10 +123,8 @@ public class RRD4jPersistenceService implements QueryablePersistenceService {
     private static final RrdDbPool DATABASE_POOL = new RrdDbPool();
 
     private final Logger logger = LoggerFactory.getLogger(RRD4jPersistenceService.class);
-
-    private final Map<String, ScheduledFuture<?>> scheduledJobs = new HashMap<>();
-
     private final ItemRegistry itemRegistry;
+    private boolean active = false;
 
     public static Path getDatabasePath(String name) {
         return DB_FOLDER.resolve(name + ".rrd");
@@ -133,9 +134,132 @@ public class RRD4jPersistenceService implements QueryablePersistenceService {
         return DATABASE_POOL;
     }
 
+    private final ScheduledFuture<?> storeJob;
+
     @Activate
-    public RRD4jPersistenceService(final @Reference ItemRegistry itemRegistry) {
+    public RRD4jPersistenceService(final @Reference ItemRegistry itemRegistry, Map<String, Object> config) {
         this.itemRegistry = itemRegistry;
+        storeJob = scheduler.scheduleWithFixedDelay(() -> doStore(false), 1, 1, TimeUnit.SECONDS);
+        modified(config);
+        active = true;
+    }
+
+    @Modified
+    protected void modified(final Map<String, Object> config) {
+        // clean existing definitions
+        rrdDefs.clear();
+
+        // add default configurations
+
+        RrdDefConfig defaultNumeric = new RrdDefConfig(DEFAULT_NUMERIC);
+        // use 10 seconds as a step size for numeric values and allow a 10 minute silence between updates
+        defaultNumeric.setDef("GAUGE,600,U,U,10");
+        // define 5 different boxes:
+        // 1. granularity of 10s for the last hour
+        // 2. granularity of 1m for the last week
+        // 3. granularity of 15m for the last year
+        // 4. granularity of 1h for the last 5 years
+        // 5. granularity of 1d for the last 10 years
+        defaultNumeric
+                .addArchives("LAST,0.5,1,360:LAST,0.5,6,10080:LAST,0.5,90,36500:LAST,0.5,360,43800:LAST,0.5,8640,3650");
+        rrdDefs.put(DEFAULT_NUMERIC, defaultNumeric);
+
+        RrdDefConfig defaultQuantifiable = new RrdDefConfig(DEFAULT_QUANTIFIABLE);
+        // use 10 seconds as a step size for numeric values and allow a 10 minute silence between updates
+        defaultQuantifiable.setDef("GAUGE,600,U,U,10");
+        // define 5 different boxes:
+        // 1. granularity of 10s for the last hour
+        // 2. granularity of 1m for the last week
+        // 3. granularity of 15m for the last year
+        // 4. granularity of 1h for the last 5 years
+        // 5. granularity of 1d for the last 10 years
+        defaultQuantifiable.addArchives(
+                "AVERAGE,0.5,1,360:AVERAGE,0.5,6,10080:AVERAGE,0.5,90,36500:AVERAGE,0.5,360,43800:AVERAGE,0.5,8640,3650");
+        rrdDefs.put(DEFAULT_QUANTIFIABLE, defaultQuantifiable);
+
+        RrdDefConfig defaultOther = new RrdDefConfig(DEFAULT_OTHER);
+        // use 5 seconds as a step size for discrete values and allow a 1h silence between updates
+        defaultOther.setDef("GAUGE,3600,U,U,5");
+        // define 4 different boxes:
+        // 1. granularity of 5s for the last hour
+        // 2. granularity of 1m for the last week
+        // 3. granularity of 15m for the last year
+        // 4. granularity of 4h for the last 10 years
+        defaultOther.addArchives("LAST,0.5,1,720:LAST,0.5,12,10080:LAST,0.5,180,35040:LAST,0.5,2880,21900");
+        rrdDefs.put(DEFAULT_OTHER, defaultOther);
+
+        if (config.isEmpty()) {
+            logger.debug("using default configuration only");
+            return;
+        }
+
+        Iterator<String> keys = config.keySet().iterator();
+        while (keys.hasNext()) {
+            String key = keys.next();
+
+            if ("service.pid".equals(key) || "component.name".equals(key)) {
+                // ignore service.pid and name
+                continue;
+            }
+
+            String[] subkeys = key.split("\\.");
+            if (subkeys.length != 2) {
+                logger.debug("config '{}' should have the format 'name.configkey'", key);
+                continue;
+            }
+
+            Object v = config.get(key);
+            if (v instanceof String) {
+                String value = (String) v;
+                String name = subkeys[0].toLowerCase();
+                String property = subkeys[1].toLowerCase();
+
+                if (value.isBlank()) {
+                    logger.trace("Config is empty: {}", property);
+                    continue;
+                } else {
+                    logger.trace("Processing config: {} = {}", property, value);
+                }
+
+                RrdDefConfig rrdDef = rrdDefs.get(name);
+                if (rrdDef == null) {
+                    rrdDef = new RrdDefConfig(name);
+                    rrdDefs.put(name, rrdDef);
+                }
+
+                try {
+                    if ("def".equals(property)) {
+                        rrdDef.setDef(value);
+                    } else if ("archives".equals(property)) {
+                        rrdDef.addArchives(value);
+                    } else if ("items".equals(property)) {
+                        rrdDef.addItems(value);
+                    } else {
+                        logger.debug("Unknown property {} : {}", property, value);
+                    }
+                } catch (IllegalArgumentException e) {
+                    logger.warn("Ignoring illegal configuration: {}", e.getMessage());
+                }
+            }
+        }
+
+        for (RrdDefConfig rrdDef : rrdDefs.values()) {
+            if (rrdDef.isValid()) {
+                logger.debug("Created {}", rrdDef);
+            } else {
+                logger.info("Removing invalid definition {}", rrdDef);
+                rrdDefs.remove(rrdDef.name);
+            }
+        }
+    }
+
+    @Deactivate
+    protected void deactivate() {
+        active = false;
+        storeJob.cancel(false);
+
+        // make sure we really store everything
+        doStore(true);
     }
 
     @Override
@@ -150,6 +274,11 @@ public class RRD4jPersistenceService implements QueryablePersistenceService {
 
     @Override
     public void store(final Item item, @Nullable final String alias) {
+        if (!active) {
+            logger.warn("Tried to store {} but service is not yet ready (or shutting down).", item);
+            return;
+        }
+
         if (!isSupportedItemType(item)) {
             logger.trace("Ignoring item '{}' since its type {} is not supported", item.getName(), item.getType());
             return;
@@ -158,9 +287,7 @@ public class RRD4jPersistenceService implements QueryablePersistenceService {
 
         Double value;
 
-        if (item instanceof NumberItem && item.getState() instanceof QuantityType) {
-            NumberItem nItem = (NumberItem) item;
-            QuantityType<?> qState = (QuantityType<?>) item.getState();
+        if (item instanceof NumberItem nItem && item.getState() instanceof QuantityType<?> qState) {
             Unit<? extends Quantity<?>> unit = nItem.getUnit();
             if (unit != null) {
                 QuantityType<?> convertedState = qState.toUnit(unit);
@@ -190,11 +317,30 @@ public class RRD4jPersistenceService implements QueryablePersistenceService {
         }
 
         long now = System.currentTimeMillis() / 1000;
+        Double oldValue = storageMap.computeIfAbsent(now, t -> new ConcurrentHashMap<>()).put(name, value);
+        if (oldValue != null && !oldValue.equals(value)) {
+            logger.debug(
+                    "Discarding value {} for item {} with timestamp {} because a new value ({}) arrived with the same timestamp.",
+                    oldValue, name, now, value);
+        }
+    }
 
-        scheduler.schedule(() -> internalStore(name, value, now, true), 0, TimeUnit.SECONDS);
+    private void doStore(boolean force) {
+        while (!storageMap.isEmpty()) {
+            long timestamp = storageMap.firstKey();
+            long now = System.currentTimeMillis() / 1000;
+            if (now > timestamp || force) {
+                // no new elements can be added for this timestamp because we are already past that time or the service
+                // requires forced storing
+                Map<String, Double> values = storageMap.pollFirstEntry().getValue();
+                values.forEach((name, value) -> writePointToDatabase(name, value, timestamp));
+            } else {
+                return;
+            }
+        }
     }
 
-    private synchronized void internalStore(String name, double value, long now, boolean retry) {
+    private synchronized void writePointToDatabase(String name, double value, long timestamp) {
         RrdDb db = null;
         try {
             db = getDB(name, true);
@@ -205,41 +351,31 @@ public class RRD4jPersistenceService implements QueryablePersistenceService {
             return;
         }
 
-        try {
-            if (now < db.getLastUpdateTime()) {
-                logger.warn("RRD4J does not support adding past value this={}, last update={}. Discarding {} - {}", now,
-                        db.getLastUpdateTime(), name, value);
-                return;
-            }
-        } catch (IOException ignored) {
-            // we can ignore that here, we'll fail again later.
-        }
-
         ConsolFun function = getConsolidationFunction(db);
         if (function != ConsolFun.AVERAGE) {
             try {
                 // we store the last value again, so that the value change
                 // in the database is not interpolated, but
                 // happens right at this spot
-                if (now - 1 > db.getLastUpdateTime()) {
+                if (timestamp - 1 > db.getLastUpdateTime()) {
                     // only do it if there is not already a value
                     double lastValue = db.getLastDatasourceValue(DATASOURCE_STATE);
                     if (!Double.isNaN(lastValue)) {
                         Sample sample = db.createSample();
-                        sample.setTime(now - 1);
+                        sample.setTime(timestamp - 1);
                         sample.setValue(DATASOURCE_STATE, lastValue);
                         sample.update();
                         logger.debug("Stored '{}' as value '{}' with timestamp {} in rrd4j database (again)", name,
-                                lastValue, now - 1);
+                                lastValue, timestamp - 1);
                     }
                 }
             } catch (IOException e) {
-                logger.debug("Error storing last value (again): {}", e.getMessage());
+                logger.debug("Error storing last value (again) for {}: {}", e.getMessage(), name);
             }
         }
         try {
             Sample sample = db.createSample();
-            sample.setTime(now);
+            sample.setTime(timestamp);
             double storeValue = value;
             if (db.getDatasource(DATASOURCE_STATE).getType() == DsType.COUNTER) {
                 // counter values must be adjusted by stepsize
@@ -247,20 +383,7 @@ public class RRD4jPersistenceService implements QueryablePersistenceService {
             }
             sample.setValue(DATASOURCE_STATE, storeValue);
             sample.update();
-            logger.debug("Stored '{}' as value '{}' with timestamp {} in rrd4j database", name, storeValue, now);
-        } catch (IllegalArgumentException e) {
-            String message = e.getMessage();
-            if (message != null && message.contains("at least one second step is required") && retry) {
-                // we try to store the value one second later
-                ScheduledFuture<?> job = scheduledJobs.get(name);
-                if (job != null) {
-                    job.cancel(true);
-                    scheduledJobs.remove(name);
-                }
-                internalStore(name, value, now + 1, false);
-            } else {
-                logger.warn("Could not persist '{}' to rrd4j database: {}", name, e.getMessage());
-            }
+            logger.debug("Stored '{}' as value '{}' with timestamp {} in rrd4j database", name, storeValue, timestamp);
         } catch (Exception e) {
             logger.warn("Could not persist '{}' to rrd4j database: {}", name, e.getMessage());
         }
@@ -524,120 +647,6 @@ public class RRD4jPersistenceService implements QueryablePersistenceService {
         }
     }
 
-    @Activate
-    protected void activate(final Map<String, Object> config) {
-        modified(config);
-    }
-
-    @Modified
-    protected void modified(final Map<String, Object> config) {
-        // clean existing definitions
-        rrdDefs.clear();
-
-        // add default configurations
-
-        RrdDefConfig defaultNumeric = new RrdDefConfig(DEFAULT_NUMERIC);
-        // use 10 seconds as a step size for numeric values and allow a 10 minute silence between updates
-        defaultNumeric.setDef("GAUGE,600,U,U,10");
-        // define 5 different boxes:
-        // 1. granularity of 10s for the last hour
-        // 2. granularity of 1m for the last week
-        // 3. granularity of 15m for the last year
-        // 4. granularity of 1h for the last 5 years
-        // 5. granularity of 1d for the last 10 years
-        defaultNumeric
-                .addArchives("LAST,0.5,1,360:LAST,0.5,6,10080:LAST,0.5,90,36500:LAST,0.5,360,43800:LAST,0.5,8640,3650");
-        rrdDefs.put(DEFAULT_NUMERIC, defaultNumeric);
-
-        RrdDefConfig defaultQuantifiable = new RrdDefConfig(DEFAULT_QUANTIFIABLE);
-        // use 10 seconds as a step size for numeric values and allow a 10 minute silence between updates
-        defaultQuantifiable.setDef("GAUGE,600,U,U,10");
-        // define 5 different boxes:
-        // 1. granularity of 10s for the last hour
-        // 2. granularity of 1m for the last week
-        // 3. granularity of 15m for the last year
-        // 4. granularity of 1h for the last 5 years
-        // 5. granularity of 1d for the last 10 years
-        defaultQuantifiable.addArchives(
-                "AVERAGE,0.5,1,360:AVERAGE,0.5,6,10080:AVERAGE,0.5,90,36500:AVERAGE,0.5,360,43800:AVERAGE,0.5,8640,3650");
-        rrdDefs.put(DEFAULT_QUANTIFIABLE, defaultQuantifiable);
-
-        RrdDefConfig defaultOther = new RrdDefConfig(DEFAULT_OTHER);
-        // use 5 seconds as a step size for discrete values and allow a 1h silence between updates
-        defaultOther.setDef("GAUGE,3600,U,U,5");
-        // define 4 different boxes:
-        // 1. granularity of 5s for the last hour
-        // 2. granularity of 1m for the last week
-        // 3. granularity of 15m for the last year
-        // 4. granularity of 4h for the last 10 years
-        defaultOther.addArchives("LAST,0.5,1,720:LAST,0.5,12,10080:LAST,0.5,180,35040:LAST,0.5,2880,21900");
-        rrdDefs.put(DEFAULT_OTHER, defaultOther);
-
-        if (config.isEmpty()) {
-            logger.debug("using default configuration only");
-            return;
-        }
-
-        Iterator<String> keys = config.keySet().iterator();
-        while (keys.hasNext()) {
-            String key = keys.next();
-
-            if ("service.pid".equals(key) || "component.name".equals(key)) {
-                // ignore service.pid and name
-                continue;
-            }
-
-            String[] subkeys = key.split("\\.");
-            if (subkeys.length != 2) {
-                logger.debug("config '{}' should have the format 'name.configkey'", key);
-                continue;
-            }
-
-            Object v = config.get(key);
-            if (v instanceof String) {
-                String value = (String) v;
-                String name = subkeys[0].toLowerCase();
-                String property = subkeys[1].toLowerCase();
-
-                if (value.isBlank()) {
-                    logger.trace("Config is empty: {}", property);
-                    continue;
-                } else {
-                    logger.trace("Processing config: {} = {}", property, value);
-                }
-
-                RrdDefConfig rrdDef = rrdDefs.get(name);
-                if (rrdDef == null) {
-                    rrdDef = new RrdDefConfig(name);
-                    rrdDefs.put(name, rrdDef);
-                }
-
-                try {
-                    if ("def".equals(property)) {
-                        rrdDef.setDef(value);
-                    } else if ("archives".equals(property)) {
-                        rrdDef.addArchives(value);
-                    } else if ("items".equals(property)) {
-                        rrdDef.addItems(value);
-                    } else {
-                        logger.debug("Unknown property {} : {}", property, value);
-                    }
-                } catch (IllegalArgumentException e) {
-                    logger.warn("Ignoring illegal configuration: {}", e.getMessage());
-                }
-            }
-        }
-
-        for (RrdDefConfig rrdDef : rrdDefs.values()) {
-            if (rrdDef.isValid()) {
-                logger.debug("Created {}", rrdDef);
-            } else {
-                logger.info("Removing invalid definition {}", rrdDef);
-                rrdDefs.remove(rrdDef.name);
-            }
-        }
-    }
-
     private static class RrdArchiveDef {
         public @Nullable ConsolFun fcn;
         public double xff;