]> git.basschouten.com Git - openhab-addons.git/commitdiff
[influxdb] Implement ModifiablePersistenceService (#14959)
authorJ-N-K <github@klug.nrw>
Sat, 13 May 2023 10:37:48 +0000 (12:37 +0200)
committerGitHub <noreply@github.com>
Sat, 13 May 2023 10:37:48 +0000 (12:37 +0200)
* [influxdb] Implement ModifiablePersistenceService

Signed-off-by: Jan N. Klug <github@klug.nrw>
bundles/org.openhab.persistence.influxdb/src/main/java/org/openhab/persistence/influxdb/InfluxDBPersistenceService.java
bundles/org.openhab.persistence.influxdb/src/main/java/org/openhab/persistence/influxdb/internal/InfluxDBRepository.java
bundles/org.openhab.persistence.influxdb/src/main/java/org/openhab/persistence/influxdb/internal/influx1/InfluxDB1RepositoryImpl.java
bundles/org.openhab.persistence.influxdb/src/main/java/org/openhab/persistence/influxdb/internal/influx2/InfluxDB2RepositoryImpl.java
bundles/org.openhab.persistence.influxdb/src/test/java/org/openhab/persistence/influxdb/ItemToStorePointCreatorTest.java

index 783624651f480b1cb9d9e4fbb7e3bba051057fa8..4e7dfb33daa4f7af00cf6e02a55927cad0699223 100644 (file)
@@ -42,6 +42,7 @@ import org.openhab.core.items.ItemRegistry;
 import org.openhab.core.items.ItemUtil;
 import org.openhab.core.persistence.FilterCriteria;
 import org.openhab.core.persistence.HistoricItem;
+import org.openhab.core.persistence.ModifiablePersistenceService;
 import org.openhab.core.persistence.PersistenceItemInfo;
 import org.openhab.core.persistence.PersistenceService;
 import org.openhab.core.persistence.QueryablePersistenceService;
@@ -92,7 +93,7 @@ import org.slf4j.LoggerFactory;
         QueryablePersistenceService.class }, configurationPid = "org.openhab.influxdb", //
         property = Constants.SERVICE_PID + "=org.openhab.influxdb")
 @ConfigurableService(category = "persistence", label = "InfluxDB Persistence Service", description_uri = InfluxDBPersistenceService.CONFIG_URI)
-public class InfluxDBPersistenceService implements QueryablePersistenceService {
+public class InfluxDBPersistenceService implements ModifiablePersistenceService {
     public static final String SERVICE_NAME = "influxdb";
 
     private final Logger logger = LoggerFactory.getLogger(InfluxDBPersistenceService.class);
@@ -190,11 +191,20 @@ public class InfluxDBPersistenceService implements QueryablePersistenceService {
 
     @Override
     public void store(Item item, @Nullable String alias) {
+        store(item, ZonedDateTime.now(), item.getState(), alias);
+    }
+
+    @Override
+    public void store(Item item, ZonedDateTime date, State state) {
+        store(item, date, state, null);
+    }
+
+    public void store(Item item, ZonedDateTime date, State state, @Nullable String alias) {
         if (!serviceActivated) {
             logger.warn("InfluxDB service not ready. Storing {} rejected.", item);
             return;
         }
-        convert(item, alias).thenAccept(point -> {
+        convert(item, state, date.toInstant(), null).thenAccept(point -> {
             if (point == null) {
                 logger.trace("Ignoring item {}, conversion to an InfluxDB point failed.", item.getName());
                 return;
@@ -207,6 +217,20 @@ public class InfluxDBPersistenceService implements QueryablePersistenceService {
         });
     }
 
+    @Override
+    public boolean remove(FilterCriteria filter) throws IllegalArgumentException {
+        if (serviceActivated && checkConnection()) {
+            if (filter.getItemName() == null) {
+                logger.warn("Item name is missing in filter {} when trying to remove data.", filter);
+                return false;
+            }
+            return influxDBRepository.remove(filter);
+        } else {
+            logger.debug("Remove query {} ignored, InfluxDB is not connected.", filter);
+            return false;
+        }
+    }
+
     @Override
     public Iterable<HistoricItem> query(FilterCriteria filter) {
         if (serviceActivated && checkConnection()) {
@@ -215,13 +239,12 @@ public class InfluxDBPersistenceService implements QueryablePersistenceService {
                     filter.getItemName(), filter.getOrdering().toString(), filter.getState(), filter.getOperator(),
                     filter.getBeginDate(), filter.getEndDate(), filter.getPageSize(), filter.getPageNumber());
             if (filter.getItemName() == null) {
-                logger.warn("Item name is missing in filter {}", filter);
+                logger.warn("Item name is missing in filter {} when querying data.", filter);
                 return List.of();
             }
-            String query = influxDBRepository.createQueryCreator().createQuery(filter,
+
+            List<InfluxDBRepository.InfluxRow> results = influxDBRepository.query(filter,
                     configuration.getRetentionPolicy());
-            logger.trace("Query {}", query);
-            List<InfluxDBRepository.InfluxRow> results = influxDBRepository.query(query);
             return results.stream().map(this::mapRowToHistoricItem).collect(Collectors.toList());
         } else {
             logger.debug("Query for persisted data ignored, InfluxDB is not connected");
@@ -279,13 +302,12 @@ public class InfluxDBPersistenceService implements QueryablePersistenceService {
      * @return a {@link CompletableFuture} that contains either <code>null</code> for item states that cannot be
      *         converted or the corresponding {@link InfluxPoint}
      */
-    CompletableFuture<@Nullable InfluxPoint> convert(Item item, @Nullable String storeAlias) {
+    CompletableFuture<@Nullable InfluxPoint> convert(Item item, State state, Instant timeStamp,
+            @Nullable String storeAlias) {
         String itemName = item.getName();
         String itemLabel = item.getLabel();
         String category = item.getCategory();
-        State state = item.getState();
         String itemType = item.getType();
-        Instant timeStamp = Instant.now();
 
         if (state instanceof UnDefType) {
             return CompletableFuture.completedFuture(null);
index f955a6b3c1c6a2405eef9d025e7f265d215a353c..f73b151490702a7a8d6f5a06ba99db7e735abed6 100644 (file)
@@ -17,6 +17,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.eclipse.jdt.annotation.NonNullByDefault;
+import org.openhab.core.persistence.FilterCriteria;
 
 /**
  * Manages InfluxDB server interaction maintaining client connection
@@ -61,11 +62,11 @@ public interface InfluxDBRepository {
     /**
      * Executes Flux query
      *
-     * @param query Query
+     * @param filter the query filter
      * @return Query results
      * 
      */
-    List<InfluxRow> query(String query);
+    List<InfluxRow> query(FilterCriteria filter, String retentionPolicy);
 
     /**
      * Write points to database
@@ -76,11 +77,12 @@ public interface InfluxDBRepository {
     boolean write(List<InfluxPoint> influxPoints);
 
     /**
-     * create a query creator on this repository
+     * Execute delete query
      *
-     * @return the query creator for this repository
+     * @param filter the query filter
+     * @return <code>true</code> if query executed successfully, <code>false</code> otherwise
      */
-    FilterCriteriaQueryCreator createQueryCreator();
+    boolean remove(FilterCriteria filter);
 
     record InfluxRow(Instant time, String itemName, Object value) {
     }
index 0319603e5f39bf44fdc379d5968d195488790d42..ef66df495e683ef8e21e5a3c9db9c667086f7112 100644 (file)
@@ -35,6 +35,7 @@ import org.influxdb.dto.Point;
 import org.influxdb.dto.Pong;
 import org.influxdb.dto.Query;
 import org.influxdb.dto.QueryResult;
+import org.openhab.core.persistence.FilterCriteria;
 import org.openhab.persistence.influxdb.internal.FilterCriteriaQueryCreator;
 import org.openhab.persistence.influxdb.internal.InfluxDBConfiguration;
 import org.openhab.persistence.influxdb.internal.InfluxDBMetadataService;
@@ -58,12 +59,14 @@ public class InfluxDB1RepositoryImpl implements InfluxDBRepository {
     private final Logger logger = LoggerFactory.getLogger(InfluxDB1RepositoryImpl.class);
     private final InfluxDBConfiguration configuration;
     private final InfluxDBMetadataService influxDBMetadataService;
+    private final FilterCriteriaQueryCreator queryCreator;
     private @Nullable InfluxDB client;
 
     public InfluxDB1RepositoryImpl(InfluxDBConfiguration configuration,
             InfluxDBMetadataService influxDBMetadataService) {
         this.configuration = configuration;
         this.influxDBMetadataService = influxDBMetadataService;
+        this.queryCreator = new InfluxDB1FilterCriteriaQueryCreatorImpl(configuration, influxDBMetadataService);
     }
 
     @Override
@@ -134,6 +137,12 @@ public class InfluxDB1RepositoryImpl implements InfluxDBRepository {
         return true;
     }
 
+    @Override
+    public boolean remove(FilterCriteria filter) {
+        logger.warn("Removing data is not supported in InfluxDB v1.");
+        return false;
+    }
+
     private Optional<Point> convertPointToClientFormat(InfluxPoint point) {
         Point.Builder clientPoint = Point.measurement(point.getMeasurementName()).time(point.getTime().toEpochMilli(),
                 TimeUnit.MILLISECONDS);
@@ -155,9 +164,11 @@ public class InfluxDB1RepositoryImpl implements InfluxDBRepository {
     }
 
     @Override
-    public List<InfluxRow> query(String query) {
+    public List<InfluxRow> query(FilterCriteria filter, String retentionPolicy) {
         final InfluxDB currentClient = client;
         if (currentClient != null) {
+            String query = queryCreator.createQuery(filter, retentionPolicy);
+            logger.trace("Query {}", query);
             Query parsedQuery = new Query(query, configuration.getDatabaseName());
             List<QueryResult.Result> results = currentClient.query(parsedQuery, TimeUnit.MILLISECONDS).getResults();
             return convertClientResultToRepository(results);
@@ -216,9 +227,4 @@ public class InfluxDB1RepositoryImpl implements InfluxDBRepository {
     public Map<String, Integer> getStoredItemsCount() {
         return Collections.emptyMap();
     }
-
-    @Override
-    public FilterCriteriaQueryCreator createQueryCreator() {
-        return new InfluxDB1FilterCriteriaQueryCreatorImpl(configuration, influxDBMetadataService);
-    }
 }
index dade69cf59b86a36c10f936746dd50b0d3af85fd..f675bf8241e5749d3d586fb985360a65fc6cd3bb 100644 (file)
@@ -15,6 +15,8 @@ package org.openhab.persistence.influxdb.internal.influx2;
 import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.*;
 
 import java.time.Instant;
+import java.time.OffsetDateTime;
+import java.time.ZonedDateTime;
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -25,6 +27,7 @@ import java.util.stream.Stream;
 
 import org.eclipse.jdt.annotation.NonNullByDefault;
 import org.eclipse.jdt.annotation.Nullable;
+import org.openhab.core.persistence.FilterCriteria;
 import org.openhab.persistence.influxdb.internal.FilterCriteriaQueryCreator;
 import org.openhab.persistence.influxdb.internal.InfluxDBConfiguration;
 import org.openhab.persistence.influxdb.internal.InfluxDBConstants;
@@ -34,6 +37,7 @@ import org.openhab.persistence.influxdb.internal.InfluxPoint;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.influxdb.client.DeleteApi;
 import com.influxdb.client.InfluxDBClient;
 import com.influxdb.client.InfluxDBClientFactory;
 import com.influxdb.client.InfluxDBClientOptions;
@@ -55,15 +59,18 @@ public class InfluxDB2RepositoryImpl implements InfluxDBRepository {
     private final Logger logger = LoggerFactory.getLogger(InfluxDB2RepositoryImpl.class);
     private final InfluxDBConfiguration configuration;
     private final InfluxDBMetadataService influxDBMetadataService;
+    private final FilterCriteriaQueryCreator queryCreator;
 
     private @Nullable InfluxDBClient client;
     private @Nullable QueryApi queryAPI;
     private @Nullable WriteApi writeAPI;
+    private @Nullable DeleteApi deleteAPI;
 
     public InfluxDB2RepositoryImpl(InfluxDBConfiguration configuration,
             InfluxDBMetadataService influxDBMetadataService) {
         this.configuration = configuration;
         this.influxDBMetadataService = influxDBMetadataService;
+        this.queryCreator = new InfluxDB2FilterCriteriaQueryCreatorImpl(configuration, influxDBMetadataService);
     }
 
     @Override
@@ -88,6 +95,7 @@ public class InfluxDB2RepositoryImpl implements InfluxDBRepository {
 
         queryAPI = createdClient.getQueryApi();
         writeAPI = createdClient.getWriteApi();
+        deleteAPI = createdClient.getDeleteApi();
         logger.debug("Successfully connected to InfluxDB. Instance ready={}", createdClient.ready());
 
         return checkConnectionStatus();
@@ -137,6 +145,42 @@ public class InfluxDB2RepositoryImpl implements InfluxDBRepository {
         return true;
     }
 
+    @Override
+    public boolean remove(FilterCriteria filter) {
+        final DeleteApi currentDeleteApi = deleteAPI;
+        if (currentDeleteApi == null) {
+            return false;
+        }
+
+        if (filter.getState() != null) {
+            logger.warn("Deleting by value is not supported in InfluxDB v2.");
+            return false;
+        }
+        OffsetDateTime start = Objects.requireNonNullElse(filter.getBeginDate(), ZonedDateTime.now().minusYears(100))
+                .toOffsetDateTime();
+        OffsetDateTime stop = Objects.requireNonNullElse(filter.getEndDate(), ZonedDateTime.now().plusYears(100))
+                .toOffsetDateTime();
+
+        // create predicate
+        String predicate = "";
+        String itemName = filter.getItemName();
+        if (itemName != null) {
+            String name = influxDBMetadataService.getMeasurementNameOrDefault(itemName, itemName);
+            String measurementName = configuration.isReplaceUnderscore() ? name.replace('_', '.') : name;
+            predicate = "(_measurement=\"" + measurementName + "\")";
+        }
+
+        try {
+            deleteAPI.delete(start, stop, predicate, configuration.getRetentionPolicy(),
+                    configuration.getDatabaseName());
+        } catch (InfluxException e) {
+            logger.debug("Deleting from database failed", e);
+            return false;
+        }
+
+        return true;
+    }
+
     private Optional<Point> convertPointToClientFormat(InfluxPoint point) {
         Point clientPoint = Point.measurement(point.getMeasurementName()).time(point.getTime(), WritePrecision.MS);
         @Nullable
@@ -158,9 +202,11 @@ public class InfluxDB2RepositoryImpl implements InfluxDBRepository {
     }
 
     @Override
-    public List<InfluxRow> query(String query) {
+    public List<InfluxRow> query(FilterCriteria filter, String retentionPolicy) {
         final QueryApi currentQueryAPI = queryAPI;
         if (currentQueryAPI != null) {
+            String query = queryCreator.createQuery(filter, retentionPolicy);
+            logger.trace("Query {}", query);
             List<FluxTable> clientResult = currentQueryAPI.query(query);
             return clientResult.stream().flatMap(this::mapRawResultToHistoric).toList();
         } else {
@@ -204,9 +250,4 @@ public class InfluxDB2RepositoryImpl implements InfluxDBRepository {
             return Collections.emptyMap();
         }
     }
-
-    @Override
-    public FilterCriteriaQueryCreator createQueryCreator() {
-        return new InfluxDB2FilterCriteriaQueryCreatorImpl(configuration, influxDBMetadataService);
-    }
 }
index 397ba8e1dd2c2d94f10baaa55612a9cfadf95a96..8f7387f15b602134014e334b74738b33948a2118 100644 (file)
@@ -18,6 +18,7 @@ import static org.mockito.Mockito.when;
 import static org.openhab.persistence.influxdb.internal.InfluxDBConfiguration.*;
 
 import java.math.BigDecimal;
+import java.time.Instant;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
@@ -74,7 +75,7 @@ public class ItemToStorePointCreatorTest {
     @MethodSource
     public void convertBasicItem(Number number) throws ExecutionException, InterruptedException {
         NumberItem item = ItemTestHelper.createNumberItem("myitem", number);
-        InfluxPoint point = instance.convert(item, null).get();
+        InfluxPoint point = instance.convert(item, item.getState(), Instant.now(), null).get();
 
         if (point == null) {
             Assertions.fail("'point' is null");
@@ -94,7 +95,7 @@ public class ItemToStorePointCreatorTest {
     @Test
     public void shouldUseAliasAsMeasurementNameIfProvided() throws ExecutionException, InterruptedException {
         NumberItem item = ItemTestHelper.createNumberItem("myitem", 5);
-        InfluxPoint point = instance.convert(item, "aliasName").get();
+        InfluxPoint point = instance.convert(item, item.getState(), Instant.now(), "aliasName").get();
 
         if (point == null) {
             Assertions.fail("'point' is null");
@@ -110,7 +111,7 @@ public class ItemToStorePointCreatorTest {
         item.setCategory("categoryValue");
 
         instance = getService(false, true, false, false);
-        InfluxPoint point = instance.convert(item, null).get();
+        InfluxPoint point = instance.convert(item, item.getState(), Instant.now(), null).get();
 
         if (point == null) {
             Assertions.fail("'point' is null");
@@ -120,7 +121,7 @@ public class ItemToStorePointCreatorTest {
         assertThat(point.getTags(), hasEntry(InfluxDBConstants.TAG_CATEGORY_NAME, "categoryValue"));
 
         instance = getService(false, false, false, false);
-        point = instance.convert(item, null).get();
+        point = instance.convert(item, item.getState(), Instant.now(), null).get();
 
         if (point == null) {
             Assertions.fail("'point' is null");
@@ -135,7 +136,7 @@ public class ItemToStorePointCreatorTest {
         NumberItem item = ItemTestHelper.createNumberItem("myitem", 5);
 
         instance = getService(false, false, false, true);
-        InfluxPoint point = instance.convert(item, null).get();
+        InfluxPoint point = instance.convert(item, item.getState(), Instant.now(), null).get();
 
         if (point == null) {
             Assertions.fail("'point' is null");
@@ -145,7 +146,7 @@ public class ItemToStorePointCreatorTest {
         assertThat(point.getTags(), hasEntry(InfluxDBConstants.TAG_TYPE_NAME, "Number"));
 
         instance = getService(false, false, false, false);
-        point = instance.convert(item, null).get();
+        point = instance.convert(item, item.getState(), Instant.now(), null).get();
 
         if (point == null) {
             Assertions.fail("'point' is null");
@@ -161,7 +162,7 @@ public class ItemToStorePointCreatorTest {
         item.setLabel("ItemLabel");
 
         instance = getService(false, false, true, false);
-        InfluxPoint point = instance.convert(item, null).get();
+        InfluxPoint point = instance.convert(item, item.getState(), Instant.now(), null).get();
 
         if (point == null) {
             Assertions.fail("'point' is null");
@@ -171,7 +172,7 @@ public class ItemToStorePointCreatorTest {
         assertThat(point.getTags(), hasEntry(InfluxDBConstants.TAG_LABEL_NAME, "ItemLabel"));
 
         instance = getService(false, false, false, false);
-        point = instance.convert(item, null).get();
+        point = instance.convert(item, item.getState(), Instant.now(), null).get();
 
         if (point == null) {
             Assertions.fail("'point' is null");
@@ -189,7 +190,7 @@ public class ItemToStorePointCreatorTest {
         when(metadataRegistry.get(metadataKey))
                 .thenReturn(new Metadata(metadataKey, "", Map.of("key1", "val1", "key2", "val2")));
 
-        InfluxPoint point = instance.convert(item, null).get();
+        InfluxPoint point = instance.convert(item, item.getState(), Instant.now(), null).get();
 
         if (point == null) {
             Assertions.fail("'point' is null");
@@ -205,14 +206,14 @@ public class ItemToStorePointCreatorTest {
         NumberItem item = ItemTestHelper.createNumberItem("myitem", 5);
         MetadataKey metadataKey = new MetadataKey(InfluxDBPersistenceService.SERVICE_NAME, item.getName());
 
-        InfluxPoint point = instance.convert(item, null).get();
+        InfluxPoint point = instance.convert(item, item.getState(), Instant.now(), null).get();
         if (point == null) {
             Assertions.fail();
             return;
         }
         assertThat(point.getMeasurementName(), equalTo(item.getName()));
 
-        point = instance.convert(item, null).get();
+        point = instance.convert(item, item.getState(), Instant.now(), null).get();
         if (point == null) {
             Assertions.fail();
             return;
@@ -223,7 +224,7 @@ public class ItemToStorePointCreatorTest {
         when(metadataRegistry.get(metadataKey))
                 .thenReturn(new Metadata(metadataKey, "measurementName", Map.of("key1", "val1", "key2", "val2")));
 
-        point = instance.convert(item, null).get();
+        point = instance.convert(item, item.getState(), Instant.now(), null).get();
         if (point == null) {
             Assertions.fail();
             return;
@@ -234,7 +235,7 @@ public class ItemToStorePointCreatorTest {
         when(metadataRegistry.get(metadataKey))
                 .thenReturn(new Metadata(metadataKey, "", Map.of("key1", "val1", "key2", "val2")));
 
-        point = instance.convert(item, null).get();
+        point = instance.convert(item, item.getState(), Instant.now(), null).get();
         if (point == null) {
             Assertions.fail();
             return;