]> git.basschouten.com Git - openhab-addons.git/blob
9dd09004cb488c547542476b401f61947f581293
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2020 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.persistence.rrd4j.internal;
14
15 import java.io.File;
16 import java.io.IOException;
17 import java.time.Instant;
18 import java.time.ZoneId;
19 import java.time.ZonedDateTime;
20 import java.util.ArrayList;
21 import java.util.Collections;
22 import java.util.HashMap;
23 import java.util.Iterator;
24 import java.util.List;
25 import java.util.Locale;
26 import java.util.Map;
27 import java.util.Set;
28 import java.util.concurrent.ConcurrentHashMap;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.RejectedExecutionException;
31 import java.util.concurrent.ScheduledExecutorService;
32 import java.util.concurrent.ScheduledFuture;
33 import java.util.concurrent.TimeUnit;
34
35 import org.eclipse.jdt.annotation.NonNullByDefault;
36 import org.eclipse.jdt.annotation.Nullable;
37 import org.openhab.core.OpenHAB;
38 import org.openhab.core.common.NamedThreadFactory;
39 import org.openhab.core.items.Item;
40 import org.openhab.core.items.ItemNotFoundException;
41 import org.openhab.core.items.ItemRegistry;
42 import org.openhab.core.library.items.ContactItem;
43 import org.openhab.core.library.items.DimmerItem;
44 import org.openhab.core.library.items.NumberItem;
45 import org.openhab.core.library.items.RollershutterItem;
46 import org.openhab.core.library.items.SwitchItem;
47 import org.openhab.core.library.types.DecimalType;
48 import org.openhab.core.library.types.OnOffType;
49 import org.openhab.core.library.types.OpenClosedType;
50 import org.openhab.core.library.types.PercentType;
51 import org.openhab.core.persistence.FilterCriteria;
52 import org.openhab.core.persistence.FilterCriteria.Ordering;
53 import org.openhab.core.persistence.HistoricItem;
54 import org.openhab.core.persistence.PersistenceItemInfo;
55 import org.openhab.core.persistence.PersistenceService;
56 import org.openhab.core.persistence.QueryablePersistenceService;
57 import org.openhab.core.persistence.strategy.PersistenceCronStrategy;
58 import org.openhab.core.persistence.strategy.PersistenceStrategy;
59 import org.openhab.core.types.State;
60 import org.osgi.service.component.annotations.Activate;
61 import org.osgi.service.component.annotations.Component;
62 import org.osgi.service.component.annotations.Reference;
63 import org.rrd4j.ConsolFun;
64 import org.rrd4j.DsType;
65 import org.rrd4j.core.FetchData;
66 import org.rrd4j.core.FetchRequest;
67 import org.rrd4j.core.RrdDb;
68 import org.rrd4j.core.RrdDef;
69 import org.rrd4j.core.Sample;
70 import org.slf4j.Logger;
71 import org.slf4j.LoggerFactory;
72
73 /**
74  * This is the implementation of the RRD4j {@link PersistenceService}. To learn
75  * more about RRD4j please visit their
76  * <a href="https://github.com/rrd4j/rrd4j">website</a>.
77  *
78  * @author Kai Kreuzer - Initial contribution
79  * @author Jan N. Klug - some improvements
80  * @author Karel Goderis - remove TimerThread dependency
81  */
82 @NonNullByDefault
83 @Component(service = { PersistenceService.class,
84         QueryablePersistenceService.class }, configurationPid = "org.openhab.rrd4j")
85 public class RRD4jPersistenceService implements QueryablePersistenceService {
86
87     private static final String DEFAULT_OTHER = "default_other";
88     private static final String DEFAULT_NUMERIC = "default_numeric";
89     private static final String DEFAULT_QUANTIFIABLE = "default_quantifiable";
90
91     private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(3,
92             new NamedThreadFactory("RRD4j"));
93
94     private final Map<String, @Nullable RrdDefConfig> rrdDefs = new ConcurrentHashMap<>();
95
96     private static final String DATASOURCE_STATE = "state";
97
98     public static final String DB_FOLDER = getUserPersistenceDataFolder() + File.separator + "rrd4j";
99
100     private final Logger logger = LoggerFactory.getLogger(RRD4jPersistenceService.class);
101
102     private final Map<String, @Nullable ScheduledFuture<?>> scheduledJobs = new HashMap<>();
103
104     protected final ItemRegistry itemRegistry;
105
106     @Activate
107     public RRD4jPersistenceService(final @Reference ItemRegistry itemRegistry) {
108         this.itemRegistry = itemRegistry;
109     }
110
111     @Override
112     public String getId() {
113         return "rrd4j";
114     }
115
116     @Override
117     public String getLabel(@Nullable Locale locale) {
118         return "RRD4j";
119     }
120
121     @Override
122     public synchronized void store(final Item item, @Nullable final String alias) {
123         final String name = alias == null ? item.getName() : alias;
124         RrdDb db = getDB(name);
125         if (db != null) {
126             ConsolFun function = getConsolidationFunction(db);
127             long now = System.currentTimeMillis() / 1000;
128             if (function != ConsolFun.AVERAGE) {
129                 try {
130                     // we store the last value again, so that the value change
131                     // in the database is not interpolated, but
132                     // happens right at this spot
133                     if (now - 1 > db.getLastUpdateTime()) {
134                         // only do it if there is not already a value
135                         double lastValue = db.getLastDatasourceValue(DATASOURCE_STATE);
136                         if (!Double.isNaN(lastValue)) {
137                             Sample sample = db.createSample();
138                             sample.setTime(now - 1);
139                             sample.setValue(DATASOURCE_STATE, lastValue);
140                             sample.update();
141                             logger.debug("Stored '{}' with state '{}' in rrd4j database (again)", name,
142                                     mapToState(lastValue, item.getName()));
143                         }
144                     }
145                 } catch (IOException e) {
146                     logger.debug("Error storing last value (again): {}", e.getMessage());
147                 }
148             }
149             try {
150                 Sample sample = db.createSample();
151                 sample.setTime(now);
152
153                 DecimalType state = item.getStateAs(DecimalType.class);
154                 if (state != null) {
155                     double value = state.toBigDecimal().doubleValue();
156                     if (db.getDatasource(DATASOURCE_STATE).getType() == DsType.COUNTER) { // counter values must be
157                                                                                           // adjusted by stepsize
158                         value = value * db.getRrdDef().getStep();
159                     }
160                     sample.setValue(DATASOURCE_STATE, value);
161                     sample.update();
162                     logger.debug("Stored '{}' with state '{}' in rrd4j database", name, state);
163                 }
164             } catch (IllegalArgumentException e) {
165                 if (e.getMessage().contains("at least one second step is required")) {
166                     // we try to store the value one second later
167                     ScheduledFuture<?> job = scheduledJobs.get(name);
168                     if (job != null) {
169                         job.cancel(true);
170                         scheduledJobs.remove(name);
171                     }
172                     job = scheduler.schedule(() -> store(item, name), 1, TimeUnit.SECONDS);
173                     scheduledJobs.put(name, job);
174                 } else {
175                     logger.warn("Could not persist '{}' to rrd4j database: {}", name, e.getMessage());
176                 }
177             } catch (Exception e) {
178                 logger.warn("Could not persist '{}' to rrd4j database: {}", name, e.getMessage());
179             }
180             try {
181                 db.close();
182             } catch (IOException e) {
183                 logger.debug("Error closing rrd4j database: {}", e.getMessage());
184             }
185         }
186     }
187
188     @Override
189     public void store(Item item) {
190         store(item, null);
191     }
192
193     @Override
194     public Iterable<HistoricItem> query(FilterCriteria filter) {
195         String itemName = filter.getItemName();
196         RrdDb db = getDB(itemName);
197         if (db != null) {
198             ConsolFun consolidationFunction = getConsolidationFunction(db);
199             long start = 0L;
200             long end = filter.getEndDate() == null ? System.currentTimeMillis() / 1000
201                     : filter.getEndDate().toInstant().getEpochSecond();
202
203             try {
204                 if (filter.getBeginDate() == null) {
205                     // as rrd goes back for years and gets more and more
206                     // inaccurate, we only support descending order
207                     // and a single return value
208                     // if there is no begin date is given - this case is
209                     // required specifically for the historicState()
210                     // query, which we want to support
211                     if (filter.getOrdering() == Ordering.DESCENDING && filter.getPageSize() == 1
212                             && filter.getPageNumber() == 0) {
213                         if (filter.getEndDate() == null) {
214                             // we are asked only for the most recent value!
215                             double lastValue = db.getLastDatasourceValue(DATASOURCE_STATE);
216                             if (!Double.isNaN(lastValue)) {
217                                 HistoricItem rrd4jItem = new RRD4jItem(itemName, mapToState(lastValue, itemName),
218                                         ZonedDateTime.ofInstant(
219                                                 Instant.ofEpochMilli(db.getLastArchiveUpdateTime() * 1000),
220                                                 ZoneId.systemDefault()));
221                                 return Collections.singletonList(rrd4jItem);
222                             } else {
223                                 return Collections.emptyList();
224                             }
225                         } else {
226                             start = end;
227                         }
228                     } else {
229                         throw new UnsupportedOperationException("rrd4j does not allow querys without a begin date, "
230                                 + "unless order is descending and a single value is requested");
231                     }
232                 } else {
233                     start = filter.getBeginDate().toInstant().getEpochSecond();
234                 }
235                 FetchRequest request = db.createFetchRequest(consolidationFunction, start, end, 1);
236
237                 List<HistoricItem> items = new ArrayList<>();
238                 FetchData result = request.fetchData();
239                 long ts = result.getFirstTimestamp();
240                 long step = result.getRowCount() > 1 ? result.getStep() : 0;
241                 for (double value : result.getValues(DATASOURCE_STATE)) {
242                     if (!Double.isNaN(value) && (((ts >= start) && (ts <= end)) || (start == end))) {
243                         RRD4jItem rrd4jItem = new RRD4jItem(itemName, mapToState(value, itemName),
244                                 ZonedDateTime.ofInstant(Instant.ofEpochMilli(ts * 1000), ZoneId.systemDefault()));
245                         items.add(rrd4jItem);
246                     }
247                     ts += step;
248                 }
249                 return items;
250             } catch (IOException e) {
251                 logger.warn("Could not query rrd4j database for item '{}': {}", itemName, e.getMessage());
252             }
253         }
254         return Collections.emptyList();
255     }
256
257     @Override
258     public Set<PersistenceItemInfo> getItemInfo() {
259         return Collections.emptySet();
260     }
261
262     protected @Nullable synchronized RrdDb getDB(String alias) {
263         RrdDb db = null;
264         File file = new File(DB_FOLDER + File.separator + alias + ".rrd");
265         try {
266             if (file.exists()) {
267                 // recreate the RrdDb instance from the file
268                 db = new RrdDb(file.getAbsolutePath());
269             } else {
270                 File folder = new File(DB_FOLDER);
271                 if (!folder.exists()) {
272                     folder.mkdirs();
273                 }
274                 // create a new database file
275                 db = new RrdDb(getRrdDef(alias, file));
276             }
277         } catch (IOException e) {
278             logger.error("Could not create rrd4j database file '{}': {}", file.getAbsolutePath(), e.getMessage());
279         } catch (RejectedExecutionException e) {
280             // this happens if the system is shut down
281             logger.debug("Could not create rrd4j database file '{}': {}", file.getAbsolutePath(), e.getMessage());
282         }
283         return db;
284     }
285
286     private @Nullable RrdDefConfig getRrdDefConfig(String itemName) {
287         RrdDefConfig useRdc = null;
288         for (Map.Entry<String, @Nullable RrdDefConfig> e : rrdDefs.entrySet()) {
289             // try to find special config
290             RrdDefConfig rdc = e.getValue();
291             if (rdc != null && rdc.appliesTo(itemName)) {
292                 useRdc = rdc;
293                 break;
294             }
295         }
296         if (useRdc == null) { // not defined, use defaults
297             try {
298                 Item item = itemRegistry.getItem(itemName);
299                 if (item instanceof NumberItem) {
300                     NumberItem numberItem = (NumberItem) item;
301                     return numberItem.getDimension() != null ? rrdDefs.get(DEFAULT_QUANTIFIABLE)
302                             : rrdDefs.get(DEFAULT_NUMERIC);
303                 }
304             } catch (ItemNotFoundException e) {
305                 logger.debug("Could not find item '{}' in registry", itemName);
306             }
307         }
308         return rrdDefs.get(DEFAULT_OTHER);
309     }
310
311     private RrdDef getRrdDef(String itemName, File file) {
312         RrdDef rrdDef = new RrdDef(file.getAbsolutePath());
313         RrdDefConfig useRdc = getRrdDefConfig(itemName);
314         if (useRdc != null) {
315             rrdDef.setStep(useRdc.step);
316             rrdDef.setStartTime(System.currentTimeMillis() / 1000 - 1);
317             rrdDef.addDatasource(DATASOURCE_STATE, useRdc.dsType, useRdc.heartbeat, useRdc.min, useRdc.max);
318             for (RrdArchiveDef rad : useRdc.archives) {
319                 rrdDef.addArchive(rad.fcn, rad.xff, rad.steps, rad.rows);
320             }
321         }
322         return rrdDef;
323     }
324
325     public ConsolFun getConsolidationFunction(RrdDb db) {
326         try {
327             return db.getRrdDef().getArcDefs()[0].getConsolFun();
328         } catch (IOException e) {
329             return ConsolFun.MAX;
330         }
331     }
332
333     private State mapToState(double value, String itemName) {
334         try {
335             Item item = itemRegistry.getItem(itemName);
336             if (item instanceof SwitchItem && !(item instanceof DimmerItem)) {
337                 return value == 0.0d ? OnOffType.OFF : OnOffType.ON;
338             } else if (item instanceof ContactItem) {
339                 return value == 0.0d ? OpenClosedType.CLOSED : OpenClosedType.OPEN;
340             } else if (item instanceof DimmerItem || item instanceof RollershutterItem) {
341                 // make sure Items that need PercentTypes instead of DecimalTypes do receive the right information
342                 return new PercentType((int) Math.round(value * 100));
343             }
344         } catch (ItemNotFoundException e) {
345             logger.debug("Could not find item '{}' in registry", itemName);
346         }
347         // just return a DecimalType as a fallback
348         return new DecimalType(value);
349     }
350
351     private static String getUserPersistenceDataFolder() {
352         return OpenHAB.getUserDataFolder() + File.separator + "persistence";
353     }
354
355     /**
356      * @{inheritDoc
357      */
358     public void activate(final Map<String, Object> config) {
359         // add default configurations
360
361         RrdDefConfig defaultNumeric = new RrdDefConfig(DEFAULT_NUMERIC);
362         // use 10 seconds as a step size for numeric values and allow a 10 minute silence between updates
363         defaultNumeric.setDef("GAUGE,600,U,U,10");
364         // define 5 different boxes:
365         // 1. granularity of 10s for the last hour
366         // 2. granularity of 1m for the last week
367         // 3. granularity of 15m for the last year
368         // 4. granularity of 1h for the last 5 years
369         // 5. granularity of 1d for the last 10 years
370         defaultNumeric
371                 .addArchives("LAST,0.5,1,360:LAST,0.5,6,10080:LAST,0.5,90,36500:LAST,0.5,360,43800:LAST,0.5,8640,3650");
372         rrdDefs.put(DEFAULT_NUMERIC, defaultNumeric);
373
374         RrdDefConfig defaultQuantifiable = new RrdDefConfig(DEFAULT_QUANTIFIABLE);
375         // use 10 seconds as a step size for numeric values and allow a 10 minute silence between updates
376         defaultQuantifiable.setDef("GAUGE,600,U,U,10");
377         // define 5 different boxes:
378         // 1. granularity of 10s for the last hour
379         // 2. granularity of 1m for the last week
380         // 3. granularity of 15m for the last year
381         // 4. granularity of 1h for the last 5 years
382         // 5. granularity of 1d for the last 10 years
383         defaultQuantifiable.addArchives(
384                 "AVERAGE,0.5,1,360:AVERAGE,0.5,6,10080:AVERAGE,0.5,90,36500:AVERAGE,0.5,360,43800:AVERAGE,0.5,8640,3650");
385         rrdDefs.put(DEFAULT_QUANTIFIABLE, defaultQuantifiable);
386
387         RrdDefConfig defaultOther = new RrdDefConfig(DEFAULT_OTHER);
388         // use 5 seconds as a step size for discrete values and allow a 1h silence between updates
389         defaultOther.setDef("GAUGE,3600,U,U,5");
390         // define 4 different boxes:
391         // 1. granularity of 5s for the last hour
392         // 2. granularity of 1m for the last week
393         // 3. granularity of 15m for the last year
394         // 4. granularity of 4h for the last 10 years
395         defaultOther.addArchives("LAST,0.5,1,720:LAST,0.5,12,10080:LAST,0.5,180,35040:LAST,0.5,2880,21900");
396         rrdDefs.put(DEFAULT_OTHER, defaultOther);
397
398         if (config.isEmpty()) {
399             logger.debug("using default configuration only");
400             return;
401         }
402
403         Iterator<String> keys = config.keySet().iterator();
404         while (keys.hasNext()) {
405             String key = keys.next();
406
407             if (key.equals("service.pid") || key.equals("component.name")) {
408                 // ignore service.pid and name
409                 continue;
410             }
411
412             String[] subkeys = key.split("\\.");
413             if (subkeys.length != 2) {
414                 logger.debug("config '{}' should have the format 'name.configkey'", key);
415                 continue;
416             }
417
418             Object v = config.get(key);
419             if (v instanceof String) {
420                 String value = (String) v;
421                 String name = subkeys[0].toLowerCase();
422                 String property = subkeys[1].toLowerCase();
423
424                 if (value.isBlank()) {
425                     logger.trace("Config is empty: {}", property);
426                     continue;
427                 } else {
428                     logger.trace("Processing config: {} = {}", property, value);
429                 }
430
431                 RrdDefConfig rrdDef = rrdDefs.get(name);
432                 if (rrdDef == null) {
433                     rrdDef = new RrdDefConfig(name);
434                     rrdDefs.put(name, rrdDef);
435                 }
436
437                 try {
438                     if (property.equals("def")) {
439                         rrdDef.setDef(value);
440                     } else if (property.equals("archives")) {
441                         rrdDef.addArchives(value);
442                     } else if (property.equals("items")) {
443                         rrdDef.addItems(value);
444                     } else {
445                         logger.debug("Unknown property {} : {}", property, value);
446                     }
447                 } catch (IllegalArgumentException e) {
448                     logger.warn("Ignoring illegal configuration: {}", e.getMessage());
449                 }
450             }
451         }
452
453         for (RrdDefConfig rrdDef : rrdDefs.values()) {
454             if (rrdDef != null) {
455                 if (rrdDef.isValid()) {
456                     logger.debug("Created {}", rrdDef);
457                 } else {
458                     logger.info("Removing invalid definition {}", rrdDef);
459                     rrdDefs.remove(rrdDef.name);
460                 }
461             }
462         }
463     }
464
465     private class RrdArchiveDef {
466         public @Nullable ConsolFun fcn;
467         public double xff;
468         public int steps, rows;
469
470         @Override
471         public String toString() {
472             StringBuilder sb = new StringBuilder(" " + fcn);
473             sb.append(" xff = ").append(xff);
474             sb.append(" steps = ").append(steps);
475             sb.append(" rows = ").append(rows);
476             return sb.toString();
477         }
478     }
479
480     private class RrdDefConfig {
481         public String name;
482         public @Nullable DsType dsType;
483         public int heartbeat, step;
484         public double min, max;
485         public List<RrdArchiveDef> archives;
486         public List<String> itemNames;
487
488         private boolean isInitialized;
489
490         public RrdDefConfig(String name) {
491             this.name = name;
492             archives = new ArrayList<>();
493             itemNames = new ArrayList<>();
494             isInitialized = false;
495         }
496
497         public void setDef(String defString) {
498             String[] opts = defString.split(",");
499             if (opts.length != 5) { // check if correct number of parameters
500                 logger.warn("invalid number of parameters {}: {}", name, defString);
501                 return;
502             }
503
504             if (opts[0].equals("ABSOLUTE")) { // dsType
505                 dsType = DsType.ABSOLUTE;
506             } else if (opts[0].equals("COUNTER")) {
507                 dsType = DsType.COUNTER;
508             } else if (opts[0].equals("DERIVE")) {
509                 dsType = DsType.DERIVE;
510             } else if (opts[0].equals("GAUGE")) {
511                 dsType = DsType.GAUGE;
512             } else {
513                 logger.warn("{}: dsType {} not supported", name, opts[0]);
514             }
515
516             heartbeat = Integer.parseInt(opts[1]);
517
518             if (opts[2].equals("U")) {
519                 min = Double.NaN;
520             } else {
521                 min = Double.parseDouble(opts[2]);
522             }
523
524             if (opts[3].equals("U")) {
525                 max = Double.NaN;
526             } else {
527                 max = Double.parseDouble(opts[3]);
528             }
529
530             step = Integer.parseInt(opts[4]);
531
532             isInitialized = true; // successfully initialized
533
534             return;
535         }
536
537         public void addArchives(String archivesString) {
538             String splitArchives[] = archivesString.split(":");
539             for (String archiveString : splitArchives) {
540                 String[] opts = archiveString.split(",");
541                 if (opts.length != 4) { // check if correct number of parameters
542                     logger.warn("invalid number of parameters {}: {}", name, archiveString);
543                     return;
544                 }
545                 RrdArchiveDef arc = new RrdArchiveDef();
546
547                 if (opts[0].equals("AVERAGE")) {
548                     arc.fcn = ConsolFun.AVERAGE;
549                 } else if (opts[0].equals("MIN")) {
550                     arc.fcn = ConsolFun.MIN;
551                 } else if (opts[0].equals("MAX")) {
552                     arc.fcn = ConsolFun.MAX;
553                 } else if (opts[0].equals("LAST")) {
554                     arc.fcn = ConsolFun.LAST;
555                 } else if (opts[0].equals("FIRST")) {
556                     arc.fcn = ConsolFun.FIRST;
557                 } else if (opts[0].equals("TOTAL")) {
558                     arc.fcn = ConsolFun.TOTAL;
559                 } else {
560                     logger.warn("{}: consolidation function  {} not supported", name, opts[0]);
561                 }
562                 arc.xff = Double.parseDouble(opts[1]);
563                 arc.steps = Integer.parseInt(opts[2]);
564                 arc.rows = Integer.parseInt(opts[3]);
565                 archives.add(arc);
566             }
567         }
568
569         public void addItems(String itemsString) {
570             String splitItems[] = itemsString.split(",");
571             for (String item : splitItems) {
572                 itemNames.add(item);
573             }
574         }
575
576         public boolean appliesTo(String item) {
577             return itemNames.contains(item);
578         }
579
580         public boolean isValid() { // a valid configuration must be initialized
581             // and contain at least one function
582             return (isInitialized && (archives.size() > 0));
583         }
584
585         @Override
586         public String toString() {
587             StringBuilder sb = new StringBuilder(name);
588             sb.append(" = ").append(dsType);
589             sb.append(" heartbeat = ").append(heartbeat);
590             sb.append(" min/max = ").append(min).append("/").append(max);
591             sb.append(" step = ").append(step);
592             sb.append(" ").append(archives.size()).append(" archives(s) = [");
593             for (RrdArchiveDef arc : archives) {
594                 sb.append(arc.toString());
595             }
596             sb.append("] ");
597             sb.append(itemNames.size()).append(" items(s) = [");
598             for (String item : itemNames) {
599                 sb.append(item).append(" ");
600             }
601             sb.append("]");
602             return sb.toString();
603         }
604     }
605
606     @Override
607     public List<PersistenceStrategy> getDefaultStrategies() {
608         return List.of(PersistenceStrategy.Globals.RESTORE, PersistenceStrategy.Globals.CHANGE,
609                 new PersistenceCronStrategy("everyMinute", "0 * * * * ?"));
610     }
611 }