]> git.basschouten.com Git - openhab-addons.git/commitdiff
[rrd4j] Use RrdDbPool to prevent ClosedByInterruptException (#13332)
authorWouter Born <github@maindrain.net>
Mon, 29 Aug 2022 15:15:17 +0000 (17:15 +0200)
committerGitHub <noreply@github.com>
Mon, 29 Aug 2022 15:15:17 +0000 (17:15 +0200)
Using the pool prevents exceptions like:

```
java.nio.channels.ClosedByInterruptException: null
at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:199) ~[?:?]
at sun.nio.ch.FileChannelImpl.endBlocking(FileChannelImpl.java:162) ~[?:?]
at sun.nio.ch.FileChannelImpl.size(FileChannelImpl.java:388) ~[?:?]
at org.rrd4j.core.RrdNioBackend.<init>(RrdNioBackend.java:94) ~[?:?]
at org.rrd4j.core.RrdNioBackendFactory.open(RrdNioBackendFactory.java:163) ~[?:?]
at org.rrd4j.core.RrdBackendFactory.getBackend(RrdBackendFactory.java:521) ~[?:?]
at org.rrd4j.core.RrdDb.<init>(RrdDb.java:627) ~[?:?]
at org.rrd4j.core.RrdDb.of(RrdDb.java:500) ~[?:?]
at org.openhab.persistence.rrd4j.internal.RRD4jPersistenceService.getDB(RRD4jPersistenceService.java:323) ~[?:?]
at org.openhab.persistence.rrd4j.internal.RRD4jPersistenceService.store(RRD4jPersistenceService.java:141) ~[?:?]
at org.openhab.persistence.rrd4j.internal.RRD4jPersistenceService.lambda$0(RRD4jPersistenceService.java:211) ~[?:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
at java.lang.Thread.run(Thread.java:829) [?:?]
```

Fixes #13297

Also includes a few code improvements.

Signed-off-by: Wouter Born <github@maindrain.net>
bundles/org.openhab.persistence.rrd4j/src/main/java/org/openhab/persistence/rrd4j/internal/RRD4jItem.java
bundles/org.openhab.persistence.rrd4j/src/main/java/org/openhab/persistence/rrd4j/internal/RRD4jPersistenceService.java
bundles/org.openhab.persistence.rrd4j/src/main/java/org/openhab/persistence/rrd4j/internal/charts/RRD4jChartServlet.java

index b763eb25c746efe24cc1638f59ed2ea3583cc05d..cccaf1bd7880301334c27a650889b5660948f149 100644 (file)
@@ -17,6 +17,7 @@ import java.time.format.DateTimeFormatter;
 import java.time.format.FormatStyle;
 import java.util.Locale;
 
+import org.eclipse.jdt.annotation.NonNullByDefault;
 import org.openhab.core.persistence.HistoricItem;
 import org.openhab.core.types.State;
 
@@ -26,6 +27,7 @@ import org.openhab.core.types.State;
  * @author Kai Kreuzer - Initial contribution
  *
  */
+@NonNullByDefault
 public class RRD4jItem implements HistoricItem {
 
     private final String name;
index 91ebd63a0a13fecbb00e90d68e8509bf3f260030..d29170d508b7644c283e4adffb609967f4af9617 100644 (file)
  */
 package org.openhab.persistence.rrd4j.internal;
 
-import java.io.File;
 import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.time.Instant;
 import java.time.ZoneId;
 import java.time.ZonedDateTime;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -74,6 +76,8 @@ import org.rrd4j.DsType;
 import org.rrd4j.core.FetchData;
 import org.rrd4j.core.FetchRequest;
 import org.rrd4j.core.RrdDb;
+import org.rrd4j.core.RrdDb.Builder;
+import org.rrd4j.core.RrdDbPool;
 import org.rrd4j.core.RrdDef;
 import org.rrd4j.core.Sample;
 import org.slf4j.Logger;
@@ -107,13 +111,23 @@ public class RRD4jPersistenceService implements QueryablePersistenceService {
 
     private static final String DATASOURCE_STATE = "state";
 
-    public static final String DB_FOLDER = getUserPersistenceDataFolder() + File.separator + "rrd4j";
+    private static final Path DB_FOLDER = Path.of(OpenHAB.getUserDataFolder(), "persistence", "rrd4j").toAbsolutePath();
+
+    private static final RrdDbPool DATABASE_POOL = new RrdDbPool();
 
     private final Logger logger = LoggerFactory.getLogger(RRD4jPersistenceService.class);
 
     private final Map<String, ScheduledFuture<?>> scheduledJobs = new HashMap<>();
 
-    protected final ItemRegistry itemRegistry;
+    private final ItemRegistry itemRegistry;
+
+    public static Path getDatabasePath(String name) {
+        return DB_FOLDER.resolve(name + ".rrd");
+    }
+
+    public static RrdDbPool getDatabasePool() {
+        return DATABASE_POOL;
+    }
 
     @Activate
     public RRD4jPersistenceService(final @Reference ItemRegistry itemRegistry) {
@@ -137,89 +151,92 @@ public class RRD4jPersistenceService implements QueryablePersistenceService {
             return;
         }
         final String name = alias == null ? item.getName() : alias;
+
         RrdDb db = getDB(name);
-        if (db != null) {
-            ConsolFun function = getConsolidationFunction(db);
-            long now = System.currentTimeMillis() / 1000;
-            if (function != ConsolFun.AVERAGE) {
-                try {
-                    // we store the last value again, so that the value change
-                    // in the database is not interpolated, but
-                    // happens right at this spot
-                    if (now - 1 > db.getLastUpdateTime()) {
-                        // only do it if there is not already a value
-                        double lastValue = db.getLastDatasourceValue(DATASOURCE_STATE);
-                        if (!Double.isNaN(lastValue)) {
-                            Sample sample = db.createSample();
-                            sample.setTime(now - 1);
-                            sample.setValue(DATASOURCE_STATE, lastValue);
-                            sample.update();
-                            logger.debug("Stored '{}' as value '{}' in rrd4j database (again)", name, lastValue);
-                        }
+        if (db == null) {
+            return;
+        }
+
+        ConsolFun function = getConsolidationFunction(db);
+        long now = System.currentTimeMillis() / 1000;
+        if (function != ConsolFun.AVERAGE) {
+            try {
+                // we store the last value again, so that the value change
+                // in the database is not interpolated, but
+                // happens right at this spot
+                if (now - 1 > db.getLastUpdateTime()) {
+                    // only do it if there is not already a value
+                    double lastValue = db.getLastDatasourceValue(DATASOURCE_STATE);
+                    if (!Double.isNaN(lastValue)) {
+                        Sample sample = db.createSample();
+                        sample.setTime(now - 1);
+                        sample.setValue(DATASOURCE_STATE, lastValue);
+                        sample.update();
+                        logger.debug("Stored '{}' as value '{}' in rrd4j database (again)", name, lastValue);
                     }
-                } catch (IOException e) {
-                    logger.debug("Error storing last value (again): {}", e.getMessage());
                 }
+            } catch (IOException e) {
+                logger.debug("Error storing last value (again): {}", e.getMessage());
             }
-            try {
-                Sample sample = db.createSample();
-                sample.setTime(now);
-
-                Double value = null;
-
-                if (item instanceof NumberItem && item.getState() instanceof QuantityType) {
-                    NumberItem nItem = (NumberItem) item;
-                    QuantityType<?> qState = (QuantityType<?>) item.getState();
-                    Unit<? extends Quantity<?>> unit = nItem.getUnit();
-                    if (unit != null) {
-                        QuantityType<?> convertedState = qState.toUnit(unit);
-                        if (convertedState != null) {
-                            value = convertedState.doubleValue();
-                        } else {
-                            logger.warn(
-                                    "Failed to convert state '{}' to unit '{}'. Please check your item definition for correctness.",
-                                    qState, unit);
-                        }
+        }
+        try {
+            Sample sample = db.createSample();
+            sample.setTime(now);
+
+            Double value = null;
+
+            if (item instanceof NumberItem && item.getState() instanceof QuantityType) {
+                NumberItem nItem = (NumberItem) item;
+                QuantityType<?> qState = (QuantityType<?>) item.getState();
+                Unit<? extends Quantity<?>> unit = nItem.getUnit();
+                if (unit != null) {
+                    QuantityType<?> convertedState = qState.toUnit(unit);
+                    if (convertedState != null) {
+                        value = convertedState.doubleValue();
                     } else {
-                        value = qState.doubleValue();
+                        logger.warn(
+                                "Failed to convert state '{}' to unit '{}'. Please check your item definition for correctness.",
+                                qState, unit);
                     }
                 } else {
-                    DecimalType state = item.getStateAs(DecimalType.class);
-                    if (state != null) {
-                        value = state.toBigDecimal().doubleValue();
-                    }
+                    value = qState.doubleValue();
                 }
-                if (value != null) {
-                    if (db.getDatasource(DATASOURCE_STATE).getType() == DsType.COUNTER) { // counter values must be
-                                                                                          // adjusted by stepsize
-                        value = value * db.getRrdDef().getStep();
-                    }
-                    sample.setValue(DATASOURCE_STATE, value);
-                    sample.update();
-                    logger.debug("Stored '{}' as value '{}' in rrd4j database", name, value);
+            } else {
+                DecimalType state = item.getStateAs(DecimalType.class);
+                if (state != null) {
+                    value = state.toBigDecimal().doubleValue();
                 }
-            } catch (IllegalArgumentException e) {
-                String message = e.getMessage();
-                if (message != null && message.contains("at least one second step is required")) {
-                    // we try to store the value one second later
-                    ScheduledFuture<?> job = scheduledJobs.get(name);
-                    if (job != null) {
-                        job.cancel(true);
-                        scheduledJobs.remove(name);
-                    }
-                    job = scheduler.schedule(() -> store(item, name), 1, TimeUnit.SECONDS);
-                    scheduledJobs.put(name, job);
-                } else {
-                    logger.warn("Could not persist '{}' to rrd4j database: {}", name, e.getMessage());
+            }
+            if (value != null) {
+                if (db.getDatasource(DATASOURCE_STATE).getType() == DsType.COUNTER) { // counter values must be
+                                                                                      // adjusted by stepsize
+                    value = value * db.getRrdDef().getStep();
                 }
-            } catch (Exception e) {
-                logger.warn("Could not persist '{}' to rrd4j database: {}", name, e.getMessage());
+                sample.setValue(DATASOURCE_STATE, value);
+                sample.update();
+                logger.debug("Stored '{}' as value '{}' in rrd4j database", name, value);
             }
-            try {
-                db.close();
-            } catch (IOException e) {
-                logger.debug("Error closing rrd4j database: {}", e.getMessage());
+        } catch (IllegalArgumentException e) {
+            String message = e.getMessage();
+            if (message != null && message.contains("at least one second step is required")) {
+                // we try to store the value one second later
+                ScheduledFuture<?> job = scheduledJobs.get(name);
+                if (job != null) {
+                    job.cancel(true);
+                    scheduledJobs.remove(name);
+                }
+                job = scheduler.schedule(() -> store(item, name), 1, TimeUnit.SECONDS);
+                scheduledJobs.put(name, job);
+            } else {
+                logger.warn("Could not persist '{}' to rrd4j database: {}", name, e.getMessage());
             }
+        } catch (Exception e) {
+            logger.warn("Could not persist '{}' to rrd4j database: {}", name, e.getMessage());
+        }
+        try {
+            db.close();
+        } catch (IOException e) {
+            logger.debug("Error closing rrd4j database: {}", e.getMessage());
         }
     }
 
@@ -305,6 +322,12 @@ public class RRD4jPersistenceService implements QueryablePersistenceService {
         } catch (IOException e) {
             logger.warn("Could not query rrd4j database for item '{}': {}", itemName, e.getMessage());
             return List.of();
+        } finally {
+            try {
+                db.close();
+            } catch (IOException e) {
+                logger.debug("Error closing rrd4j database: {}", e.getMessage());
+            }
         }
     }
 
@@ -315,20 +338,24 @@ public class RRD4jPersistenceService implements QueryablePersistenceService {
 
     protected synchronized @Nullable RrdDb getDB(String alias) {
         RrdDb db = null;
-        File file = new File(DB_FOLDER + File.separator + alias + ".rrd");
+        Path path = getDatabasePath(alias);
         try {
-            if (file.exists()) {
+            Builder builder = RrdDb.getBuilder();
+            builder.setPool(DATABASE_POOL);
+
+            if (Files.exists(path)) {
                 // recreate the RrdDb instance from the file
-                db = RrdDb.of(file.getAbsolutePath());
+                builder.setPath(path.toString());
+                db = builder.build();
             } else {
-                File folder = new File(DB_FOLDER);
-                if (!folder.exists()) {
-                    folder.mkdirs();
+                if (!Files.exists(DB_FOLDER)) {
+                    Files.createDirectories(DB_FOLDER);
                 }
-                RrdDef rrdDef = getRrdDef(alias, file);
+                RrdDef rrdDef = getRrdDef(alias, path);
                 if (rrdDef != null) {
                     // create a new database file
-                    db = RrdDb.of(rrdDef);
+                    builder.setRrdDef(rrdDef);
+                    db = builder.build();
                 } else {
                     logger.debug(
                             "Did not create rrd4j database for item '{}' since no rrd definition could be determined. This is likely due to an unsupported item type.",
@@ -336,10 +363,10 @@ public class RRD4jPersistenceService implements QueryablePersistenceService {
                 }
             }
         } catch (IOException e) {
-            logger.error("Could not create rrd4j database file '{}': {}", file.getAbsolutePath(), e.getMessage());
+            logger.error("Could not create rrd4j database file '{}': {}", path, e.getMessage());
         } catch (RejectedExecutionException e) {
             // this happens if the system is shut down
-            logger.debug("Could not create rrd4j database file '{}': {}", file.getAbsolutePath(), e.getMessage());
+            logger.debug("Could not create rrd4j database file '{}': {}", path, e.getMessage());
         }
         return db;
     }
@@ -376,8 +403,8 @@ public class RRD4jPersistenceService implements QueryablePersistenceService {
         return useRdc;
     }
 
-    private @Nullable RrdDef getRrdDef(String itemName, File file) {
-        RrdDef rrdDef = new RrdDef(file.getAbsolutePath());
+    private @Nullable RrdDef getRrdDef(String itemName, Path path) {
+        RrdDef rrdDef = new RrdDef(path.toString());
         RrdDefConfig useRdc = getRrdDefConfig(itemName);
         if (useRdc != null) {
             rrdDef.setStep(useRdc.step);
@@ -416,8 +443,6 @@ public class RRD4jPersistenceService implements QueryablePersistenceService {
         } else if (item instanceof NumberItem) {
             if (unit != null) {
                 return new QuantityType(value, unit);
-            } else {
-                return new DecimalType(value);
             }
         }
         return new DecimalType(value);
@@ -434,10 +459,6 @@ public class RRD4jPersistenceService implements QueryablePersistenceService {
         return SUPPORTED_TYPES.contains(ItemUtil.getMainItemType(item.getType()));
     }
 
-    private static String getUserPersistenceDataFolder() {
-        return OpenHAB.getUserDataFolder() + File.separator + "persistence";
-    }
-
     @Activate
     protected void activate(final Map<String, Object> config) {
         modified(config);
@@ -496,7 +517,7 @@ public class RRD4jPersistenceService implements QueryablePersistenceService {
         while (keys.hasNext()) {
             String key = keys.next();
 
-            if (key.equals("service.pid") || key.equals("component.name")) {
+            if ("service.pid".equals(key) || "component.name".equals(key)) {
                 // ignore service.pid and name
                 continue;
             }
@@ -527,11 +548,11 @@ public class RRD4jPersistenceService implements QueryablePersistenceService {
                 }
 
                 try {
-                    if (property.equals("def")) {
+                    if ("def".equals(property)) {
                         rrdDef.setDef(value);
-                    } else if (property.equals("archives")) {
+                    } else if ("archives".equals(property)) {
                         rrdDef.addArchives(value);
-                    } else if (property.equals("items")) {
+                    } else if ("items".equals(property)) {
                         rrdDef.addItems(value);
                     } else {
                         logger.debug("Unknown property {} : {}", property, value);
@@ -552,7 +573,7 @@ public class RRD4jPersistenceService implements QueryablePersistenceService {
         }
     }
 
-    private class RrdArchiveDef {
+    private static class RrdArchiveDef {
         public @Nullable ConsolFun fcn;
         public double xff;
         public int steps, rows;
@@ -591,13 +612,13 @@ public class RRD4jPersistenceService implements QueryablePersistenceService {
                 return;
             }
 
-            if (opts[0].equals("ABSOLUTE")) { // dsType
+            if ("ABSOLUTE".equals(opts[0])) { // dsType
                 dsType = DsType.ABSOLUTE;
-            } else if (opts[0].equals("COUNTER")) {
+            } else if ("COUNTER".equals(opts[0])) {
                 dsType = DsType.COUNTER;
-            } else if (opts[0].equals("DERIVE")) {
+            } else if ("DERIVE".equals(opts[0])) {
                 dsType = DsType.DERIVE;
-            } else if (opts[0].equals("GAUGE")) {
+            } else if ("GAUGE".equals(opts[0])) {
                 dsType = DsType.GAUGE;
             } else {
                 logger.warn("{}: dsType {} not supported", name, opts[0]);
@@ -605,13 +626,13 @@ public class RRD4jPersistenceService implements QueryablePersistenceService {
 
             heartbeat = Integer.parseInt(opts[1]);
 
-            if (opts[2].equals("U")) {
+            if ("U".equals(opts[2])) {
                 min = Double.NaN;
             } else {
                 min = Double.parseDouble(opts[2]);
             }
 
-            if (opts[3].equals("U")) {
+            if ("U".equals(opts[3])) {
                 max = Double.NaN;
             } else {
                 max = Double.parseDouble(opts[3]);
@@ -634,17 +655,17 @@ public class RRD4jPersistenceService implements QueryablePersistenceService {
                 }
                 RrdArchiveDef arc = new RrdArchiveDef();
 
-                if (opts[0].equals("AVERAGE")) {
+                if ("AVERAGE".equals(opts[0])) {
                     arc.fcn = ConsolFun.AVERAGE;
-                } else if (opts[0].equals("MIN")) {
+                } else if ("MIN".equals(opts[0])) {
                     arc.fcn = ConsolFun.MIN;
-                } else if (opts[0].equals("MAX")) {
+                } else if ("MAX".equals(opts[0])) {
                     arc.fcn = ConsolFun.MAX;
-                } else if (opts[0].equals("LAST")) {
+                } else if ("LAST".equals(opts[0])) {
                     arc.fcn = ConsolFun.LAST;
-                } else if (opts[0].equals("FIRST")) {
+                } else if ("FIRST".equals(opts[0])) {
                     arc.fcn = ConsolFun.FIRST;
-                } else if (opts[0].equals("TOTAL")) {
+                } else if ("TOTAL".equals(opts[0])) {
                     arc.fcn = ConsolFun.TOTAL;
                 } else {
                     logger.warn("{}: consolidation function  {} not supported", name, opts[0]);
@@ -657,10 +678,7 @@ public class RRD4jPersistenceService implements QueryablePersistenceService {
         }
 
         public void addItems(String itemsString) {
-            String splitItems[] = itemsString.split(",");
-            for (String item : splitItems) {
-                itemNames.add(item);
-            }
+            Collections.addAll(itemNames, itemsString.split(","));
         }
 
         public boolean appliesTo(String item) {
index 09f4efcef5ccae6037951ed43330a90dd8d7cbd3..d1ebdad1844dc35131c2cc1eb3de0be03dafc995 100644 (file)
@@ -17,7 +17,6 @@ import static java.util.Map.entry;
 import java.awt.Color;
 import java.awt.Font;
 import java.awt.image.BufferedImage;
-import java.io.File;
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.time.Duration;
@@ -50,6 +49,7 @@ import org.osgi.service.http.HttpService;
 import org.osgi.service.http.NamespaceException;
 import org.rrd4j.ConsolFun;
 import org.rrd4j.core.RrdDb;
+import org.rrd4j.core.RrdDb.Builder;
 import org.rrd4j.graph.RrdGraph;
 import org.rrd4j.graph.RrdGraphConstants.FontTag;
 import org.rrd4j.graph.RrdGraphDef;
@@ -120,9 +120,7 @@ public class RRD4jChartServlet implements Servlet, ChartProvider {
         try {
             logger.debug("Starting up rrd chart servlet at {}", SERVLET_NAME);
             httpService.registerServlet(SERVLET_NAME, this, new Hashtable<>(), httpService.createDefaultHttpContext());
-        } catch (NamespaceException e) {
-            logger.error("Error during servlet startup", e);
-        } catch (ServletException e) {
+        } catch (NamespaceException | ServletException e) {
             logger.error("Error during servlet startup", e);
         }
     }
@@ -184,13 +182,17 @@ public class RRD4jChartServlet implements Servlet, ChartProvider {
     protected void addLine(RrdGraphDef graphDef, Item item, int counter) {
         Color color = LINECOLORS[counter % LINECOLORS.length];
         String label = itemUIRegistry.getLabel(item.getName());
-        String rrdName = RRD4jPersistenceService.DB_FOLDER + File.separator + item.getName() + ".rrd";
+        String rrdName = RRD4jPersistenceService.getDatabasePath(item.getName()).toString();
         ConsolFun consolFun;
         if (label != null && label.contains("[") && label.contains("]")) {
             label = label.substring(0, label.indexOf('['));
         }
         try {
-            RrdDb db = RrdDb.of(rrdName);
+            Builder builder = RrdDb.getBuilder();
+            builder.setPool(RRD4jPersistenceService.getDatabasePool());
+            builder.setPath(rrdName);
+
+            RrdDb db = builder.build();
             consolFun = db.getRrdDef().getArcDefs()[0].getConsolFun();
             db.close();
         } catch (IOException e) {