]> git.basschouten.com Git - openhab-addons.git/commitdiff
[influxdb] Write asynchronously to database and improve performance (#14888)
authorJ-N-K <github@klug.nrw>
Mon, 1 May 2023 09:43:39 +0000 (11:43 +0200)
committerGitHub <noreply@github.com>
Mon, 1 May 2023 09:43:39 +0000 (11:43 +0200)
* [influxdb] Write asynchronously to database

---------

Signed-off-by: Jan N. Klug <github@klug.nrw>
bundles/org.openhab.persistence.influxdb/src/main/java/org/openhab/persistence/influxdb/InfluxDBPersistenceService.java
bundles/org.openhab.persistence.influxdb/src/main/java/org/openhab/persistence/influxdb/internal/InfluxDBRepository.java
bundles/org.openhab.persistence.influxdb/src/main/java/org/openhab/persistence/influxdb/internal/ItemToStorePointCreator.java [deleted file]
bundles/org.openhab.persistence.influxdb/src/main/java/org/openhab/persistence/influxdb/internal/influx1/InfluxDB1RepositoryImpl.java
bundles/org.openhab.persistence.influxdb/src/main/java/org/openhab/persistence/influxdb/internal/influx2/InfluxDB2RepositoryImpl.java
bundles/org.openhab.persistence.influxdb/src/test/java/org/openhab/persistence/influxdb/InfluxDBPersistenceServiceTest.java [new file with mode: 0644]
bundles/org.openhab.persistence.influxdb/src/test/java/org/openhab/persistence/influxdb/ItemToStorePointCreatorTest.java [new file with mode: 0644]
bundles/org.openhab.persistence.influxdb/src/test/java/org/openhab/persistence/influxdb/internal/InfluxDBPersistenceServiceTest.java [deleted file]
bundles/org.openhab.persistence.influxdb/src/test/java/org/openhab/persistence/influxdb/internal/ItemToStorePointCreatorTest.java [deleted file]

index 31dff022172b6c3b60419921992e6fde63bd344b..168bc8e474735bdef4a01f61f7163d533debfb69 100644 (file)
  */
 package org.openhab.persistence.influxdb;
 
+import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.*;
+
+import java.time.Instant;
 import java.time.ZoneId;
 import java.time.ZonedDateTime;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import org.eclipse.jdt.annotation.NonNullByDefault;
 import org.eclipse.jdt.annotation.Nullable;
+import org.openhab.core.common.ThreadPoolManager;
 import org.openhab.core.config.core.ConfigurableService;
 import org.openhab.core.items.Item;
+import org.openhab.core.items.ItemFactory;
 import org.openhab.core.items.ItemRegistry;
+import org.openhab.core.items.ItemUtil;
 import org.openhab.core.persistence.FilterCriteria;
 import org.openhab.core.persistence.HistoricItem;
 import org.openhab.core.persistence.PersistenceItemInfo;
@@ -32,6 +47,7 @@ import org.openhab.core.persistence.PersistenceService;
 import org.openhab.core.persistence.QueryablePersistenceService;
 import org.openhab.core.persistence.strategy.PersistenceStrategy;
 import org.openhab.core.types.State;
+import org.openhab.core.types.UnDefType;
 import org.openhab.persistence.influxdb.internal.FilterCriteriaQueryCreator;
 import org.openhab.persistence.influxdb.internal.InfluxDBConfiguration;
 import org.openhab.persistence.influxdb.internal.InfluxDBHistoricItem;
@@ -40,8 +56,6 @@ import org.openhab.persistence.influxdb.internal.InfluxDBPersistentItemInfo;
 import org.openhab.persistence.influxdb.internal.InfluxDBRepository;
 import org.openhab.persistence.influxdb.internal.InfluxDBStateConvertUtils;
 import org.openhab.persistence.influxdb.internal.InfluxPoint;
-import org.openhab.persistence.influxdb.internal.ItemToStorePointCreator;
-import org.openhab.persistence.influxdb.internal.UnexpectedConditionException;
 import org.openhab.persistence.influxdb.internal.influx1.InfluxDB1RepositoryImpl;
 import org.openhab.persistence.influxdb.internal.influx2.InfluxDB2RepositoryImpl;
 import org.osgi.framework.Constants;
@@ -49,6 +63,8 @@ import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Deactivate;
 import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.osgi.service.component.annotations.ReferencePolicy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -81,6 +97,7 @@ public class InfluxDBPersistenceService implements QueryablePersistenceService {
 
     private final Logger logger = LoggerFactory.getLogger(InfluxDBPersistenceService.class);
 
+    private static final int COMMIT_INTERVAL = 3; // in s
     protected static final String CONFIG_URI = "persistence:influxdb";
 
     // External dependencies
@@ -88,9 +105,16 @@ public class InfluxDBPersistenceService implements QueryablePersistenceService {
     private final InfluxDBMetadataService influxDBMetadataService;
 
     private final InfluxDBConfiguration configuration;
-    private final ItemToStorePointCreator itemToStorePointCreator;
     private final InfluxDBRepository influxDBRepository;
-    private boolean tryReconnection;
+    private boolean serviceActivated;
+
+    // storage
+    private final ScheduledFuture<?> storeJob;
+    private final BlockingQueue<InfluxPoint> pointsQueue = new LinkedBlockingQueue<>();
+
+    // conversion
+    private final Set<ItemFactory> itemFactories = new HashSet<>();
+    private Map<String, Class<? extends State>> desiredClasses = new HashMap<>();
 
     @Activate
     public InfluxDBPersistenceService(final @Reference ItemRegistry itemRegistry,
@@ -101,8 +125,9 @@ public class InfluxDBPersistenceService implements QueryablePersistenceService {
         if (configuration.isValid()) {
             this.influxDBRepository = createInfluxDBRepository();
             this.influxDBRepository.connect();
-            this.itemToStorePointCreator = new ItemToStorePointCreator(configuration, influxDBMetadataService);
-            tryReconnection = true;
+            this.storeJob = ThreadPoolManager.getScheduledPool("org.openhab.influxdb")
+                    .scheduleWithFixedDelay(this::commit, COMMIT_INTERVAL, COMMIT_INTERVAL, TimeUnit.SECONDS);
+            serviceActivated = true;
         } else {
             throw new IllegalArgumentException("Configuration invalid.");
         }
@@ -124,7 +149,15 @@ public class InfluxDBPersistenceService implements QueryablePersistenceService {
      */
     @Deactivate
     public void deactivate() {
-        tryReconnection = false;
+        serviceActivated = false;
+
+        storeJob.cancel(false);
+        commit(); // ensure we at least tried to store the data;
+
+        if (!pointsQueue.isEmpty()) {
+            logger.warn("InfluxDB failed to finally store {} points.", pointsQueue.size());
+        }
+
         influxDBRepository.disconnect();
         logger.info("InfluxDB persistence service stopped.");
     }
@@ -157,26 +190,26 @@ public class InfluxDBPersistenceService implements QueryablePersistenceService {
 
     @Override
     public void store(Item item, @Nullable String alias) {
-        if (checkConnection()) {
-            InfluxPoint point = itemToStorePointCreator.convert(item, alias);
-            if (point != null) {
-                try {
-                    influxDBRepository.write(point);
-                    logger.trace("Stored item {} in InfluxDB point {}", item, point);
-                } catch (UnexpectedConditionException e) {
-                    logger.warn("Failed to store item {} in InfluxDB point {}", point, item);
-                }
+        if (!serviceActivated) {
+            logger.warn("InfluxDB service not ready. Storing {} rejected.", item);
+            return;
+        }
+        convert(item, alias).thenAccept(point -> {
+            if (point == null) {
+                logger.trace("Ignoring item {}, conversion to an InfluxDB point failed.", item.getName());
+                return;
+            }
+            if (pointsQueue.offer(point)) {
+                logger.trace("Queued {} for item {}", point, item);
             } else {
-                logger.trace("Ignoring item {}, conversion to an InfluxDB point failed.", item);
+                logger.warn("Failed to queue {} for item {}", point, item);
             }
-        } else {
-            logger.debug("store ignored, InfluxDB is not connected");
-        }
+        });
     }
 
     @Override
     public Iterable<HistoricItem> query(FilterCriteria filter) {
-        if (checkConnection()) {
+        if (serviceActivated && checkConnection()) {
             logger.trace(
                     "Query-Filter: itemname: {}, ordering: {}, state: {},  operator: {}, getBeginDate: {}, getEndDate: {}, getPageSize: {}, getPageNumber: {}",
                     filter.getItemName(), filter.getOrdering().toString(), filter.getState(), filter.getOperator(),
@@ -211,10 +244,109 @@ public class InfluxDBPersistenceService implements QueryablePersistenceService {
     private boolean checkConnection() {
         if (influxDBRepository.isConnected()) {
             return true;
-        } else if (tryReconnection) {
+        } else if (serviceActivated) {
             logger.debug("Connection lost, trying re-connection");
             return influxDBRepository.connect();
         }
         return false;
     }
+
+    private void commit() {
+        if (!pointsQueue.isEmpty() && checkConnection()) {
+            List<InfluxPoint> points = new ArrayList<>();
+            pointsQueue.drainTo(points);
+            if (!influxDBRepository.write(points)) {
+                logger.warn("Re-queuing {} elements, failed to write batch.", points.size());
+                pointsQueue.addAll(points);
+            } else {
+                logger.trace("Wrote {} elements to database", points.size());
+            }
+        }
+    }
+
+    /**
+     * Convert incoming data to an {@link InfluxPoint} for further processing. This is needed because storage is
+     * asynchronous and the item data may have changed.
+     * <p />
+     * The method is package-private for testing.
+     *
+     * @param item the {@link Item} that needs conversion
+     * @param storeAlias an (optional) alias for the item
+     * @return a {@link CompletableFuture} that contains either <code>null</code> for item states that cannot be
+     *         converted or the corresponding {@link InfluxPoint}
+     */
+    CompletableFuture<@Nullable InfluxPoint> convert(Item item, @Nullable String storeAlias) {
+        String itemName = item.getName();
+        String itemLabel = item.getLabel();
+        String category = item.getCategory();
+        State state = item.getState();
+        String itemType = item.getType();
+        Instant timeStamp = Instant.now();
+
+        if (state instanceof UnDefType) {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        return CompletableFuture.supplyAsync(() -> {
+            String measurementName = storeAlias != null && !storeAlias.isBlank() ? storeAlias : itemName;
+            measurementName = influxDBMetadataService.getMeasurementNameOrDefault(itemName, measurementName);
+
+            if (configuration.isReplaceUnderscore()) {
+                measurementName = measurementName.replace('_', '.');
+            }
+
+            State storeState = Objects
+                    .requireNonNullElse(state.as(desiredClasses.get(ItemUtil.getMainItemType(itemType))), state);
+            Object value = InfluxDBStateConvertUtils.stateToObject(storeState);
+
+            InfluxPoint.Builder pointBuilder = InfluxPoint.newBuilder(measurementName).withTime(timeStamp)
+                    .withValue(value).withTag(TAG_ITEM_NAME, itemName);
+
+            if (configuration.isAddCategoryTag()) {
+                String categoryName = Objects.requireNonNullElse(category, "n/a");
+                pointBuilder.withTag(TAG_CATEGORY_NAME, categoryName);
+            }
+
+            if (configuration.isAddTypeTag()) {
+                pointBuilder.withTag(TAG_TYPE_NAME, itemType);
+            }
+
+            if (configuration.isAddLabelTag()) {
+                String labelName = Objects.requireNonNullElse(itemLabel, "n/a");
+                pointBuilder.withTag(TAG_LABEL_NAME, labelName);
+            }
+
+            influxDBMetadataService.getMetaData(itemName)
+                    .ifPresent(metadata -> metadata.getConfiguration().forEach(pointBuilder::withTag));
+
+            return pointBuilder.build();
+        });
+    }
+
+    @Reference(cardinality = ReferenceCardinality.AT_LEAST_ONE, policy = ReferencePolicy.DYNAMIC)
+    public void setItemFactory(ItemFactory itemFactory) {
+        itemFactories.add(itemFactory);
+        calculateItemTypeClasses();
+    }
+
+    public void unsetItemFactory(ItemFactory itemFactory) {
+        itemFactories.remove(itemFactory);
+        calculateItemTypeClasses();
+    }
+
+    private synchronized void calculateItemTypeClasses() {
+        Map<String, Class<? extends State>> desiredClasses = new HashMap<>();
+        itemFactories.forEach(factory -> {
+            for (String itemType : factory.getSupportedItemTypes()) {
+                Item item = factory.createItem(itemType, "influxItem");
+                if (item != null) {
+                    item.getAcceptedCommandTypes().stream()
+                            .filter(commandType -> commandType.isAssignableFrom(State.class)).findFirst()
+                            .map(commandType -> (Class<? extends State>) commandType.asSubclass(State.class))
+                            .ifPresent(desiredClass -> desiredClasses.put(itemType, desiredClass));
+                }
+            }
+        });
+        this.desiredClasses = desiredClasses;
+    }
 }
index efb749a8269e6a75ef24e65cceb4b2e558e832c6..f955a6b3c1c6a2405eef9d025e7f265d215a353c 100644 (file)
@@ -68,12 +68,12 @@ public interface InfluxDBRepository {
     List<InfluxRow> query(String query);
 
     /**
-     * Write point to database
+     * Write points to database
      *
-     * @param influxPoint Point to write
-     * @throws UnexpectedConditionException when an error occurs
+     * @param influxPoints {@link List<InfluxPoint>} to write
+     * @returns <code>true</code> if points have been written, <code>false</code> otherwise
      */
-    void write(InfluxPoint influxPoint) throws UnexpectedConditionException;
+    boolean write(List<InfluxPoint> influxPoints);
 
     /**
      * create a query creator on this repository
diff --git a/bundles/org.openhab.persistence.influxdb/src/main/java/org/openhab/persistence/influxdb/internal/ItemToStorePointCreator.java b/bundles/org.openhab.persistence.influxdb/src/main/java/org/openhab/persistence/influxdb/internal/ItemToStorePointCreator.java
deleted file mode 100644 (file)
index 25929d8..0000000
+++ /dev/null
@@ -1,105 +0,0 @@
-/**
- * Copyright (c) 2010-2023 Contributors to the openHAB project
- *
- * See the NOTICE file(s) distributed with this work for additional
- * information.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License 2.0 which is available at
- * http://www.eclipse.org/legal/epl-2.0
- *
- * SPDX-License-Identifier: EPL-2.0
- */
-package org.openhab.persistence.influxdb.internal;
-
-import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.TAG_CATEGORY_NAME;
-import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.TAG_ITEM_NAME;
-import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.TAG_LABEL_NAME;
-import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.TAG_TYPE_NAME;
-
-import java.time.Instant;
-import java.util.Objects;
-import java.util.Optional;
-
-import org.eclipse.jdt.annotation.NonNullByDefault;
-import org.eclipse.jdt.annotation.Nullable;
-import org.openhab.core.items.Item;
-import org.openhab.core.types.State;
-import org.openhab.core.types.UnDefType;
-
-/**
- * Logic to create an InfluxDB {@link InfluxPoint} from an openHAB {@link Item}
- *
- * @author Joan Pujol Espinar - Initial contribution
- */
-@NonNullByDefault
-public class ItemToStorePointCreator {
-    private final InfluxDBConfiguration configuration;
-    private final InfluxDBMetadataService influxDBMetadataService;
-
-    public ItemToStorePointCreator(InfluxDBConfiguration configuration,
-            InfluxDBMetadataService influxDBMetadataService) {
-        this.configuration = configuration;
-        this.influxDBMetadataService = influxDBMetadataService;
-    }
-
-    public @Nullable InfluxPoint convert(Item item, @Nullable String storeAlias) {
-        if (item.getState() instanceof UnDefType) {
-            return null;
-        }
-
-        String measurementName = calculateMeasurementName(item, storeAlias);
-        String itemName = item.getName();
-        State state = getItemState(item);
-
-        Object value = InfluxDBStateConvertUtils.stateToObject(state);
-
-        InfluxPoint.Builder pointBuilder = InfluxPoint.newBuilder(measurementName).withTime(Instant.now())
-                .withValue(value).withTag(TAG_ITEM_NAME, itemName);
-
-        addPointTags(item, pointBuilder);
-
-        return pointBuilder.build();
-    }
-
-    private String calculateMeasurementName(Item item, @Nullable String storeAlias) {
-        String name = storeAlias != null && !storeAlias.isBlank() ? storeAlias : item.getName();
-        name = influxDBMetadataService.getMeasurementNameOrDefault(item.getName(), name);
-
-        if (configuration.isReplaceUnderscore()) {
-            name = name.replace('_', '.');
-        }
-
-        return name;
-    }
-
-    private State getItemState(Item item) {
-        return calculateDesiredTypeConversionToStore(item)
-                .map(desiredClass -> Objects.requireNonNullElseGet(item.getStateAs(desiredClass), item::getState))
-                .orElseGet(item::getState);
-    }
-
-    private Optional<Class<? extends State>> calculateDesiredTypeConversionToStore(Item item) {
-        return item.getAcceptedCommandTypes().stream().filter(commandType -> commandType.isAssignableFrom(State.class))
-                .findFirst().map(commandType -> commandType.asSubclass(State.class));
-    }
-
-    private void addPointTags(Item item, InfluxPoint.Builder pointBuilder) {
-        if (configuration.isAddCategoryTag()) {
-            String categoryName = Objects.requireNonNullElse(item.getCategory(), "n/a");
-            pointBuilder.withTag(TAG_CATEGORY_NAME, categoryName);
-        }
-
-        if (configuration.isAddTypeTag()) {
-            pointBuilder.withTag(TAG_TYPE_NAME, item.getType());
-        }
-
-        if (configuration.isAddLabelTag()) {
-            String labelName = Objects.requireNonNullElse(item.getLabel(), "n/a");
-            pointBuilder.withTag(TAG_LABEL_NAME, labelName);
-        }
-
-        influxDBMetadataService.getMetaData(item.getName())
-                .ifPresent(metadata -> metadata.getConfiguration().forEach(pointBuilder::withTag));
-    }
-}
index dda799785e65df11a1200a2214ca299659f0ade3..0319603e5f39bf44fdc379d5968d195488790d42 100644 (file)
@@ -23,12 +23,14 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 
 import org.eclipse.jdt.annotation.NonNullByDefault;
 import org.eclipse.jdt.annotation.Nullable;
 import org.influxdb.InfluxDB;
 import org.influxdb.InfluxDBFactory;
+import org.influxdb.dto.BatchPoints;
 import org.influxdb.dto.Point;
 import org.influxdb.dto.Pong;
 import org.influxdb.dto.Query;
@@ -38,10 +40,11 @@ import org.openhab.persistence.influxdb.internal.InfluxDBConfiguration;
 import org.openhab.persistence.influxdb.internal.InfluxDBMetadataService;
 import org.openhab.persistence.influxdb.internal.InfluxDBRepository;
 import org.openhab.persistence.influxdb.internal.InfluxPoint;
-import org.openhab.persistence.influxdb.internal.UnexpectedConditionException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.influxdb.exceptions.InfluxException;
+
 /**
  * Implementation of {@link InfluxDBRepository} for InfluxDB 1.0
  *
@@ -113,17 +116,25 @@ public class InfluxDB1RepositoryImpl implements InfluxDBRepository {
     }
 
     @Override
-    public void write(InfluxPoint point) throws UnexpectedConditionException {
+    public boolean write(List<InfluxPoint> influxPoints) {
         final InfluxDB currentClient = this.client;
-        if (currentClient != null) {
-            Point clientPoint = convertPointToClientFormat(point);
-            currentClient.write(configuration.getDatabaseName(), configuration.getRetentionPolicy(), clientPoint);
-        } else {
-            logger.warn("Write point {} ignored due to client isn't connected", point);
+        if (currentClient == null) {
+            return false;
+        }
+        try {
+            List<Point> points = influxPoints.stream().map(this::convertPointToClientFormat).filter(Optional::isPresent)
+                    .map(Optional::get).toList();
+            BatchPoints batchPoints = BatchPoints.database(configuration.getDatabaseName())
+                    .retentionPolicy(configuration.getRetentionPolicy()).points(points).build();
+            currentClient.write(batchPoints);
+        } catch (InfluxException e) {
+            logger.debug("Writing to database failed", e);
+            return false;
         }
+        return true;
     }
 
-    private Point convertPointToClientFormat(InfluxPoint point) throws UnexpectedConditionException {
+    private Optional<Point> convertPointToClientFormat(InfluxPoint point) {
         Point.Builder clientPoint = Point.measurement(point.getMeasurementName()).time(point.getTime().toEpochMilli(),
                 TimeUnit.MILLISECONDS);
         Object value = point.getValue();
@@ -136,10 +147,11 @@ public class InfluxDB1RepositoryImpl implements InfluxDBRepository {
         } else if (value == null) {
             clientPoint.addField(FIELD_VALUE_NAME, "null");
         } else {
-            throw new UnexpectedConditionException("Not expected value type");
+            logger.warn("Could not convert {}, discarding this datapoint", point);
+            return Optional.empty();
         }
         point.getTags().forEach(clientPoint::tag);
-        return clientPoint.build();
+        return Optional.of(clientPoint.build());
     }
 
     @Override
index 43d578d727ded9781c4192a70031fe5a3a691696..dade69cf59b86a36c10f936746dd50b0d3af85fd 100644 (file)
@@ -20,6 +20,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.stream.Stream;
 
 import org.eclipse.jdt.annotation.NonNullByDefault;
@@ -30,7 +31,6 @@ import org.openhab.persistence.influxdb.internal.InfluxDBConstants;
 import org.openhab.persistence.influxdb.internal.InfluxDBMetadataService;
 import org.openhab.persistence.influxdb.internal.InfluxDBRepository;
 import org.openhab.persistence.influxdb.internal.InfluxPoint;
-import org.openhab.persistence.influxdb.internal.UnexpectedConditionException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,6 +42,7 @@ import com.influxdb.client.WriteApi;
 import com.influxdb.client.domain.Ready;
 import com.influxdb.client.domain.WritePrecision;
 import com.influxdb.client.write.Point;
+import com.influxdb.exceptions.InfluxException;
 import com.influxdb.query.FluxTable;
 
 /**
@@ -120,34 +121,40 @@ public class InfluxDB2RepositoryImpl implements InfluxDBRepository {
     }
 
     @Override
-    public void write(InfluxPoint point) throws UnexpectedConditionException {
+    public boolean write(List<InfluxPoint> influxPoints) {
         final WriteApi currentWriteAPI = writeAPI;
-        if (currentWriteAPI != null) {
-            currentWriteAPI.writePoint(convertPointToClientFormat(point));
-        } else {
-            logger.warn("Write point {} ignored due to writeAPI isn't present", point);
+        if (currentWriteAPI == null) {
+            return false;
+        }
+        try {
+            List<Point> clientPoints = influxPoints.stream().map(this::convertPointToClientFormat)
+                    .filter(Optional::isPresent).map(Optional::get).toList();
+            currentWriteAPI.writePoints(clientPoints);
+        } catch (InfluxException e) {
+            logger.debug("Writing to database failed", e);
+            return false;
         }
+        return true;
     }
 
-    private Point convertPointToClientFormat(InfluxPoint point) throws UnexpectedConditionException {
+    private Optional<Point> convertPointToClientFormat(InfluxPoint point) {
         Point clientPoint = Point.measurement(point.getMeasurementName()).time(point.getTime(), WritePrecision.MS);
-        setPointValue(point.getValue(), clientPoint);
-        point.getTags().forEach(clientPoint::addTag);
-        return clientPoint;
-    }
-
-    private void setPointValue(@Nullable Object value, Point point) throws UnexpectedConditionException {
+        @Nullable
+        Object value = point.getValue();
         if (value instanceof String) {
-            point.addField(FIELD_VALUE_NAME, (String) value);
+            clientPoint.addField(FIELD_VALUE_NAME, (String) value);
         } else if (value instanceof Number) {
-            point.addField(FIELD_VALUE_NAME, (Number) value);
+            clientPoint.addField(FIELD_VALUE_NAME, (Number) value);
         } else if (value instanceof Boolean) {
-            point.addField(FIELD_VALUE_NAME, (Boolean) value);
+            clientPoint.addField(FIELD_VALUE_NAME, (Boolean) value);
         } else if (value == null) {
-            point.addField(FIELD_VALUE_NAME, (String) null);
+            clientPoint.addField(FIELD_VALUE_NAME, (String) null);
         } else {
-            throw new UnexpectedConditionException("Not expected value type");
+            logger.warn("Could not convert {}, discarding this datapoint)", clientPoint);
+            return Optional.empty();
         }
+        point.getTags().forEach(clientPoint::addTag);
+        return Optional.of(clientPoint);
     }
 
     @Override
diff --git a/bundles/org.openhab.persistence.influxdb/src/test/java/org/openhab/persistence/influxdb/InfluxDBPersistenceServiceTest.java b/bundles/org.openhab.persistence.influxdb/src/test/java/org/openhab/persistence/influxdb/InfluxDBPersistenceServiceTest.java
new file mode 100644 (file)
index 0000000..2207d35
--- /dev/null
@@ -0,0 +1,135 @@
+/**
+ * Copyright (c) 2010-2023 Contributors to the openHAB project
+ *
+ * See the NOTICE file(s) distributed with this work for additional
+ * information.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0
+ *
+ * SPDX-License-Identifier: EPL-2.0
+ */
+package org.openhab.persistence.influxdb;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.*;
+import static org.openhab.persistence.influxdb.internal.InfluxDBConfiguration.DATABASE_PARAM;
+import static org.openhab.persistence.influxdb.internal.InfluxDBConfiguration.PASSWORD_PARAM;
+import static org.openhab.persistence.influxdb.internal.InfluxDBConfiguration.RETENTION_POLICY_PARAM;
+import static org.openhab.persistence.influxdb.internal.InfluxDBConfiguration.TOKEN_PARAM;
+import static org.openhab.persistence.influxdb.internal.InfluxDBConfiguration.URL_PARAM;
+import static org.openhab.persistence.influxdb.internal.InfluxDBConfiguration.USER_PARAM;
+import static org.openhab.persistence.influxdb.internal.InfluxDBConfiguration.VERSION_PARAM;
+
+import java.util.Map;
+
+import org.eclipse.jdt.annotation.NonNullByDefault;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+import org.openhab.core.items.ItemRegistry;
+import org.openhab.core.items.MetadataRegistry;
+import org.openhab.persistence.influxdb.internal.InfluxDBMetadataService;
+import org.openhab.persistence.influxdb.internal.InfluxDBRepository;
+import org.openhab.persistence.influxdb.internal.InfluxDBVersion;
+import org.openhab.persistence.influxdb.internal.ItemTestHelper;
+import org.openhab.persistence.influxdb.internal.UnexpectedConditionException;
+
+/**
+ * @author Joan Pujol Espinar - Initial contribution
+ */
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.LENIENT)
+@NonNullByDefault
+public class InfluxDBPersistenceServiceTest {
+    private static final Map<String, Object> VALID_V1_CONFIGURATION = Map.of( //
+            URL_PARAM, "http://localhost:8086", //
+            VERSION_PARAM, InfluxDBVersion.V1.name(), //
+            USER_PARAM, "user", PASSWORD_PARAM, "password", //
+            DATABASE_PARAM, "openhab", //
+            RETENTION_POLICY_PARAM, "default");
+
+    private static final Map<String, Object> VALID_V2_CONFIGURATION = Map.of( //
+            URL_PARAM, "http://localhost:8086", //
+            VERSION_PARAM, InfluxDBVersion.V2.name(), //
+            TOKEN_PARAM, "sampletoken", //
+            DATABASE_PARAM, "openhab", //
+            RETENTION_POLICY_PARAM, "default");
+
+    private static final Map<String, Object> INVALID_V1_CONFIGURATION = Map.of(//
+            URL_PARAM, "http://localhost:8086", //
+            VERSION_PARAM, InfluxDBVersion.V1.name(), //
+            USER_PARAM, "user", //
+            DATABASE_PARAM, "openhab", //
+            RETENTION_POLICY_PARAM, "default");
+
+    private static final Map<String, Object> INVALID_V2_CONFIGURATION = Map.of( //
+            URL_PARAM, "http://localhost:8086", //
+            VERSION_PARAM, InfluxDBVersion.V2.name(), //
+            DATABASE_PARAM, "openhab", //
+            RETENTION_POLICY_PARAM, "default");
+
+    private @Mock @NonNullByDefault({}) InfluxDBRepository influxDBRepositoryMock;
+
+    private final InfluxDBMetadataService influxDBMetadataService = new InfluxDBMetadataService(
+            mock(MetadataRegistry.class));
+
+    @Test
+    public void activateWithValidV1ConfigShouldConnectRepository() {
+        getService(VALID_V1_CONFIGURATION);
+        verify(influxDBRepositoryMock).connect();
+    }
+
+    @Test
+    public void activateWithValidV2ConfigShouldConnectRepository() {
+        getService(VALID_V2_CONFIGURATION);
+        verify(influxDBRepositoryMock).connect();
+    }
+
+    @Test
+    public void activateWithInvalidV1ConfigShouldFail() {
+        assertThrows(IllegalArgumentException.class, () -> getService(INVALID_V1_CONFIGURATION));
+    }
+
+    @Test
+    public void activateWithInvalidV2ShouldFail() {
+        assertThrows(IllegalArgumentException.class, () -> getService(INVALID_V2_CONFIGURATION));
+    }
+
+    @Test
+    public void deactivateShouldDisconnectRepository() {
+        InfluxDBPersistenceService instance = getService(VALID_V2_CONFIGURATION);
+        instance.deactivate();
+        verify(influxDBRepositoryMock).disconnect();
+    }
+
+    @Test
+    public void storeItemWithConnectedRepository() throws UnexpectedConditionException {
+        InfluxDBPersistenceService instance = getService(VALID_V2_CONFIGURATION);
+        when(influxDBRepositoryMock.isConnected()).thenReturn(true);
+        instance.store(ItemTestHelper.createNumberItem("number", 5));
+        verify(influxDBRepositoryMock, timeout(5000)).write(any());
+    }
+
+    @Test
+    public void storeItemWithDisconnectedRepositoryIsIgnored() throws UnexpectedConditionException {
+        InfluxDBPersistenceService instance = getService(VALID_V2_CONFIGURATION);
+        when(influxDBRepositoryMock.isConnected()).thenReturn(false);
+        instance.store(ItemTestHelper.createNumberItem("number", 5));
+        verify(influxDBRepositoryMock, never()).write(any());
+    }
+
+    private InfluxDBPersistenceService getService(Map<String, Object> config) {
+        return new InfluxDBPersistenceService(mock(ItemRegistry.class), influxDBMetadataService, config) {
+            @Override
+            protected InfluxDBRepository createInfluxDBRepository() {
+                return influxDBRepositoryMock;
+            }
+        };
+    }
+}
diff --git a/bundles/org.openhab.persistence.influxdb/src/test/java/org/openhab/persistence/influxdb/ItemToStorePointCreatorTest.java b/bundles/org.openhab.persistence.influxdb/src/test/java/org/openhab/persistence/influxdb/ItemToStorePointCreatorTest.java
new file mode 100644 (file)
index 0000000..10a653b
--- /dev/null
@@ -0,0 +1,261 @@
+/**
+ * Copyright (c) 2010-2023 Contributors to the openHAB project
+ *
+ * See the NOTICE file(s) distributed with this work for additional
+ * information.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0
+ *
+ * SPDX-License-Identifier: EPL-2.0
+ */
+package org.openhab.persistence.influxdb;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.*;
+import static org.mockito.Mockito.when;
+import static org.openhab.persistence.influxdb.internal.InfluxDBConfiguration.*;
+
+import java.math.BigDecimal;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Stream;
+
+import org.eclipse.jdt.annotation.DefaultLocation;
+import org.eclipse.jdt.annotation.NonNullByDefault;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.openhab.core.items.ItemRegistry;
+import org.openhab.core.items.Metadata;
+import org.openhab.core.items.MetadataKey;
+import org.openhab.core.items.MetadataRegistry;
+import org.openhab.core.library.CoreItemFactory;
+import org.openhab.core.library.items.NumberItem;
+import org.openhab.persistence.influxdb.internal.InfluxDBConstants;
+import org.openhab.persistence.influxdb.internal.InfluxDBMetadataService;
+import org.openhab.persistence.influxdb.internal.InfluxDBVersion;
+import org.openhab.persistence.influxdb.internal.InfluxPoint;
+import org.openhab.persistence.influxdb.internal.ItemTestHelper;
+
+/**
+ * @author Joan Pujol Espinar - Initial contribution
+ */
+@ExtendWith(MockitoExtension.class)
+@NonNullByDefault(value = { DefaultLocation.PARAMETER, DefaultLocation.RETURN_TYPE })
+public class ItemToStorePointCreatorTest {
+
+    private static final Map<String, Object> BASE_CONFIGURATION = Map.of( //
+            URL_PARAM, "http://localhost:8086", //
+            VERSION_PARAM, InfluxDBVersion.V1.name(), //
+            USER_PARAM, "user", PASSWORD_PARAM, "password", //
+            DATABASE_PARAM, "openhab", //
+            RETENTION_POLICY_PARAM, "default");
+
+    private @Mock ItemRegistry itemRegistryMock;
+    private @Mock MetadataRegistry metadataRegistry;
+    private InfluxDBPersistenceService instance;
+
+    @BeforeEach
+    public void setup() {
+        instance = getService(false, false, false, false);
+    }
+
+    @ParameterizedTest
+    @MethodSource
+    public void convertBasicItem(Number number) throws ExecutionException, InterruptedException {
+        NumberItem item = ItemTestHelper.createNumberItem("myitem", number);
+        InfluxPoint point = instance.convert(item, null).get();
+
+        if (point == null) {
+            Assertions.fail("'point' is null");
+            return;
+        }
+
+        assertThat(point.getMeasurementName(), equalTo(item.getName()));
+        assertThat("Must Store item name", point.getTags(), hasEntry("item", item.getName()));
+        assertThat(point.getValue(), equalTo(new BigDecimal(number.toString())));
+    }
+
+    @SuppressWarnings("unused")
+    private static Stream<Number> convertBasicItem() {
+        return Stream.of(5, 5.5, 5L);
+    }
+
+    @Test
+    public void shouldUseAliasAsMeasurementNameIfProvided() throws ExecutionException, InterruptedException {
+        NumberItem item = ItemTestHelper.createNumberItem("myitem", 5);
+        InfluxPoint point = instance.convert(item, "aliasName").get();
+
+        if (point == null) {
+            Assertions.fail("'point' is null");
+            return;
+        }
+
+        assertThat(point.getMeasurementName(), is("aliasName"));
+    }
+
+    @Test
+    public void shouldStoreCategoryTagIfProvidedAndConfigured() throws ExecutionException, InterruptedException {
+        NumberItem item = ItemTestHelper.createNumberItem("myitem", 5);
+        item.setCategory("categoryValue");
+
+        instance = getService(false, true, false, false);
+        InfluxPoint point = instance.convert(item, null).get();
+
+        if (point == null) {
+            Assertions.fail("'point' is null");
+            return;
+        }
+
+        assertThat(point.getTags(), hasEntry(InfluxDBConstants.TAG_CATEGORY_NAME, "categoryValue"));
+
+        instance = getService(false, false, false, false);
+        point = instance.convert(item, null).get();
+
+        if (point == null) {
+            Assertions.fail("'point' is null");
+            return;
+        }
+
+        assertThat(point.getTags(), not(hasKey(InfluxDBConstants.TAG_CATEGORY_NAME)));
+    }
+
+    @Test
+    public void shouldStoreTypeTagIfProvidedAndConfigured() throws ExecutionException, InterruptedException {
+        NumberItem item = ItemTestHelper.createNumberItem("myitem", 5);
+
+        instance = getService(false, false, false, true);
+        InfluxPoint point = instance.convert(item, null).get();
+
+        if (point == null) {
+            Assertions.fail("'point' is null");
+            return;
+        }
+
+        assertThat(point.getTags(), hasEntry(InfluxDBConstants.TAG_TYPE_NAME, "Number"));
+
+        instance = getService(false, false, false, false);
+        point = instance.convert(item, null).get();
+
+        if (point == null) {
+            Assertions.fail("'point' is null");
+            return;
+        }
+
+        assertThat(point.getTags(), not(hasKey(InfluxDBConstants.TAG_TYPE_NAME)));
+    }
+
+    @Test
+    public void shouldStoreTypeLabelIfProvidedAndConfigured() throws ExecutionException, InterruptedException {
+        NumberItem item = ItemTestHelper.createNumberItem("myitem", 5);
+        item.setLabel("ItemLabel");
+
+        instance = getService(false, false, true, false);
+        InfluxPoint point = instance.convert(item, null).get();
+
+        if (point == null) {
+            Assertions.fail("'point' is null");
+            return;
+        }
+
+        assertThat(point.getTags(), hasEntry(InfluxDBConstants.TAG_LABEL_NAME, "ItemLabel"));
+
+        instance = getService(false, false, false, false);
+        point = instance.convert(item, null).get();
+
+        if (point == null) {
+            Assertions.fail("'point' is null");
+            return;
+        }
+
+        assertThat(point.getTags(), not(hasKey(InfluxDBConstants.TAG_LABEL_NAME)));
+    }
+
+    @Test
+    public void shouldStoreMetadataAsTagsIfProvided() throws ExecutionException, InterruptedException {
+        NumberItem item = ItemTestHelper.createNumberItem("myitem", 5);
+        MetadataKey metadataKey = new MetadataKey(InfluxDBPersistenceService.SERVICE_NAME, item.getName());
+
+        when(metadataRegistry.get(metadataKey))
+                .thenReturn(new Metadata(metadataKey, "", Map.of("key1", "val1", "key2", "val2")));
+
+        InfluxPoint point = instance.convert(item, null).get();
+
+        if (point == null) {
+            Assertions.fail("'point' is null");
+            return;
+        }
+
+        assertThat(point.getTags(), hasEntry("key1", "val1"));
+        assertThat(point.getTags(), hasEntry("key2", "val2"));
+    }
+
+    @Test
+    public void shouldUseMeasurementNameFromMetadataIfProvided() throws ExecutionException, InterruptedException {
+        NumberItem item = ItemTestHelper.createNumberItem("myitem", 5);
+        MetadataKey metadataKey = new MetadataKey(InfluxDBPersistenceService.SERVICE_NAME, item.getName());
+
+        InfluxPoint point = instance.convert(item, null).get();
+        if (point == null) {
+            Assertions.fail();
+            return;
+        }
+        assertThat(point.getMeasurementName(), equalTo(item.getName()));
+
+        point = instance.convert(item, null).get();
+        if (point == null) {
+            Assertions.fail();
+            return;
+        }
+        assertThat(point.getMeasurementName(), equalTo(item.getName()));
+        assertThat(point.getTags(), hasEntry("item", item.getName()));
+
+        when(metadataRegistry.get(metadataKey))
+                .thenReturn(new Metadata(metadataKey, "measurementName", Map.of("key1", "val1", "key2", "val2")));
+
+        point = instance.convert(item, null).get();
+        if (point == null) {
+            Assertions.fail();
+            return;
+        }
+        assertThat(point.getMeasurementName(), equalTo("measurementName"));
+        assertThat(point.getTags(), hasEntry("item", item.getName()));
+
+        when(metadataRegistry.get(metadataKey))
+                .thenReturn(new Metadata(metadataKey, "", Map.of("key1", "val1", "key2", "val2")));
+
+        point = instance.convert(item, null).get();
+        if (point == null) {
+            Assertions.fail();
+            return;
+        }
+        assertThat(point.getMeasurementName(), equalTo(item.getName()));
+        assertThat(point.getTags(), hasEntry("item", item.getName()));
+    }
+
+    private InfluxDBPersistenceService getService(boolean replaceUnderscore, boolean category, boolean label,
+            boolean typeTag) {
+        InfluxDBMetadataService influxDBMetadataService = new InfluxDBMetadataService(metadataRegistry);
+
+        Map<String, Object> configuration = new HashMap<>();
+        configuration.putAll(BASE_CONFIGURATION);
+        configuration.put(REPLACE_UNDERSCORE_PARAM, replaceUnderscore);
+        configuration.put(ADD_CATEGORY_TAG_PARAM, category);
+        configuration.put(ADD_LABEL_TAG_PARAM, label);
+        configuration.put(ADD_TYPE_TAG_PARAM, typeTag);
+
+        InfluxDBPersistenceService instance = new InfluxDBPersistenceService(itemRegistryMock, influxDBMetadataService,
+                configuration);
+        instance.setItemFactory(new CoreItemFactory());
+
+        return instance;
+    }
+}
diff --git a/bundles/org.openhab.persistence.influxdb/src/test/java/org/openhab/persistence/influxdb/internal/InfluxDBPersistenceServiceTest.java b/bundles/org.openhab.persistence.influxdb/src/test/java/org/openhab/persistence/influxdb/internal/InfluxDBPersistenceServiceTest.java
deleted file mode 100644 (file)
index 05ead8f..0000000
+++ /dev/null
@@ -1,128 +0,0 @@
-/**
- * Copyright (c) 2010-2023 Contributors to the openHAB project
- *
- * See the NOTICE file(s) distributed with this work for additional
- * information.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License 2.0 which is available at
- * http://www.eclipse.org/legal/epl-2.0
- *
- * SPDX-License-Identifier: EPL-2.0
- */
-package org.openhab.persistence.influxdb.internal;
-
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.*;
-import static org.openhab.persistence.influxdb.internal.InfluxDBConfiguration.DATABASE_PARAM;
-import static org.openhab.persistence.influxdb.internal.InfluxDBConfiguration.PASSWORD_PARAM;
-import static org.openhab.persistence.influxdb.internal.InfluxDBConfiguration.RETENTION_POLICY_PARAM;
-import static org.openhab.persistence.influxdb.internal.InfluxDBConfiguration.TOKEN_PARAM;
-import static org.openhab.persistence.influxdb.internal.InfluxDBConfiguration.URL_PARAM;
-import static org.openhab.persistence.influxdb.internal.InfluxDBConfiguration.USER_PARAM;
-import static org.openhab.persistence.influxdb.internal.InfluxDBConfiguration.VERSION_PARAM;
-
-import java.util.Map;
-
-import org.eclipse.jdt.annotation.NonNullByDefault;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.Mock;
-import org.mockito.junit.jupiter.MockitoExtension;
-import org.openhab.core.items.ItemRegistry;
-import org.openhab.core.items.MetadataRegistry;
-import org.openhab.persistence.influxdb.InfluxDBPersistenceService;
-
-/**
- * @author Joan Pujol Espinar - Initial contribution
- */
-@ExtendWith(MockitoExtension.class)
-@NonNullByDefault
-public class InfluxDBPersistenceServiceTest {
-    private static final Map<String, Object> VALID_V1_CONFIGURATION = Map.of( //
-            URL_PARAM, "http://localhost:8086", //
-            VERSION_PARAM, InfluxDBVersion.V1.name(), //
-            USER_PARAM, "user", PASSWORD_PARAM, "password", //
-            DATABASE_PARAM, "openhab", //
-            RETENTION_POLICY_PARAM, "default");
-
-    private static final Map<String, Object> VALID_V2_CONFIGURATION = Map.of( //
-            URL_PARAM, "http://localhost:8086", //
-            VERSION_PARAM, InfluxDBVersion.V2.name(), //
-            TOKEN_PARAM, "sampletoken", //
-            DATABASE_PARAM, "openhab", //
-            RETENTION_POLICY_PARAM, "default");
-
-    private static final Map<String, Object> INVALID_V1_CONFIGURATION = Map.of(//
-            URL_PARAM, "http://localhost:8086", //
-            VERSION_PARAM, InfluxDBVersion.V1.name(), //
-            USER_PARAM, "user", //
-            DATABASE_PARAM, "openhab", //
-            RETENTION_POLICY_PARAM, "default");
-
-    private static final Map<String, Object> INVALID_V2_CONFIGURATION = Map.of( //
-            URL_PARAM, "http://localhost:8086", //
-            VERSION_PARAM, InfluxDBVersion.V2.name(), //
-            DATABASE_PARAM, "openhab", //
-            RETENTION_POLICY_PARAM, "default");
-
-    private @Mock @NonNullByDefault({}) InfluxDBRepository influxDBRepositoryMock;
-
-    private final InfluxDBMetadataService influxDBMetadataService = new InfluxDBMetadataService(
-            mock(MetadataRegistry.class));
-
-    @Test
-    public void activateWithValidV1ConfigShouldConnectRepository() {
-        getService(VALID_V1_CONFIGURATION);
-        verify(influxDBRepositoryMock).connect();
-    }
-
-    @Test
-    public void activateWithValidV2ConfigShouldConnectRepository() {
-        getService(VALID_V2_CONFIGURATION);
-        verify(influxDBRepositoryMock).connect();
-    }
-
-    @Test
-    public void activateWithInvalidV1ConfigShouldFail() {
-        assertThrows(IllegalArgumentException.class, () -> getService(INVALID_V1_CONFIGURATION));
-    }
-
-    @Test
-    public void activateWithInvalidV2ShouldFail() {
-        assertThrows(IllegalArgumentException.class, () -> getService(INVALID_V2_CONFIGURATION));
-    }
-
-    @Test
-    public void deactivateShouldDisconnectRepository() {
-        InfluxDBPersistenceService instance = getService(VALID_V2_CONFIGURATION);
-        instance.deactivate();
-        verify(influxDBRepositoryMock).disconnect();
-    }
-
-    @Test
-    public void storeItemWithConnectedRepository() throws UnexpectedConditionException {
-        InfluxDBPersistenceService instance = getService(VALID_V2_CONFIGURATION);
-        when(influxDBRepositoryMock.isConnected()).thenReturn(true);
-        instance.store(ItemTestHelper.createNumberItem("number", 5));
-        verify(influxDBRepositoryMock).write(any());
-    }
-
-    @Test
-    public void storeItemWithDisconnectedRepositoryIsIgnored() throws UnexpectedConditionException {
-        InfluxDBPersistenceService instance = getService(VALID_V2_CONFIGURATION);
-        when(influxDBRepositoryMock.isConnected()).thenReturn(false);
-        instance.store(ItemTestHelper.createNumberItem("number", 5));
-        verify(influxDBRepositoryMock, never()).write(any());
-    }
-
-    private InfluxDBPersistenceService getService(Map<String, Object> config) {
-        return new InfluxDBPersistenceService(mock(ItemRegistry.class), influxDBMetadataService, config) {
-            @Override
-            protected InfluxDBRepository createInfluxDBRepository() {
-                return influxDBRepositoryMock;
-            }
-        };
-    }
-}
diff --git a/bundles/org.openhab.persistence.influxdb/src/test/java/org/openhab/persistence/influxdb/internal/ItemToStorePointCreatorTest.java b/bundles/org.openhab.persistence.influxdb/src/test/java/org/openhab/persistence/influxdb/internal/ItemToStorePointCreatorTest.java
deleted file mode 100644 (file)
index 364489c..0000000
+++ /dev/null
@@ -1,241 +0,0 @@
-/**
- * Copyright (c) 2010-2023 Contributors to the openHAB project
- *
- * See the NOTICE file(s) distributed with this work for additional
- * information.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License 2.0 which is available at
- * http://www.eclipse.org/legal/epl-2.0
- *
- * SPDX-License-Identifier: EPL-2.0
- */
-package org.openhab.persistence.influxdb.internal;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.*;
-import static org.mockito.Mockito.when;
-
-import java.math.BigDecimal;
-import java.util.Map;
-import java.util.stream.Stream;
-
-import org.eclipse.jdt.annotation.DefaultLocation;
-import org.eclipse.jdt.annotation.NonNullByDefault;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.MethodSource;
-import org.mockito.Mock;
-import org.mockito.junit.jupiter.MockitoExtension;
-import org.openhab.core.items.Metadata;
-import org.openhab.core.items.MetadataKey;
-import org.openhab.core.items.MetadataRegistry;
-import org.openhab.core.library.items.NumberItem;
-import org.openhab.persistence.influxdb.InfluxDBPersistenceService;
-
-/**
- * @author Joan Pujol Espinar - Initial contribution
- */
-@ExtendWith(MockitoExtension.class)
-@NonNullByDefault(value = { DefaultLocation.PARAMETER, DefaultLocation.RETURN_TYPE })
-public class ItemToStorePointCreatorTest {
-
-    private @Mock InfluxDBConfiguration influxDBConfiguration;
-    private @Mock MetadataRegistry metadataRegistry;
-    private ItemToStorePointCreator instance;
-
-    @BeforeEach
-    public void before() {
-        InfluxDBMetadataService influxDBMetadataService = new InfluxDBMetadataService(metadataRegistry);
-        when(influxDBConfiguration.isAddCategoryTag()).thenReturn(false);
-        when(influxDBConfiguration.isAddLabelTag()).thenReturn(false);
-        when(influxDBConfiguration.isAddTypeTag()).thenReturn(false);
-        when(influxDBConfiguration.isReplaceUnderscore()).thenReturn(false);
-
-        instance = new ItemToStorePointCreator(influxDBConfiguration, influxDBMetadataService);
-    }
-
-    @AfterEach
-    public void after() {
-        instance = null;
-        influxDBConfiguration = null;
-        metadataRegistry = null;
-    }
-
-    @ParameterizedTest
-    @MethodSource
-    public void convertBasicItem(Number number) {
-        NumberItem item = ItemTestHelper.createNumberItem("myitem", number);
-        InfluxPoint point = instance.convert(item, null);
-
-        if (point == null) {
-            Assertions.fail("'point' is null");
-            return;
-        }
-
-        assertThat(point.getMeasurementName(), equalTo(item.getName()));
-        assertThat("Must Store item name", point.getTags(), hasEntry("item", item.getName()));
-        assertThat(point.getValue(), equalTo(new BigDecimal(number.toString())));
-    }
-
-    @SuppressWarnings("unused")
-    private static Stream<Number> convertBasicItem() {
-        return Stream.of(5, 5.5, 5L);
-    }
-
-    @Test
-    public void shouldUseAliasAsMeasurementNameIfProvided() {
-        NumberItem item = ItemTestHelper.createNumberItem("myitem", 5);
-        InfluxPoint point = instance.convert(item, "aliasName");
-
-        if (point == null) {
-            Assertions.fail("'point' is null");
-            return;
-        }
-
-        assertThat(point.getMeasurementName(), is("aliasName"));
-    }
-
-    @Test
-    public void shouldStoreCategoryTagIfProvidedAndConfigured() {
-        NumberItem item = ItemTestHelper.createNumberItem("myitem", 5);
-        item.setCategory("categoryValue");
-
-        when(influxDBConfiguration.isAddCategoryTag()).thenReturn(true);
-        InfluxPoint point = instance.convert(item, null);
-
-        if (point == null) {
-            Assertions.fail("'point' is null");
-            return;
-        }
-
-        assertThat(point.getTags(), hasEntry(InfluxDBConstants.TAG_CATEGORY_NAME, "categoryValue"));
-
-        when(influxDBConfiguration.isAddCategoryTag()).thenReturn(false);
-        point = instance.convert(item, null);
-
-        if (point == null) {
-            Assertions.fail("'point' is null");
-            return;
-        }
-
-        assertThat(point.getTags(), not(hasKey(InfluxDBConstants.TAG_CATEGORY_NAME)));
-    }
-
-    @Test
-    public void shouldStoreTypeTagIfProvidedAndConfigured() {
-        NumberItem item = ItemTestHelper.createNumberItem("myitem", 5);
-
-        when(influxDBConfiguration.isAddTypeTag()).thenReturn(true);
-        InfluxPoint point = instance.convert(item, null);
-
-        if (point == null) {
-            Assertions.fail("'point' is null");
-            return;
-        }
-
-        assertThat(point.getTags(), hasEntry(InfluxDBConstants.TAG_TYPE_NAME, "Number"));
-
-        when(influxDBConfiguration.isAddTypeTag()).thenReturn(false);
-        point = instance.convert(item, null);
-
-        if (point == null) {
-            Assertions.fail("'point' is null");
-            return;
-        }
-
-        assertThat(point.getTags(), not(hasKey(InfluxDBConstants.TAG_TYPE_NAME)));
-    }
-
-    @Test
-    public void shouldStoreTypeLabelIfProvidedAndConfigured() {
-        NumberItem item = ItemTestHelper.createNumberItem("myitem", 5);
-        item.setLabel("ItemLabel");
-
-        when(influxDBConfiguration.isAddLabelTag()).thenReturn(true);
-        InfluxPoint point = instance.convert(item, null);
-
-        if (point == null) {
-            Assertions.fail("'point' is null");
-            return;
-        }
-
-        assertThat(point.getTags(), hasEntry(InfluxDBConstants.TAG_LABEL_NAME, "ItemLabel"));
-
-        when(influxDBConfiguration.isAddLabelTag()).thenReturn(false);
-        point = instance.convert(item, null);
-
-        if (point == null) {
-            Assertions.fail("'point' is null");
-            return;
-        }
-
-        assertThat(point.getTags(), not(hasKey(InfluxDBConstants.TAG_LABEL_NAME)));
-    }
-
-    @Test
-    public void shouldStoreMetadataAsTagsIfProvided() {
-        NumberItem item = ItemTestHelper.createNumberItem("myitem", 5);
-        MetadataKey metadataKey = new MetadataKey(InfluxDBPersistenceService.SERVICE_NAME, item.getName());
-
-        when(metadataRegistry.get(metadataKey))
-                .thenReturn(new Metadata(metadataKey, "", Map.of("key1", "val1", "key2", "val2")));
-
-        InfluxPoint point = instance.convert(item, null);
-
-        if (point == null) {
-            Assertions.fail("'point' is null");
-            return;
-        }
-
-        assertThat(point.getTags(), hasEntry("key1", "val1"));
-        assertThat(point.getTags(), hasEntry("key2", "val2"));
-    }
-
-    @Test
-    public void shouldUseMeasurementNameFromMetadataIfProvided() {
-        NumberItem item = ItemTestHelper.createNumberItem("myitem", 5);
-        MetadataKey metadataKey = new MetadataKey(InfluxDBPersistenceService.SERVICE_NAME, item.getName());
-
-        InfluxPoint point = instance.convert(item, null);
-        if (point == null) {
-            Assertions.fail();
-            return;
-        }
-        assertThat(point.getMeasurementName(), equalTo(item.getName()));
-
-        point = instance.convert(item, null);
-        if (point == null) {
-            Assertions.fail();
-            return;
-        }
-        assertThat(point.getMeasurementName(), equalTo(item.getName()));
-        assertThat(point.getTags(), hasEntry("item", item.getName()));
-
-        when(metadataRegistry.get(metadataKey))
-                .thenReturn(new Metadata(metadataKey, "measurementName", Map.of("key1", "val1", "key2", "val2")));
-
-        point = instance.convert(item, null);
-        if (point == null) {
-            Assertions.fail();
-            return;
-        }
-        assertThat(point.getMeasurementName(), equalTo("measurementName"));
-        assertThat(point.getTags(), hasEntry("item", item.getName()));
-
-        when(metadataRegistry.get(metadataKey))
-                .thenReturn(new Metadata(metadataKey, "", Map.of("key1", "val1", "key2", "val2")));
-
-        point = instance.convert(item, null);
-        if (point == null) {
-            Assertions.fail();
-            return;
-        }
-        assertThat(point.getMeasurementName(), equalTo(item.getName()));
-        assertThat(point.getTags(), hasEntry("item", item.getName()));
-    }
-}