*/
package org.openhab.persistence.influxdb;
+import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.*;
+
+import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
+import org.openhab.core.common.ThreadPoolManager;
import org.openhab.core.config.core.ConfigurableService;
import org.openhab.core.items.Item;
+import org.openhab.core.items.ItemFactory;
import org.openhab.core.items.ItemRegistry;
+import org.openhab.core.items.ItemUtil;
import org.openhab.core.persistence.FilterCriteria;
import org.openhab.core.persistence.HistoricItem;
import org.openhab.core.persistence.PersistenceItemInfo;
import org.openhab.core.persistence.QueryablePersistenceService;
import org.openhab.core.persistence.strategy.PersistenceStrategy;
import org.openhab.core.types.State;
+import org.openhab.core.types.UnDefType;
import org.openhab.persistence.influxdb.internal.FilterCriteriaQueryCreator;
import org.openhab.persistence.influxdb.internal.InfluxDBConfiguration;
import org.openhab.persistence.influxdb.internal.InfluxDBHistoricItem;
import org.openhab.persistence.influxdb.internal.InfluxDBRepository;
import org.openhab.persistence.influxdb.internal.InfluxDBStateConvertUtils;
import org.openhab.persistence.influxdb.internal.InfluxPoint;
-import org.openhab.persistence.influxdb.internal.ItemToStorePointCreator;
-import org.openhab.persistence.influxdb.internal.UnexpectedConditionException;
import org.openhab.persistence.influxdb.internal.influx1.InfluxDB1RepositoryImpl;
import org.openhab.persistence.influxdb.internal.influx2.InfluxDB2RepositoryImpl;
import org.osgi.framework.Constants;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.osgi.service.component.annotations.ReferencePolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private final Logger logger = LoggerFactory.getLogger(InfluxDBPersistenceService.class);
+ private static final int COMMIT_INTERVAL = 3; // in s
protected static final String CONFIG_URI = "persistence:influxdb";
// External dependencies
private final InfluxDBMetadataService influxDBMetadataService;
private final InfluxDBConfiguration configuration;
- private final ItemToStorePointCreator itemToStorePointCreator;
private final InfluxDBRepository influxDBRepository;
- private boolean tryReconnection;
+ private boolean serviceActivated;
+
+ // storage
+ private final ScheduledFuture<?> storeJob;
+ private final BlockingQueue<InfluxPoint> pointsQueue = new LinkedBlockingQueue<>();
+
+ // conversion
+ private final Set<ItemFactory> itemFactories = new HashSet<>();
+ private Map<String, Class<? extends State>> desiredClasses = new HashMap<>();
@Activate
public InfluxDBPersistenceService(final @Reference ItemRegistry itemRegistry,
if (configuration.isValid()) {
this.influxDBRepository = createInfluxDBRepository();
this.influxDBRepository.connect();
- this.itemToStorePointCreator = new ItemToStorePointCreator(configuration, influxDBMetadataService);
- tryReconnection = true;
+ this.storeJob = ThreadPoolManager.getScheduledPool("org.openhab.influxdb")
+ .scheduleWithFixedDelay(this::commit, COMMIT_INTERVAL, COMMIT_INTERVAL, TimeUnit.SECONDS);
+ serviceActivated = true;
} else {
throw new IllegalArgumentException("Configuration invalid.");
}
*/
@Deactivate
public void deactivate() {
- tryReconnection = false;
+ serviceActivated = false;
+
+ storeJob.cancel(false);
+ commit(); // ensure we at least tried to store the data;
+
+ if (!pointsQueue.isEmpty()) {
+ logger.warn("InfluxDB failed to finally store {} points.", pointsQueue.size());
+ }
+
influxDBRepository.disconnect();
logger.info("InfluxDB persistence service stopped.");
}
@Override
public void store(Item item, @Nullable String alias) {
- if (checkConnection()) {
- InfluxPoint point = itemToStorePointCreator.convert(item, alias);
- if (point != null) {
- try {
- influxDBRepository.write(point);
- logger.trace("Stored item {} in InfluxDB point {}", item, point);
- } catch (UnexpectedConditionException e) {
- logger.warn("Failed to store item {} in InfluxDB point {}", point, item);
- }
+ if (!serviceActivated) {
+ logger.warn("InfluxDB service not ready. Storing {} rejected.", item);
+ return;
+ }
+ convert(item, alias).thenAccept(point -> {
+ if (point == null) {
+ logger.trace("Ignoring item {}, conversion to an InfluxDB point failed.", item.getName());
+ return;
+ }
+ if (pointsQueue.offer(point)) {
+ logger.trace("Queued {} for item {}", point, item);
} else {
- logger.trace("Ignoring item {}, conversion to an InfluxDB point failed.", item);
+ logger.warn("Failed to queue {} for item {}", point, item);
}
- } else {
- logger.debug("store ignored, InfluxDB is not connected");
- }
+ });
}
@Override
public Iterable<HistoricItem> query(FilterCriteria filter) {
- if (checkConnection()) {
+ if (serviceActivated && checkConnection()) {
logger.trace(
"Query-Filter: itemname: {}, ordering: {}, state: {}, operator: {}, getBeginDate: {}, getEndDate: {}, getPageSize: {}, getPageNumber: {}",
filter.getItemName(), filter.getOrdering().toString(), filter.getState(), filter.getOperator(),
private boolean checkConnection() {
if (influxDBRepository.isConnected()) {
return true;
- } else if (tryReconnection) {
+ } else if (serviceActivated) {
logger.debug("Connection lost, trying re-connection");
return influxDBRepository.connect();
}
return false;
}
+
+ private void commit() {
+ if (!pointsQueue.isEmpty() && checkConnection()) {
+ List<InfluxPoint> points = new ArrayList<>();
+ pointsQueue.drainTo(points);
+ if (!influxDBRepository.write(points)) {
+ logger.warn("Re-queuing {} elements, failed to write batch.", points.size());
+ pointsQueue.addAll(points);
+ } else {
+ logger.trace("Wrote {} elements to database", points.size());
+ }
+ }
+ }
+
+ /**
+ * Convert incoming data to an {@link InfluxPoint} for further processing. This is needed because storage is
+ * asynchronous and the item data may have changed.
+ * <p />
+ * The method is package-private for testing.
+ *
+ * @param item the {@link Item} that needs conversion
+ * @param storeAlias an (optional) alias for the item
+ * @return a {@link CompletableFuture} that contains either <code>null</code> for item states that cannot be
+ * converted or the corresponding {@link InfluxPoint}
+ */
+ CompletableFuture<@Nullable InfluxPoint> convert(Item item, @Nullable String storeAlias) {
+ String itemName = item.getName();
+ String itemLabel = item.getLabel();
+ String category = item.getCategory();
+ State state = item.getState();
+ String itemType = item.getType();
+ Instant timeStamp = Instant.now();
+
+ if (state instanceof UnDefType) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ return CompletableFuture.supplyAsync(() -> {
+ String measurementName = storeAlias != null && !storeAlias.isBlank() ? storeAlias : itemName;
+ measurementName = influxDBMetadataService.getMeasurementNameOrDefault(itemName, measurementName);
+
+ if (configuration.isReplaceUnderscore()) {
+ measurementName = measurementName.replace('_', '.');
+ }
+
+ State storeState = Objects
+ .requireNonNullElse(state.as(desiredClasses.get(ItemUtil.getMainItemType(itemType))), state);
+ Object value = InfluxDBStateConvertUtils.stateToObject(storeState);
+
+ InfluxPoint.Builder pointBuilder = InfluxPoint.newBuilder(measurementName).withTime(timeStamp)
+ .withValue(value).withTag(TAG_ITEM_NAME, itemName);
+
+ if (configuration.isAddCategoryTag()) {
+ String categoryName = Objects.requireNonNullElse(category, "n/a");
+ pointBuilder.withTag(TAG_CATEGORY_NAME, categoryName);
+ }
+
+ if (configuration.isAddTypeTag()) {
+ pointBuilder.withTag(TAG_TYPE_NAME, itemType);
+ }
+
+ if (configuration.isAddLabelTag()) {
+ String labelName = Objects.requireNonNullElse(itemLabel, "n/a");
+ pointBuilder.withTag(TAG_LABEL_NAME, labelName);
+ }
+
+ influxDBMetadataService.getMetaData(itemName)
+ .ifPresent(metadata -> metadata.getConfiguration().forEach(pointBuilder::withTag));
+
+ return pointBuilder.build();
+ });
+ }
+
+ @Reference(cardinality = ReferenceCardinality.AT_LEAST_ONE, policy = ReferencePolicy.DYNAMIC)
+ public void setItemFactory(ItemFactory itemFactory) {
+ itemFactories.add(itemFactory);
+ calculateItemTypeClasses();
+ }
+
+ public void unsetItemFactory(ItemFactory itemFactory) {
+ itemFactories.remove(itemFactory);
+ calculateItemTypeClasses();
+ }
+
+ private synchronized void calculateItemTypeClasses() {
+ Map<String, Class<? extends State>> desiredClasses = new HashMap<>();
+ itemFactories.forEach(factory -> {
+ for (String itemType : factory.getSupportedItemTypes()) {
+ Item item = factory.createItem(itemType, "influxItem");
+ if (item != null) {
+ item.getAcceptedCommandTypes().stream()
+ .filter(commandType -> commandType.isAssignableFrom(State.class)).findFirst()
+ .map(commandType -> (Class<? extends State>) commandType.asSubclass(State.class))
+ .ifPresent(desiredClass -> desiredClasses.put(itemType, desiredClass));
+ }
+ }
+ });
+ this.desiredClasses = desiredClasses;
+ }
}
List<InfluxRow> query(String query);
/**
- * Write point to database
+ * Write points to database
*
- * @param influxPoint Point to write
- * @throws UnexpectedConditionException when an error occurs
+ * @param influxPoints {@link List<InfluxPoint>} to write
+ * @returns <code>true</code> if points have been written, <code>false</code> otherwise
*/
- void write(InfluxPoint influxPoint) throws UnexpectedConditionException;
+ boolean write(List<InfluxPoint> influxPoints);
/**
* create a query creator on this repository
+++ /dev/null
-/**
- * Copyright (c) 2010-2023 Contributors to the openHAB project
- *
- * See the NOTICE file(s) distributed with this work for additional
- * information.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License 2.0 which is available at
- * http://www.eclipse.org/legal/epl-2.0
- *
- * SPDX-License-Identifier: EPL-2.0
- */
-package org.openhab.persistence.influxdb.internal;
-
-import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.TAG_CATEGORY_NAME;
-import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.TAG_ITEM_NAME;
-import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.TAG_LABEL_NAME;
-import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.TAG_TYPE_NAME;
-
-import java.time.Instant;
-import java.util.Objects;
-import java.util.Optional;
-
-import org.eclipse.jdt.annotation.NonNullByDefault;
-import org.eclipse.jdt.annotation.Nullable;
-import org.openhab.core.items.Item;
-import org.openhab.core.types.State;
-import org.openhab.core.types.UnDefType;
-
-/**
- * Logic to create an InfluxDB {@link InfluxPoint} from an openHAB {@link Item}
- *
- * @author Joan Pujol Espinar - Initial contribution
- */
-@NonNullByDefault
-public class ItemToStorePointCreator {
- private final InfluxDBConfiguration configuration;
- private final InfluxDBMetadataService influxDBMetadataService;
-
- public ItemToStorePointCreator(InfluxDBConfiguration configuration,
- InfluxDBMetadataService influxDBMetadataService) {
- this.configuration = configuration;
- this.influxDBMetadataService = influxDBMetadataService;
- }
-
- public @Nullable InfluxPoint convert(Item item, @Nullable String storeAlias) {
- if (item.getState() instanceof UnDefType) {
- return null;
- }
-
- String measurementName = calculateMeasurementName(item, storeAlias);
- String itemName = item.getName();
- State state = getItemState(item);
-
- Object value = InfluxDBStateConvertUtils.stateToObject(state);
-
- InfluxPoint.Builder pointBuilder = InfluxPoint.newBuilder(measurementName).withTime(Instant.now())
- .withValue(value).withTag(TAG_ITEM_NAME, itemName);
-
- addPointTags(item, pointBuilder);
-
- return pointBuilder.build();
- }
-
- private String calculateMeasurementName(Item item, @Nullable String storeAlias) {
- String name = storeAlias != null && !storeAlias.isBlank() ? storeAlias : item.getName();
- name = influxDBMetadataService.getMeasurementNameOrDefault(item.getName(), name);
-
- if (configuration.isReplaceUnderscore()) {
- name = name.replace('_', '.');
- }
-
- return name;
- }
-
- private State getItemState(Item item) {
- return calculateDesiredTypeConversionToStore(item)
- .map(desiredClass -> Objects.requireNonNullElseGet(item.getStateAs(desiredClass), item::getState))
- .orElseGet(item::getState);
- }
-
- private Optional<Class<? extends State>> calculateDesiredTypeConversionToStore(Item item) {
- return item.getAcceptedCommandTypes().stream().filter(commandType -> commandType.isAssignableFrom(State.class))
- .findFirst().map(commandType -> commandType.asSubclass(State.class));
- }
-
- private void addPointTags(Item item, InfluxPoint.Builder pointBuilder) {
- if (configuration.isAddCategoryTag()) {
- String categoryName = Objects.requireNonNullElse(item.getCategory(), "n/a");
- pointBuilder.withTag(TAG_CATEGORY_NAME, categoryName);
- }
-
- if (configuration.isAddTypeTag()) {
- pointBuilder.withTag(TAG_TYPE_NAME, item.getType());
- }
-
- if (configuration.isAddLabelTag()) {
- String labelName = Objects.requireNonNullElse(item.getLabel(), "n/a");
- pointBuilder.withTag(TAG_LABEL_NAME, labelName);
- }
-
- influxDBMetadataService.getMetaData(item.getName())
- .ifPresent(metadata -> metadata.getConfiguration().forEach(pointBuilder::withTag));
- }
-}
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
+import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
import org.influxdb.dto.Pong;
import org.influxdb.dto.Query;
import org.openhab.persistence.influxdb.internal.InfluxDBMetadataService;
import org.openhab.persistence.influxdb.internal.InfluxDBRepository;
import org.openhab.persistence.influxdb.internal.InfluxPoint;
-import org.openhab.persistence.influxdb.internal.UnexpectedConditionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.influxdb.exceptions.InfluxException;
+
/**
* Implementation of {@link InfluxDBRepository} for InfluxDB 1.0
*
}
@Override
- public void write(InfluxPoint point) throws UnexpectedConditionException {
+ public boolean write(List<InfluxPoint> influxPoints) {
final InfluxDB currentClient = this.client;
- if (currentClient != null) {
- Point clientPoint = convertPointToClientFormat(point);
- currentClient.write(configuration.getDatabaseName(), configuration.getRetentionPolicy(), clientPoint);
- } else {
- logger.warn("Write point {} ignored due to client isn't connected", point);
+ if (currentClient == null) {
+ return false;
+ }
+ try {
+ List<Point> points = influxPoints.stream().map(this::convertPointToClientFormat).filter(Optional::isPresent)
+ .map(Optional::get).toList();
+ BatchPoints batchPoints = BatchPoints.database(configuration.getDatabaseName())
+ .retentionPolicy(configuration.getRetentionPolicy()).points(points).build();
+ currentClient.write(batchPoints);
+ } catch (InfluxException e) {
+ logger.debug("Writing to database failed", e);
+ return false;
}
+ return true;
}
- private Point convertPointToClientFormat(InfluxPoint point) throws UnexpectedConditionException {
+ private Optional<Point> convertPointToClientFormat(InfluxPoint point) {
Point.Builder clientPoint = Point.measurement(point.getMeasurementName()).time(point.getTime().toEpochMilli(),
TimeUnit.MILLISECONDS);
Object value = point.getValue();
} else if (value == null) {
clientPoint.addField(FIELD_VALUE_NAME, "null");
} else {
- throw new UnexpectedConditionException("Not expected value type");
+ logger.warn("Could not convert {}, discarding this datapoint", point);
+ return Optional.empty();
}
point.getTags().forEach(clientPoint::tag);
- return clientPoint.build();
+ return Optional.of(clientPoint.build());
}
@Override
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.stream.Stream;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.openhab.persistence.influxdb.internal.InfluxDBMetadataService;
import org.openhab.persistence.influxdb.internal.InfluxDBRepository;
import org.openhab.persistence.influxdb.internal.InfluxPoint;
-import org.openhab.persistence.influxdb.internal.UnexpectedConditionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.influxdb.client.domain.Ready;
import com.influxdb.client.domain.WritePrecision;
import com.influxdb.client.write.Point;
+import com.influxdb.exceptions.InfluxException;
import com.influxdb.query.FluxTable;
/**
}
@Override
- public void write(InfluxPoint point) throws UnexpectedConditionException {
+ public boolean write(List<InfluxPoint> influxPoints) {
final WriteApi currentWriteAPI = writeAPI;
- if (currentWriteAPI != null) {
- currentWriteAPI.writePoint(convertPointToClientFormat(point));
- } else {
- logger.warn("Write point {} ignored due to writeAPI isn't present", point);
+ if (currentWriteAPI == null) {
+ return false;
+ }
+ try {
+ List<Point> clientPoints = influxPoints.stream().map(this::convertPointToClientFormat)
+ .filter(Optional::isPresent).map(Optional::get).toList();
+ currentWriteAPI.writePoints(clientPoints);
+ } catch (InfluxException e) {
+ logger.debug("Writing to database failed", e);
+ return false;
}
+ return true;
}
- private Point convertPointToClientFormat(InfluxPoint point) throws UnexpectedConditionException {
+ private Optional<Point> convertPointToClientFormat(InfluxPoint point) {
Point clientPoint = Point.measurement(point.getMeasurementName()).time(point.getTime(), WritePrecision.MS);
- setPointValue(point.getValue(), clientPoint);
- point.getTags().forEach(clientPoint::addTag);
- return clientPoint;
- }
-
- private void setPointValue(@Nullable Object value, Point point) throws UnexpectedConditionException {
+ @Nullable
+ Object value = point.getValue();
if (value instanceof String) {
- point.addField(FIELD_VALUE_NAME, (String) value);
+ clientPoint.addField(FIELD_VALUE_NAME, (String) value);
} else if (value instanceof Number) {
- point.addField(FIELD_VALUE_NAME, (Number) value);
+ clientPoint.addField(FIELD_VALUE_NAME, (Number) value);
} else if (value instanceof Boolean) {
- point.addField(FIELD_VALUE_NAME, (Boolean) value);
+ clientPoint.addField(FIELD_VALUE_NAME, (Boolean) value);
} else if (value == null) {
- point.addField(FIELD_VALUE_NAME, (String) null);
+ clientPoint.addField(FIELD_VALUE_NAME, (String) null);
} else {
- throw new UnexpectedConditionException("Not expected value type");
+ logger.warn("Could not convert {}, discarding this datapoint)", clientPoint);
+ return Optional.empty();
}
+ point.getTags().forEach(clientPoint::addTag);
+ return Optional.of(clientPoint);
}
@Override
--- /dev/null
+/**
+ * Copyright (c) 2010-2023 Contributors to the openHAB project
+ *
+ * See the NOTICE file(s) distributed with this work for additional
+ * information.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0
+ *
+ * SPDX-License-Identifier: EPL-2.0
+ */
+package org.openhab.persistence.influxdb;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.*;
+import static org.openhab.persistence.influxdb.internal.InfluxDBConfiguration.DATABASE_PARAM;
+import static org.openhab.persistence.influxdb.internal.InfluxDBConfiguration.PASSWORD_PARAM;
+import static org.openhab.persistence.influxdb.internal.InfluxDBConfiguration.RETENTION_POLICY_PARAM;
+import static org.openhab.persistence.influxdb.internal.InfluxDBConfiguration.TOKEN_PARAM;
+import static org.openhab.persistence.influxdb.internal.InfluxDBConfiguration.URL_PARAM;
+import static org.openhab.persistence.influxdb.internal.InfluxDBConfiguration.USER_PARAM;
+import static org.openhab.persistence.influxdb.internal.InfluxDBConfiguration.VERSION_PARAM;
+
+import java.util.Map;
+
+import org.eclipse.jdt.annotation.NonNullByDefault;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+import org.openhab.core.items.ItemRegistry;
+import org.openhab.core.items.MetadataRegistry;
+import org.openhab.persistence.influxdb.internal.InfluxDBMetadataService;
+import org.openhab.persistence.influxdb.internal.InfluxDBRepository;
+import org.openhab.persistence.influxdb.internal.InfluxDBVersion;
+import org.openhab.persistence.influxdb.internal.ItemTestHelper;
+import org.openhab.persistence.influxdb.internal.UnexpectedConditionException;
+
+/**
+ * @author Joan Pujol Espinar - Initial contribution
+ */
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.LENIENT)
+@NonNullByDefault
+public class InfluxDBPersistenceServiceTest {
+ private static final Map<String, Object> VALID_V1_CONFIGURATION = Map.of( //
+ URL_PARAM, "http://localhost:8086", //
+ VERSION_PARAM, InfluxDBVersion.V1.name(), //
+ USER_PARAM, "user", PASSWORD_PARAM, "password", //
+ DATABASE_PARAM, "openhab", //
+ RETENTION_POLICY_PARAM, "default");
+
+ private static final Map<String, Object> VALID_V2_CONFIGURATION = Map.of( //
+ URL_PARAM, "http://localhost:8086", //
+ VERSION_PARAM, InfluxDBVersion.V2.name(), //
+ TOKEN_PARAM, "sampletoken", //
+ DATABASE_PARAM, "openhab", //
+ RETENTION_POLICY_PARAM, "default");
+
+ private static final Map<String, Object> INVALID_V1_CONFIGURATION = Map.of(//
+ URL_PARAM, "http://localhost:8086", //
+ VERSION_PARAM, InfluxDBVersion.V1.name(), //
+ USER_PARAM, "user", //
+ DATABASE_PARAM, "openhab", //
+ RETENTION_POLICY_PARAM, "default");
+
+ private static final Map<String, Object> INVALID_V2_CONFIGURATION = Map.of( //
+ URL_PARAM, "http://localhost:8086", //
+ VERSION_PARAM, InfluxDBVersion.V2.name(), //
+ DATABASE_PARAM, "openhab", //
+ RETENTION_POLICY_PARAM, "default");
+
+ private @Mock @NonNullByDefault({}) InfluxDBRepository influxDBRepositoryMock;
+
+ private final InfluxDBMetadataService influxDBMetadataService = new InfluxDBMetadataService(
+ mock(MetadataRegistry.class));
+
+ @Test
+ public void activateWithValidV1ConfigShouldConnectRepository() {
+ getService(VALID_V1_CONFIGURATION);
+ verify(influxDBRepositoryMock).connect();
+ }
+
+ @Test
+ public void activateWithValidV2ConfigShouldConnectRepository() {
+ getService(VALID_V2_CONFIGURATION);
+ verify(influxDBRepositoryMock).connect();
+ }
+
+ @Test
+ public void activateWithInvalidV1ConfigShouldFail() {
+ assertThrows(IllegalArgumentException.class, () -> getService(INVALID_V1_CONFIGURATION));
+ }
+
+ @Test
+ public void activateWithInvalidV2ShouldFail() {
+ assertThrows(IllegalArgumentException.class, () -> getService(INVALID_V2_CONFIGURATION));
+ }
+
+ @Test
+ public void deactivateShouldDisconnectRepository() {
+ InfluxDBPersistenceService instance = getService(VALID_V2_CONFIGURATION);
+ instance.deactivate();
+ verify(influxDBRepositoryMock).disconnect();
+ }
+
+ @Test
+ public void storeItemWithConnectedRepository() throws UnexpectedConditionException {
+ InfluxDBPersistenceService instance = getService(VALID_V2_CONFIGURATION);
+ when(influxDBRepositoryMock.isConnected()).thenReturn(true);
+ instance.store(ItemTestHelper.createNumberItem("number", 5));
+ verify(influxDBRepositoryMock, timeout(5000)).write(any());
+ }
+
+ @Test
+ public void storeItemWithDisconnectedRepositoryIsIgnored() throws UnexpectedConditionException {
+ InfluxDBPersistenceService instance = getService(VALID_V2_CONFIGURATION);
+ when(influxDBRepositoryMock.isConnected()).thenReturn(false);
+ instance.store(ItemTestHelper.createNumberItem("number", 5));
+ verify(influxDBRepositoryMock, never()).write(any());
+ }
+
+ private InfluxDBPersistenceService getService(Map<String, Object> config) {
+ return new InfluxDBPersistenceService(mock(ItemRegistry.class), influxDBMetadataService, config) {
+ @Override
+ protected InfluxDBRepository createInfluxDBRepository() {
+ return influxDBRepositoryMock;
+ }
+ };
+ }
+}
--- /dev/null
+/**
+ * Copyright (c) 2010-2023 Contributors to the openHAB project
+ *
+ * See the NOTICE file(s) distributed with this work for additional
+ * information.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0
+ *
+ * SPDX-License-Identifier: EPL-2.0
+ */
+package org.openhab.persistence.influxdb;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.*;
+import static org.mockito.Mockito.when;
+import static org.openhab.persistence.influxdb.internal.InfluxDBConfiguration.*;
+
+import java.math.BigDecimal;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Stream;
+
+import org.eclipse.jdt.annotation.DefaultLocation;
+import org.eclipse.jdt.annotation.NonNullByDefault;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.openhab.core.items.ItemRegistry;
+import org.openhab.core.items.Metadata;
+import org.openhab.core.items.MetadataKey;
+import org.openhab.core.items.MetadataRegistry;
+import org.openhab.core.library.CoreItemFactory;
+import org.openhab.core.library.items.NumberItem;
+import org.openhab.persistence.influxdb.internal.InfluxDBConstants;
+import org.openhab.persistence.influxdb.internal.InfluxDBMetadataService;
+import org.openhab.persistence.influxdb.internal.InfluxDBVersion;
+import org.openhab.persistence.influxdb.internal.InfluxPoint;
+import org.openhab.persistence.influxdb.internal.ItemTestHelper;
+
+/**
+ * @author Joan Pujol Espinar - Initial contribution
+ */
+@ExtendWith(MockitoExtension.class)
+@NonNullByDefault(value = { DefaultLocation.PARAMETER, DefaultLocation.RETURN_TYPE })
+public class ItemToStorePointCreatorTest {
+
+ private static final Map<String, Object> BASE_CONFIGURATION = Map.of( //
+ URL_PARAM, "http://localhost:8086", //
+ VERSION_PARAM, InfluxDBVersion.V1.name(), //
+ USER_PARAM, "user", PASSWORD_PARAM, "password", //
+ DATABASE_PARAM, "openhab", //
+ RETENTION_POLICY_PARAM, "default");
+
+ private @Mock ItemRegistry itemRegistryMock;
+ private @Mock MetadataRegistry metadataRegistry;
+ private InfluxDBPersistenceService instance;
+
+ @BeforeEach
+ public void setup() {
+ instance = getService(false, false, false, false);
+ }
+
+ @ParameterizedTest
+ @MethodSource
+ public void convertBasicItem(Number number) throws ExecutionException, InterruptedException {
+ NumberItem item = ItemTestHelper.createNumberItem("myitem", number);
+ InfluxPoint point = instance.convert(item, null).get();
+
+ if (point == null) {
+ Assertions.fail("'point' is null");
+ return;
+ }
+
+ assertThat(point.getMeasurementName(), equalTo(item.getName()));
+ assertThat("Must Store item name", point.getTags(), hasEntry("item", item.getName()));
+ assertThat(point.getValue(), equalTo(new BigDecimal(number.toString())));
+ }
+
+ @SuppressWarnings("unused")
+ private static Stream<Number> convertBasicItem() {
+ return Stream.of(5, 5.5, 5L);
+ }
+
+ @Test
+ public void shouldUseAliasAsMeasurementNameIfProvided() throws ExecutionException, InterruptedException {
+ NumberItem item = ItemTestHelper.createNumberItem("myitem", 5);
+ InfluxPoint point = instance.convert(item, "aliasName").get();
+
+ if (point == null) {
+ Assertions.fail("'point' is null");
+ return;
+ }
+
+ assertThat(point.getMeasurementName(), is("aliasName"));
+ }
+
+ @Test
+ public void shouldStoreCategoryTagIfProvidedAndConfigured() throws ExecutionException, InterruptedException {
+ NumberItem item = ItemTestHelper.createNumberItem("myitem", 5);
+ item.setCategory("categoryValue");
+
+ instance = getService(false, true, false, false);
+ InfluxPoint point = instance.convert(item, null).get();
+
+ if (point == null) {
+ Assertions.fail("'point' is null");
+ return;
+ }
+
+ assertThat(point.getTags(), hasEntry(InfluxDBConstants.TAG_CATEGORY_NAME, "categoryValue"));
+
+ instance = getService(false, false, false, false);
+ point = instance.convert(item, null).get();
+
+ if (point == null) {
+ Assertions.fail("'point' is null");
+ return;
+ }
+
+ assertThat(point.getTags(), not(hasKey(InfluxDBConstants.TAG_CATEGORY_NAME)));
+ }
+
+ @Test
+ public void shouldStoreTypeTagIfProvidedAndConfigured() throws ExecutionException, InterruptedException {
+ NumberItem item = ItemTestHelper.createNumberItem("myitem", 5);
+
+ instance = getService(false, false, false, true);
+ InfluxPoint point = instance.convert(item, null).get();
+
+ if (point == null) {
+ Assertions.fail("'point' is null");
+ return;
+ }
+
+ assertThat(point.getTags(), hasEntry(InfluxDBConstants.TAG_TYPE_NAME, "Number"));
+
+ instance = getService(false, false, false, false);
+ point = instance.convert(item, null).get();
+
+ if (point == null) {
+ Assertions.fail("'point' is null");
+ return;
+ }
+
+ assertThat(point.getTags(), not(hasKey(InfluxDBConstants.TAG_TYPE_NAME)));
+ }
+
+ @Test
+ public void shouldStoreTypeLabelIfProvidedAndConfigured() throws ExecutionException, InterruptedException {
+ NumberItem item = ItemTestHelper.createNumberItem("myitem", 5);
+ item.setLabel("ItemLabel");
+
+ instance = getService(false, false, true, false);
+ InfluxPoint point = instance.convert(item, null).get();
+
+ if (point == null) {
+ Assertions.fail("'point' is null");
+ return;
+ }
+
+ assertThat(point.getTags(), hasEntry(InfluxDBConstants.TAG_LABEL_NAME, "ItemLabel"));
+
+ instance = getService(false, false, false, false);
+ point = instance.convert(item, null).get();
+
+ if (point == null) {
+ Assertions.fail("'point' is null");
+ return;
+ }
+
+ assertThat(point.getTags(), not(hasKey(InfluxDBConstants.TAG_LABEL_NAME)));
+ }
+
+ @Test
+ public void shouldStoreMetadataAsTagsIfProvided() throws ExecutionException, InterruptedException {
+ NumberItem item = ItemTestHelper.createNumberItem("myitem", 5);
+ MetadataKey metadataKey = new MetadataKey(InfluxDBPersistenceService.SERVICE_NAME, item.getName());
+
+ when(metadataRegistry.get(metadataKey))
+ .thenReturn(new Metadata(metadataKey, "", Map.of("key1", "val1", "key2", "val2")));
+
+ InfluxPoint point = instance.convert(item, null).get();
+
+ if (point == null) {
+ Assertions.fail("'point' is null");
+ return;
+ }
+
+ assertThat(point.getTags(), hasEntry("key1", "val1"));
+ assertThat(point.getTags(), hasEntry("key2", "val2"));
+ }
+
+ @Test
+ public void shouldUseMeasurementNameFromMetadataIfProvided() throws ExecutionException, InterruptedException {
+ NumberItem item = ItemTestHelper.createNumberItem("myitem", 5);
+ MetadataKey metadataKey = new MetadataKey(InfluxDBPersistenceService.SERVICE_NAME, item.getName());
+
+ InfluxPoint point = instance.convert(item, null).get();
+ if (point == null) {
+ Assertions.fail();
+ return;
+ }
+ assertThat(point.getMeasurementName(), equalTo(item.getName()));
+
+ point = instance.convert(item, null).get();
+ if (point == null) {
+ Assertions.fail();
+ return;
+ }
+ assertThat(point.getMeasurementName(), equalTo(item.getName()));
+ assertThat(point.getTags(), hasEntry("item", item.getName()));
+
+ when(metadataRegistry.get(metadataKey))
+ .thenReturn(new Metadata(metadataKey, "measurementName", Map.of("key1", "val1", "key2", "val2")));
+
+ point = instance.convert(item, null).get();
+ if (point == null) {
+ Assertions.fail();
+ return;
+ }
+ assertThat(point.getMeasurementName(), equalTo("measurementName"));
+ assertThat(point.getTags(), hasEntry("item", item.getName()));
+
+ when(metadataRegistry.get(metadataKey))
+ .thenReturn(new Metadata(metadataKey, "", Map.of("key1", "val1", "key2", "val2")));
+
+ point = instance.convert(item, null).get();
+ if (point == null) {
+ Assertions.fail();
+ return;
+ }
+ assertThat(point.getMeasurementName(), equalTo(item.getName()));
+ assertThat(point.getTags(), hasEntry("item", item.getName()));
+ }
+
+ private InfluxDBPersistenceService getService(boolean replaceUnderscore, boolean category, boolean label,
+ boolean typeTag) {
+ InfluxDBMetadataService influxDBMetadataService = new InfluxDBMetadataService(metadataRegistry);
+
+ Map<String, Object> configuration = new HashMap<>();
+ configuration.putAll(BASE_CONFIGURATION);
+ configuration.put(REPLACE_UNDERSCORE_PARAM, replaceUnderscore);
+ configuration.put(ADD_CATEGORY_TAG_PARAM, category);
+ configuration.put(ADD_LABEL_TAG_PARAM, label);
+ configuration.put(ADD_TYPE_TAG_PARAM, typeTag);
+
+ InfluxDBPersistenceService instance = new InfluxDBPersistenceService(itemRegistryMock, influxDBMetadataService,
+ configuration);
+ instance.setItemFactory(new CoreItemFactory());
+
+ return instance;
+ }
+}
+++ /dev/null
-/**
- * Copyright (c) 2010-2023 Contributors to the openHAB project
- *
- * See the NOTICE file(s) distributed with this work for additional
- * information.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License 2.0 which is available at
- * http://www.eclipse.org/legal/epl-2.0
- *
- * SPDX-License-Identifier: EPL-2.0
- */
-package org.openhab.persistence.influxdb.internal;
-
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.*;
-import static org.openhab.persistence.influxdb.internal.InfluxDBConfiguration.DATABASE_PARAM;
-import static org.openhab.persistence.influxdb.internal.InfluxDBConfiguration.PASSWORD_PARAM;
-import static org.openhab.persistence.influxdb.internal.InfluxDBConfiguration.RETENTION_POLICY_PARAM;
-import static org.openhab.persistence.influxdb.internal.InfluxDBConfiguration.TOKEN_PARAM;
-import static org.openhab.persistence.influxdb.internal.InfluxDBConfiguration.URL_PARAM;
-import static org.openhab.persistence.influxdb.internal.InfluxDBConfiguration.USER_PARAM;
-import static org.openhab.persistence.influxdb.internal.InfluxDBConfiguration.VERSION_PARAM;
-
-import java.util.Map;
-
-import org.eclipse.jdt.annotation.NonNullByDefault;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.Mock;
-import org.mockito.junit.jupiter.MockitoExtension;
-import org.openhab.core.items.ItemRegistry;
-import org.openhab.core.items.MetadataRegistry;
-import org.openhab.persistence.influxdb.InfluxDBPersistenceService;
-
-/**
- * @author Joan Pujol Espinar - Initial contribution
- */
-@ExtendWith(MockitoExtension.class)
-@NonNullByDefault
-public class InfluxDBPersistenceServiceTest {
- private static final Map<String, Object> VALID_V1_CONFIGURATION = Map.of( //
- URL_PARAM, "http://localhost:8086", //
- VERSION_PARAM, InfluxDBVersion.V1.name(), //
- USER_PARAM, "user", PASSWORD_PARAM, "password", //
- DATABASE_PARAM, "openhab", //
- RETENTION_POLICY_PARAM, "default");
-
- private static final Map<String, Object> VALID_V2_CONFIGURATION = Map.of( //
- URL_PARAM, "http://localhost:8086", //
- VERSION_PARAM, InfluxDBVersion.V2.name(), //
- TOKEN_PARAM, "sampletoken", //
- DATABASE_PARAM, "openhab", //
- RETENTION_POLICY_PARAM, "default");
-
- private static final Map<String, Object> INVALID_V1_CONFIGURATION = Map.of(//
- URL_PARAM, "http://localhost:8086", //
- VERSION_PARAM, InfluxDBVersion.V1.name(), //
- USER_PARAM, "user", //
- DATABASE_PARAM, "openhab", //
- RETENTION_POLICY_PARAM, "default");
-
- private static final Map<String, Object> INVALID_V2_CONFIGURATION = Map.of( //
- URL_PARAM, "http://localhost:8086", //
- VERSION_PARAM, InfluxDBVersion.V2.name(), //
- DATABASE_PARAM, "openhab", //
- RETENTION_POLICY_PARAM, "default");
-
- private @Mock @NonNullByDefault({}) InfluxDBRepository influxDBRepositoryMock;
-
- private final InfluxDBMetadataService influxDBMetadataService = new InfluxDBMetadataService(
- mock(MetadataRegistry.class));
-
- @Test
- public void activateWithValidV1ConfigShouldConnectRepository() {
- getService(VALID_V1_CONFIGURATION);
- verify(influxDBRepositoryMock).connect();
- }
-
- @Test
- public void activateWithValidV2ConfigShouldConnectRepository() {
- getService(VALID_V2_CONFIGURATION);
- verify(influxDBRepositoryMock).connect();
- }
-
- @Test
- public void activateWithInvalidV1ConfigShouldFail() {
- assertThrows(IllegalArgumentException.class, () -> getService(INVALID_V1_CONFIGURATION));
- }
-
- @Test
- public void activateWithInvalidV2ShouldFail() {
- assertThrows(IllegalArgumentException.class, () -> getService(INVALID_V2_CONFIGURATION));
- }
-
- @Test
- public void deactivateShouldDisconnectRepository() {
- InfluxDBPersistenceService instance = getService(VALID_V2_CONFIGURATION);
- instance.deactivate();
- verify(influxDBRepositoryMock).disconnect();
- }
-
- @Test
- public void storeItemWithConnectedRepository() throws UnexpectedConditionException {
- InfluxDBPersistenceService instance = getService(VALID_V2_CONFIGURATION);
- when(influxDBRepositoryMock.isConnected()).thenReturn(true);
- instance.store(ItemTestHelper.createNumberItem("number", 5));
- verify(influxDBRepositoryMock).write(any());
- }
-
- @Test
- public void storeItemWithDisconnectedRepositoryIsIgnored() throws UnexpectedConditionException {
- InfluxDBPersistenceService instance = getService(VALID_V2_CONFIGURATION);
- when(influxDBRepositoryMock.isConnected()).thenReturn(false);
- instance.store(ItemTestHelper.createNumberItem("number", 5));
- verify(influxDBRepositoryMock, never()).write(any());
- }
-
- private InfluxDBPersistenceService getService(Map<String, Object> config) {
- return new InfluxDBPersistenceService(mock(ItemRegistry.class), influxDBMetadataService, config) {
- @Override
- protected InfluxDBRepository createInfluxDBRepository() {
- return influxDBRepositoryMock;
- }
- };
- }
-}
+++ /dev/null
-/**
- * Copyright (c) 2010-2023 Contributors to the openHAB project
- *
- * See the NOTICE file(s) distributed with this work for additional
- * information.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License 2.0 which is available at
- * http://www.eclipse.org/legal/epl-2.0
- *
- * SPDX-License-Identifier: EPL-2.0
- */
-package org.openhab.persistence.influxdb.internal;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.*;
-import static org.mockito.Mockito.when;
-
-import java.math.BigDecimal;
-import java.util.Map;
-import java.util.stream.Stream;
-
-import org.eclipse.jdt.annotation.DefaultLocation;
-import org.eclipse.jdt.annotation.NonNullByDefault;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.MethodSource;
-import org.mockito.Mock;
-import org.mockito.junit.jupiter.MockitoExtension;
-import org.openhab.core.items.Metadata;
-import org.openhab.core.items.MetadataKey;
-import org.openhab.core.items.MetadataRegistry;
-import org.openhab.core.library.items.NumberItem;
-import org.openhab.persistence.influxdb.InfluxDBPersistenceService;
-
-/**
- * @author Joan Pujol Espinar - Initial contribution
- */
-@ExtendWith(MockitoExtension.class)
-@NonNullByDefault(value = { DefaultLocation.PARAMETER, DefaultLocation.RETURN_TYPE })
-public class ItemToStorePointCreatorTest {
-
- private @Mock InfluxDBConfiguration influxDBConfiguration;
- private @Mock MetadataRegistry metadataRegistry;
- private ItemToStorePointCreator instance;
-
- @BeforeEach
- public void before() {
- InfluxDBMetadataService influxDBMetadataService = new InfluxDBMetadataService(metadataRegistry);
- when(influxDBConfiguration.isAddCategoryTag()).thenReturn(false);
- when(influxDBConfiguration.isAddLabelTag()).thenReturn(false);
- when(influxDBConfiguration.isAddTypeTag()).thenReturn(false);
- when(influxDBConfiguration.isReplaceUnderscore()).thenReturn(false);
-
- instance = new ItemToStorePointCreator(influxDBConfiguration, influxDBMetadataService);
- }
-
- @AfterEach
- public void after() {
- instance = null;
- influxDBConfiguration = null;
- metadataRegistry = null;
- }
-
- @ParameterizedTest
- @MethodSource
- public void convertBasicItem(Number number) {
- NumberItem item = ItemTestHelper.createNumberItem("myitem", number);
- InfluxPoint point = instance.convert(item, null);
-
- if (point == null) {
- Assertions.fail("'point' is null");
- return;
- }
-
- assertThat(point.getMeasurementName(), equalTo(item.getName()));
- assertThat("Must Store item name", point.getTags(), hasEntry("item", item.getName()));
- assertThat(point.getValue(), equalTo(new BigDecimal(number.toString())));
- }
-
- @SuppressWarnings("unused")
- private static Stream<Number> convertBasicItem() {
- return Stream.of(5, 5.5, 5L);
- }
-
- @Test
- public void shouldUseAliasAsMeasurementNameIfProvided() {
- NumberItem item = ItemTestHelper.createNumberItem("myitem", 5);
- InfluxPoint point = instance.convert(item, "aliasName");
-
- if (point == null) {
- Assertions.fail("'point' is null");
- return;
- }
-
- assertThat(point.getMeasurementName(), is("aliasName"));
- }
-
- @Test
- public void shouldStoreCategoryTagIfProvidedAndConfigured() {
- NumberItem item = ItemTestHelper.createNumberItem("myitem", 5);
- item.setCategory("categoryValue");
-
- when(influxDBConfiguration.isAddCategoryTag()).thenReturn(true);
- InfluxPoint point = instance.convert(item, null);
-
- if (point == null) {
- Assertions.fail("'point' is null");
- return;
- }
-
- assertThat(point.getTags(), hasEntry(InfluxDBConstants.TAG_CATEGORY_NAME, "categoryValue"));
-
- when(influxDBConfiguration.isAddCategoryTag()).thenReturn(false);
- point = instance.convert(item, null);
-
- if (point == null) {
- Assertions.fail("'point' is null");
- return;
- }
-
- assertThat(point.getTags(), not(hasKey(InfluxDBConstants.TAG_CATEGORY_NAME)));
- }
-
- @Test
- public void shouldStoreTypeTagIfProvidedAndConfigured() {
- NumberItem item = ItemTestHelper.createNumberItem("myitem", 5);
-
- when(influxDBConfiguration.isAddTypeTag()).thenReturn(true);
- InfluxPoint point = instance.convert(item, null);
-
- if (point == null) {
- Assertions.fail("'point' is null");
- return;
- }
-
- assertThat(point.getTags(), hasEntry(InfluxDBConstants.TAG_TYPE_NAME, "Number"));
-
- when(influxDBConfiguration.isAddTypeTag()).thenReturn(false);
- point = instance.convert(item, null);
-
- if (point == null) {
- Assertions.fail("'point' is null");
- return;
- }
-
- assertThat(point.getTags(), not(hasKey(InfluxDBConstants.TAG_TYPE_NAME)));
- }
-
- @Test
- public void shouldStoreTypeLabelIfProvidedAndConfigured() {
- NumberItem item = ItemTestHelper.createNumberItem("myitem", 5);
- item.setLabel("ItemLabel");
-
- when(influxDBConfiguration.isAddLabelTag()).thenReturn(true);
- InfluxPoint point = instance.convert(item, null);
-
- if (point == null) {
- Assertions.fail("'point' is null");
- return;
- }
-
- assertThat(point.getTags(), hasEntry(InfluxDBConstants.TAG_LABEL_NAME, "ItemLabel"));
-
- when(influxDBConfiguration.isAddLabelTag()).thenReturn(false);
- point = instance.convert(item, null);
-
- if (point == null) {
- Assertions.fail("'point' is null");
- return;
- }
-
- assertThat(point.getTags(), not(hasKey(InfluxDBConstants.TAG_LABEL_NAME)));
- }
-
- @Test
- public void shouldStoreMetadataAsTagsIfProvided() {
- NumberItem item = ItemTestHelper.createNumberItem("myitem", 5);
- MetadataKey metadataKey = new MetadataKey(InfluxDBPersistenceService.SERVICE_NAME, item.getName());
-
- when(metadataRegistry.get(metadataKey))
- .thenReturn(new Metadata(metadataKey, "", Map.of("key1", "val1", "key2", "val2")));
-
- InfluxPoint point = instance.convert(item, null);
-
- if (point == null) {
- Assertions.fail("'point' is null");
- return;
- }
-
- assertThat(point.getTags(), hasEntry("key1", "val1"));
- assertThat(point.getTags(), hasEntry("key2", "val2"));
- }
-
- @Test
- public void shouldUseMeasurementNameFromMetadataIfProvided() {
- NumberItem item = ItemTestHelper.createNumberItem("myitem", 5);
- MetadataKey metadataKey = new MetadataKey(InfluxDBPersistenceService.SERVICE_NAME, item.getName());
-
- InfluxPoint point = instance.convert(item, null);
- if (point == null) {
- Assertions.fail();
- return;
- }
- assertThat(point.getMeasurementName(), equalTo(item.getName()));
-
- point = instance.convert(item, null);
- if (point == null) {
- Assertions.fail();
- return;
- }
- assertThat(point.getMeasurementName(), equalTo(item.getName()));
- assertThat(point.getTags(), hasEntry("item", item.getName()));
-
- when(metadataRegistry.get(metadataKey))
- .thenReturn(new Metadata(metadataKey, "measurementName", Map.of("key1", "val1", "key2", "val2")));
-
- point = instance.convert(item, null);
- if (point == null) {
- Assertions.fail();
- return;
- }
- assertThat(point.getMeasurementName(), equalTo("measurementName"));
- assertThat(point.getTags(), hasEntry("item", item.getName()));
-
- when(metadataRegistry.get(metadataKey))
- .thenReturn(new Metadata(metadataKey, "", Map.of("key1", "val1", "key2", "val2")));
-
- point = instance.convert(item, null);
- if (point == null) {
- Assertions.fail();
- return;
- }
- assertThat(point.getMeasurementName(), equalTo(item.getName()));
- assertThat(point.getTags(), hasEntry("item", item.getName()));
- }
-}