<properties>
<bnd.importpackage>
- !javax.annotation;!android.*,!com.android.*,!com.google.appengine.*,!dalvik.system,!kotlin.*,!kotlinx.*,!org.conscrypt,!sun.security.ssl,!org.apache.harmony.*,!org.apache.http.*,!rx.*,!org.msgpack.*
+ !javax.annotation.*;!android.*,!com.android.*,!com.google.appengine.*,!dalvik.system,!kotlin.*,!kotlinx.*,!org.conscrypt,!sun.security.ssl,!org.apache.harmony.*,!org.apache.http.*,!rx.*,!org.msgpack.*
</bnd.importpackage>
+ <okhttp3.version>3.14.9</okhttp3.version>
+ <retrofit.version>2.7.2</retrofit.version>
+ <influx2.version>1.15.0</influx2.version>
+ <influx1.version>2.21</influx1.version>
</properties>
<dependencies>
<!-- START InfluxDB 2.0 -->
- <!-- START influxdb-client-java -->
<dependency>
<groupId>com.influxdb</groupId>
<artifactId>influxdb-client-java</artifactId>
- <version>1.6.0</version>
+ <version>${influx2.version}</version>
</dependency>
<dependency>
+ <groupId>com.influxdb</groupId>
<artifactId>influxdb-client-core</artifactId>
+ <version>${influx2.version}</version>
+ </dependency>
+ <dependency>
<groupId>com.influxdb</groupId>
- <version>1.6.0</version>
+ <artifactId>flux-dsl</artifactId>
+ <version>${influx2.version}</version>
</dependency>
+
<dependency>
- <artifactId>converter-gson</artifactId>
<groupId>com.squareup.retrofit2</groupId>
- <version>2.5.0</version>
+ <artifactId>converter-gson</artifactId>
+ <version>${retrofit.version}</version>
</dependency>
<dependency>
+ <groupId>com.squareup.retrofit2</groupId>
<artifactId>converter-scalars</artifactId>
+ <version>${retrofit.version}</version>
+ </dependency>
+ <dependency>
+ <artifactId>retrofit</artifactId>
<groupId>com.squareup.retrofit2</groupId>
- <version>2.5.0</version>
+ <version>${retrofit.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.squareup.okhttp3</groupId>
+ <artifactId>okhttp</artifactId>
+ <version>${okhttp3.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.squareup.okhttp3</groupId>
+ <artifactId>logging-interceptor</artifactId>
+ <version>${okhttp3.version}</version>
</dependency>
<dependency>
- <artifactId>gson</artifactId>
<groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
<version>2.9.1</version>
</dependency>
<dependency>
- <artifactId>gson-fire</artifactId>
<groupId>io.gsonfire</groupId>
- <version>1.8.0</version>
+ <artifactId>gson-fire</artifactId>
+ <version>1.8.4</version>
</dependency>
<dependency>
- <artifactId>okio</artifactId>
<groupId>com.squareup.okio</groupId>
+ <artifactId>okio</artifactId>
<version>1.17.3</version>
</dependency>
<dependency>
- <artifactId>commons-csv</artifactId>
<groupId>org.apache.commons</groupId>
- <version>1.6</version>
+ <artifactId>commons-csv</artifactId>
+ <version>1.8</version>
</dependency>
<dependency>
<artifactId>json</artifactId>
<groupId>org.json</groupId>
- <version>20180813</version>
- </dependency>
- <dependency>
- <artifactId>okhttp</artifactId>
- <groupId>com.squareup.okhttp3</groupId>
- <version>${okhttp.version}</version>
- </dependency>
- <dependency>
- <artifactId>retrofit</artifactId>
- <groupId>com.squareup.retrofit2</groupId>
- <version>2.6.2</version>
- </dependency>
- <dependency>
- <artifactId>jsr305</artifactId>
- <groupId>com.google.code.findbugs</groupId>
- <version>3.0.2</version>
- </dependency>
- <dependency>
- <artifactId>logging-interceptor</artifactId>
- <groupId>com.squareup.okhttp3</groupId>
- <version>${okhttp.version}</version>
+ <version>20200518</version>
</dependency>
<dependency>
<artifactId>rxjava</artifactId>
<groupId>io.reactivex.rxjava2</groupId>
- <version>2.2.17</version>
+ <version>2.2.19</version>
</dependency>
<dependency>
<artifactId>reactive-streams</artifactId>
<dependency>
<artifactId>swagger-annotations</artifactId>
<groupId>io.swagger</groupId>
- <version>1.5.22</version>
- </dependency>
- <!--END influxdb-client-java -->
-
-
- <dependency>
- <groupId>com.influxdb</groupId>
- <artifactId>flux-dsl</artifactId>
- <version>1.6.0</version>
+ <version>1.6.1</version>
</dependency>
-
<!--END InfluxDB 2.0 -->
<!--START InfluxDB 1.0 -->
<dependency>
<groupId>org.influxdb</groupId>
<artifactId>influxdb-java</artifactId>
- <version>2.17</version>
+ <version>${influx1.version}</version>
</dependency>
<dependency>
<groupId>com.squareup.retrofit2</groupId>
<artifactId>converter-moshi</artifactId>
- <version>2.6.2</version>
+ <version>${retrofit.version}</version>
</dependency>
<dependency>
<groupId>com.squareup.moshi</groupId>
<!-- END InfluxDB 1.0 -->
</dependencies>
+
</project>
import org.openhab.core.config.core.ConfigurableService;
import org.openhab.core.items.Item;
import org.openhab.core.items.ItemRegistry;
-import org.openhab.core.items.MetadataRegistry;
import org.openhab.core.persistence.FilterCriteria;
import org.openhab.core.persistence.HistoricItem;
import org.openhab.core.persistence.PersistenceItemInfo;
import org.openhab.persistence.influxdb.internal.FilterCriteriaQueryCreator;
import org.openhab.persistence.influxdb.internal.InfluxDBConfiguration;
import org.openhab.persistence.influxdb.internal.InfluxDBHistoricItem;
+import org.openhab.persistence.influxdb.internal.InfluxDBMetadataService;
import org.openhab.persistence.influxdb.internal.InfluxDBPersistentItemInfo;
import org.openhab.persistence.influxdb.internal.InfluxDBRepository;
import org.openhab.persistence.influxdb.internal.InfluxDBStateConvertUtils;
import org.openhab.persistence.influxdb.internal.InfluxPoint;
-import org.openhab.persistence.influxdb.internal.InfluxRow;
import org.openhab.persistence.influxdb.internal.ItemToStorePointCreator;
-import org.openhab.persistence.influxdb.internal.RepositoryFactory;
+import org.openhab.persistence.influxdb.internal.UnexpectedConditionException;
+import org.openhab.persistence.influxdb.internal.influx1.InfluxDB1RepositoryImpl;
+import org.openhab.persistence.influxdb.internal.influx2.InfluxDB2RepositoryImpl;
import org.osgi.framework.Constants;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
-import org.osgi.service.component.annotations.Modified;
import org.osgi.service.component.annotations.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
// External dependencies
private final ItemRegistry itemRegistry;
- private final MetadataRegistry metadataRegistry;
+ private final InfluxDBMetadataService influxDBMetadataService;
- // Internal dependencies/state
- private InfluxDBConfiguration configuration = InfluxDBConfiguration.NO_CONFIGURATION;
-
- // Relax rules because can only be null if component is not active
- private @NonNullByDefault({}) ItemToStorePointCreator itemToStorePointCreator;
- private @NonNullByDefault({}) InfluxDBRepository influxDBRepository;
-
- private boolean tryReconnection = false;
+ private final InfluxDBConfiguration configuration;
+ private final ItemToStorePointCreator itemToStorePointCreator;
+ private final InfluxDBRepository influxDBRepository;
+ private boolean tryReconnection;
@Activate
public InfluxDBPersistenceService(final @Reference ItemRegistry itemRegistry,
- final @Reference MetadataRegistry metadataRegistry) {
+ final @Reference InfluxDBMetadataService influxDBMetadataService, Map<String, Object> config) {
this.itemRegistry = itemRegistry;
- this.metadataRegistry = metadataRegistry;
- }
-
- /**
- * Connect to database when service is activated
- */
- @Activate
- public void activate(final @Nullable Map<String, Object> config) {
- logger.debug("InfluxDB persistence service is being activated");
-
- if (loadConfiguration(config)) {
- itemToStorePointCreator = new ItemToStorePointCreator(configuration, metadataRegistry);
- influxDBRepository = createInfluxDBRepository();
- influxDBRepository.connect();
+ this.influxDBMetadataService = influxDBMetadataService;
+ this.configuration = new InfluxDBConfiguration(config);
+ if (configuration.isValid()) {
+ this.influxDBRepository = createInfluxDBRepository();
+ this.influxDBRepository.connect();
+ this.itemToStorePointCreator = new ItemToStorePointCreator(configuration, influxDBMetadataService);
tryReconnection = true;
} else {
- logger.error("Cannot load configuration, persistence service wont work");
- tryReconnection = false;
+ throw new IllegalArgumentException("Configuration invalid.");
}
- logger.debug("InfluxDB persistence service is now activated");
+ logger.info("InfluxDB persistence service started.");
}
// Visible for testing
- protected InfluxDBRepository createInfluxDBRepository() {
- return RepositoryFactory.createRepository(configuration);
+ protected InfluxDBRepository createInfluxDBRepository() throws IllegalArgumentException {
+ return switch (configuration.getVersion()) {
+ case V1 -> new InfluxDB1RepositoryImpl(configuration, influxDBMetadataService);
+ case V2 -> new InfluxDB2RepositoryImpl(configuration, influxDBMetadataService);
+ default -> throw new IllegalArgumentException("Failed to instantiate repository.");
+ };
}
/**
*/
@Deactivate
public void deactivate() {
- logger.debug("InfluxDB persistence service deactivated");
tryReconnection = false;
- if (influxDBRepository != null) {
- influxDBRepository.disconnect();
- influxDBRepository = null;
- }
- if (itemToStorePointCreator != null) {
- itemToStorePointCreator = null;
- }
- }
-
- /**
- * Rerun deactivation/activation code each time configuration is changed
- */
- @Modified
- protected void modified(@Nullable Map<String, Object> config) {
- if (config != null) {
- logger.debug("Config has been modified will deactivate/activate with new config");
-
- deactivate();
- activate(config);
- } else {
- logger.warn("Null configuration, ignoring");
- }
- }
-
- private boolean loadConfiguration(@Nullable Map<String, Object> config) {
- boolean configurationIsValid;
- if (config != null) {
- configuration = new InfluxDBConfiguration(config);
- configurationIsValid = configuration.isValid();
- if (configurationIsValid) {
- logger.debug("Loaded configuration {}", config);
- } else {
- logger.warn("Some configuration properties are not valid {}", config);
- }
- } else {
- configuration = InfluxDBConfiguration.NO_CONFIGURATION;
- configurationIsValid = false;
- logger.warn("Ignoring configuration because it's null");
- }
- return configurationIsValid;
+ influxDBRepository.disconnect();
+ logger.info("InfluxDB persistence service stopped.");
}
@Override
@Override
public Set<PersistenceItemInfo> getItemInfo() {
if (checkConnection()) {
- return influxDBRepository.getStoredItemsCount().entrySet().stream()
- .map(entry -> new InfluxDBPersistentItemInfo(entry.getKey(), entry.getValue()))
+ return influxDBRepository.getStoredItemsCount().entrySet().stream().map(InfluxDBPersistentItemInfo::new)
.collect(Collectors.toUnmodifiableSet());
} else {
- logger.info("getItemInfo ignored, InfluxDB is not yet connected");
+ logger.info("getItemInfo ignored, InfluxDB is not connected");
return Set.of();
}
}
@Override
public void store(Item item) {
- store(item, item.getName());
+ store(item, null);
}
@Override
if (checkConnection()) {
InfluxPoint point = itemToStorePointCreator.convert(item, alias);
if (point != null) {
- logger.trace("Storing item {} in InfluxDB point {}", item, point);
- influxDBRepository.write(point);
+ try {
+ influxDBRepository.write(point);
+ logger.trace("Stored item {} in InfluxDB point {}", item, point);
+ } catch (UnexpectedConditionException e) {
+ logger.warn("Failed to store item {} in InfluxDB point {}", point, item);
+ }
} else {
- logger.trace("Ignoring item {} as is cannot be converted to an InfluxDB point", item);
+ logger.trace("Ignoring item {}, conversion to an InfluxDB point failed.", item);
}
} else {
- logger.debug("store ignored, InfluxDB is not yet connected");
+ logger.debug("store ignored, InfluxDB is not connected");
}
}
@Override
public Iterable<HistoricItem> query(FilterCriteria filter) {
- logger.debug("Got a query for historic points!");
-
if (checkConnection()) {
logger.trace(
- "Filter: itemname: {}, ordering: {}, state: {}, operator: {}, getBeginDate: {}, getEndDate: {}, getPageSize: {}, getPageNumber: {}",
+ "Query-Filter: itemname: {}, ordering: {}, state: {}, operator: {}, getBeginDate: {}, getEndDate: {}, getPageSize: {}, getPageNumber: {}",
filter.getItemName(), filter.getOrdering().toString(), filter.getState(), filter.getOperator(),
filter.getBeginDate(), filter.getEndDate(), filter.getPageSize(), filter.getPageNumber());
-
- String query = RepositoryFactory.createQueryCreator(configuration, metadataRegistry).createQuery(filter,
+ String query = influxDBRepository.createQueryCreator().createQuery(filter,
configuration.getRetentionPolicy());
logger.trace("Query {}", query);
- List<InfluxRow> results = influxDBRepository.query(query);
- return results.stream().map(this::mapRow2HistoricItem).collect(Collectors.toList());
+ List<InfluxDBRepository.InfluxRow> results = influxDBRepository.query(query);
+ return results.stream().map(this::mapRowToHistoricItem).collect(Collectors.toList());
} else {
- logger.debug("query ignored, InfluxDB is not yet connected");
+ logger.debug("Query for persisted data ignored, InfluxDB is not connected");
return List.of();
}
}
- private HistoricItem mapRow2HistoricItem(InfluxRow row) {
- State state = InfluxDBStateConvertUtils.objectToState(row.getValue(), row.getItemName(), itemRegistry);
- return new InfluxDBHistoricItem(row.getItemName(), state,
- ZonedDateTime.ofInstant(row.getTime(), ZoneId.systemDefault()));
+ private HistoricItem mapRowToHistoricItem(InfluxDBRepository.InfluxRow row) {
+ State state = InfluxDBStateConvertUtils.objectToState(row.value(), row.itemName(), itemRegistry);
+ return new InfluxDBHistoricItem(row.itemName(), state,
+ ZonedDateTime.ofInstant(row.time(), ZoneId.systemDefault()));
}
@Override
* @return true if connected
*/
private boolean checkConnection() {
- if (influxDBRepository == null) {
- return false;
- } else if (influxDBRepository.isConnected()) {
+ if (influxDBRepository.isConnected()) {
return true;
} else if (tryReconnection) {
logger.debug("Connection lost, trying re-connection");
- influxDBRepository.connect();
- return influxDBRepository.isConnected();
+ return influxDBRepository.connect();
}
return false;
}
String createQuery(FilterCriteria criteria, String retentionPolicy);
default String getOperationSymbol(FilterCriteria.Operator operator, InfluxDBVersion version) {
- switch (operator) {
- case EQ:
- return "=";
- case LT:
- return "<";
- case LTE:
- return "<=";
- case GT:
- return ">";
- case GTE:
- return ">=";
- case NEQ:
- return version == InfluxDBVersion.V1 ? "<>" : "!=";
- default:
- throw new UnnexpectedConditionException("Not expected operator " + operator);
- }
+ return switch (operator) {
+ case EQ -> "=";
+ case LT -> "<";
+ case LTE -> "<=";
+ case GT -> ">";
+ case GTE -> ">=";
+ case NEQ -> version == InfluxDBVersion.V1 ? "<>" : "!=";
+ };
}
}
*/
package org.openhab.persistence.influxdb.internal;
-import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.StringJoiner;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
+import org.openhab.core.config.core.ConfigParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public static final String ADD_CATEGORY_TAG_PARAM = "addCategoryTag";
public static final String ADD_LABEL_TAG_PARAM = "addLabelTag";
public static final String ADD_TYPE_TAG_PARAM = "addTypeTag";
- public static InfluxDBConfiguration NO_CONFIGURATION = new InfluxDBConfiguration(Collections.emptyMap());
private final Logger logger = LoggerFactory.getLogger(InfluxDBConfiguration.class);
private final String url;
private final String user;
private final String databaseName;
private final String retentionPolicy;
private final InfluxDBVersion version;
-
private final boolean replaceUnderscore;
private final boolean addCategoryTag;
private final boolean addTypeTag;
private final boolean addLabelTag;
public InfluxDBConfiguration(Map<String, Object> config) {
- url = (String) config.getOrDefault(URL_PARAM, "http://127.0.0.1:8086");
- user = (String) config.getOrDefault(USER_PARAM, "openhab");
- password = (String) config.getOrDefault(PASSWORD_PARAM, "");
- token = (String) config.getOrDefault(TOKEN_PARAM, "");
- databaseName = (String) config.getOrDefault(DATABASE_PARAM, "openhab");
- retentionPolicy = (String) config.getOrDefault(RETENTION_POLICY_PARAM, "autogen");
+ url = ConfigParser.valueAsOrElse(config.get(URL_PARAM), String.class, "http://127.0.0.1:8086");
+ user = ConfigParser.valueAsOrElse(config.get(USER_PARAM), String.class, "openhab");
+ password = ConfigParser.valueAsOrElse(config.get(PASSWORD_PARAM), String.class, "");
+ token = ConfigParser.valueAsOrElse(config.get(TOKEN_PARAM), String.class, "");
+ databaseName = ConfigParser.valueAsOrElse(config.get(DATABASE_PARAM), String.class, "openhab");
+ retentionPolicy = ConfigParser.valueAsOrElse(config.get(RETENTION_POLICY_PARAM), String.class, "autogen");
version = parseInfluxVersion((String) config.getOrDefault(VERSION_PARAM, InfluxDBVersion.V1.name()));
-
- replaceUnderscore = getConfigBooleanValue(config, REPLACE_UNDERSCORE_PARAM, false);
- addCategoryTag = getConfigBooleanValue(config, ADD_CATEGORY_TAG_PARAM, false);
- addLabelTag = getConfigBooleanValue(config, ADD_LABEL_TAG_PARAM, false);
- addTypeTag = getConfigBooleanValue(config, ADD_TYPE_TAG_PARAM, false);
- }
-
- private static boolean getConfigBooleanValue(Map<String, Object> config, String key, boolean defaultValue) {
- Object object = config.get(key);
- if (object instanceof Boolean) {
- return (Boolean) object;
- } else if (object instanceof String) {
- return "true".equalsIgnoreCase((String) object);
- } else {
- return defaultValue;
- }
+ replaceUnderscore = ConfigParser.valueAsOrElse(config.get(REPLACE_UNDERSCORE_PARAM), Boolean.class, false);
+ addCategoryTag = ConfigParser.valueAsOrElse(config.get(ADD_CATEGORY_TAG_PARAM), Boolean.class, false);
+ addLabelTag = ConfigParser.valueAsOrElse(config.get(ADD_LABEL_TAG_PARAM), Boolean.class, false);
+ addTypeTag = ConfigParser.valueAsOrElse(config.get(ADD_TYPE_TAG_PARAM), Boolean.class, false);
}
private InfluxDBVersion parseInfluxVersion(@Nullable String value) {
@Override
public String toString() {
- String sb = "InfluxDBConfiguration{" + "url='" + url + '\'' + ", user='" + user + '\'' + ", password='"
- + password.length() + " chars" + '\'' + ", token='" + token.length() + " chars" + '\''
- + ", databaseName='" + databaseName + '\'' + ", retentionPolicy='" + retentionPolicy + '\''
- + ", version=" + version + ", replaceUnderscore=" + replaceUnderscore + ", addCategoryTag="
- + addCategoryTag + ", addTypeTag=" + addTypeTag + ", addLabelTag=" + addLabelTag + '}';
- return sb;
- }
-
- public int getTokenLength() {
- return token.length();
- }
-
- public char[] getTokenAsCharArray() {
- return token.toCharArray();
+ return "InfluxDBConfiguration{url='" + url + "', user='" + user + "', password='" + password.length()
+ + " chars', token='" + token.length() + " chars', databaseName='" + databaseName
+ + "', retentionPolicy='" + retentionPolicy + "', version=" + version + ", replaceUnderscore="
+ + replaceUnderscore + ", addCategoryTag=" + addCategoryTag + ", addTypeTag=" + addTypeTag
+ + ", addLabelTag=" + addLabelTag + '}';
}
}
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.openhab.core.persistence.HistoricItem;
import org.openhab.core.types.State;
-import org.openhab.core.types.UnDefType;
/**
* Java bean used to return items queries results from InfluxDB.
public class InfluxDBHistoricItem implements HistoricItem {
private String name = "";
- private State state = UnDefType.NULL;
- private ZonedDateTime timestamp;
+ private final State state;
+ private final ZonedDateTime timestamp;
public InfluxDBHistoricItem(String name, State state, ZonedDateTime timestamp) {
this.name = name;
return state;
}
- public void setState(State state) {
- this.state = state;
- }
-
@Override
public ZonedDateTime getTimestamp() {
return timestamp;
}
- public void setTimestamp(ZonedDateTime timestamp) {
- this.timestamp = timestamp;
- }
-
@Override
public String toString() {
return DateFormat.getDateTimeInstance().format(timestamp) + ": " + name + " -> " + state.toString();
--- /dev/null
+/**
+ * Copyright (c) 2010-2023 Contributors to the openHAB project
+ *
+ * See the NOTICE file(s) distributed with this work for additional
+ * information.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0
+ *
+ * SPDX-License-Identifier: EPL-2.0
+ */
+package org.openhab.persistence.influxdb.internal;
+
+import java.util.Optional;
+
+import org.eclipse.jdt.annotation.NonNullByDefault;
+import org.openhab.core.items.Metadata;
+import org.openhab.core.items.MetadataKey;
+import org.openhab.core.items.MetadataRegistry;
+import org.openhab.persistence.influxdb.InfluxDBPersistenceService;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Reference;
+
+/**
+ * Utility service for using item metadata in InfluxDB
+ *
+ * @author Jan N. Klug - Initial contribution
+ */
+@NonNullByDefault
+@Component(service = InfluxDBMetadataService.class)
+public class InfluxDBMetadataService {
+ private final MetadataRegistry metadataRegistry;
+
+ @Activate
+ public InfluxDBMetadataService(@Reference MetadataRegistry metadataRegistry) {
+ this.metadataRegistry = metadataRegistry;
+ }
+
+ /**
+ * get the measurement name from the item metadata or return the provided default
+ *
+ * @param itemName the item name
+ * @param defaultName the default measurement name (
+ * @return the metadata measurement name if present, defaultName otherwise
+ */
+ public String getMeasurementNameOrDefault(String itemName, String defaultName) {
+ Optional<Metadata> metadata = getMetaData(itemName);
+ if (metadata.isPresent()) {
+ String metaName = metadata.get().getValue();
+ if (!metaName.isBlank()) {
+ return metaName;
+ }
+ }
+
+ return defaultName;
+ }
+
+ /**
+ * get an Optional of the metadata for an item
+ *
+ * @param itemName the item name
+ * @return Optional with the metadata (may be empty)
+ */
+ public Optional<Metadata> getMetaData(String itemName) {
+ MetadataKey key = new MetadataKey(InfluxDBPersistenceService.SERVICE_NAME, itemName);
+ return Optional.ofNullable(metadataRegistry.get(key));
+ }
+}
+++ /dev/null
-/**
- * Copyright (c) 2010-2023 Contributors to the openHAB project
- *
- * See the NOTICE file(s) distributed with this work for additional
- * information.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License 2.0 which is available at
- * http://www.eclipse.org/legal/epl-2.0
- *
- * SPDX-License-Identifier: EPL-2.0
- */
-package org.openhab.persistence.influxdb.internal;
-
-import org.eclipse.jdt.annotation.NonNullByDefault;
-import org.eclipse.jdt.annotation.Nullable;
-import org.openhab.core.items.Metadata;
-import org.openhab.core.items.MetadataKey;
-import org.openhab.core.items.MetadataRegistry;
-import org.openhab.persistence.influxdb.InfluxDBPersistenceService;
-
-/**
- * Logic to use items metadata from an openHAB {@link Item}
- *
- * @author Johannes Ott - Initial contribution
- */
-@NonNullByDefault
-public class InfluxDBMetadataUtils {
-
- private InfluxDBMetadataUtils() {
- }
-
- public static String calculateMeasurementNameFromMetadataIfPresent(
- final @Nullable MetadataRegistry currentMetadataRegistry, String name, @Nullable String itemName) {
-
- if (itemName == null || currentMetadataRegistry == null) {
- return name;
- }
-
- MetadataKey key = new MetadataKey(InfluxDBPersistenceService.SERVICE_NAME, itemName);
- Metadata metadata = currentMetadataRegistry.get(key);
- if (metadata != null) {
- String metaName = metadata.getValue();
- if (!metaName.isBlank()) {
- name = metaName;
- }
- }
-
- return name;
- }
-}
package org.openhab.persistence.influxdb.internal;
import java.util.Date;
+import java.util.Map;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
private final String name;
private final Integer count;
- public InfluxDBPersistentItemInfo(String name, Integer count) {
- this.name = name;
- this.count = count;
+ public InfluxDBPersistentItemInfo(Map.Entry<String, Integer> itemInfo) {
+ this.name = itemInfo.getKey();
+ this.count = itemInfo.getValue();
}
@Override
*/
package org.openhab.persistence.influxdb.internal;
+import java.time.Instant;
import java.util.List;
import java.util.Map;
/**
* Connect to InfluxDB server
*
- * @return True if successful, otherwise false
+ * @return <code>true</code> if successful, otherwise <code>false</code>
*/
boolean connect();
/**
* Check if connection is currently ready
*
- * @return True if its ready, otherwise false
+ * @return True if it's ready, otherwise false
*/
boolean checkConnectionStatus();
/**
- * Return all stored item names with it's count of stored points
+ * Return all stored item names with its count of stored points
*
* @return Map with <ItemName,ItemCount> entries
*/
*
* @param query Query
* @return Query results
+ *
*/
List<InfluxRow> query(String query);
* Write point to database
*
* @param influxPoint Point to write
+ * @throws UnexpectedConditionException when an error occurs
*/
- void write(InfluxPoint influxPoint);
+ void write(InfluxPoint influxPoint) throws UnexpectedConditionException;
+
+ /**
+ * create a query creator on this repository
+ *
+ * @return the query creator for this repository
+ */
+ FilterCriteriaQueryCreator createQueryCreator();
+
+ record InfluxRow(Instant time, String itemName, Object value) {
+ }
}
public class InfluxDBStateConvertUtils {
static final Number DIGITAL_VALUE_OFF = 0; // Visible for testing
static final Number DIGITAL_VALUE_ON = 1; // Visible for testing
- private static Logger logger = LoggerFactory.getLogger(InfluxDBStateConvertUtils.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(InfluxDBStateConvertUtils.class);
/**
* Converts {@link State} to objects fitting into influxdb values.
if (state instanceof HSBType) {
value = state.toString();
} else if (state instanceof PointType) {
- value = point2String((PointType) state);
+ value = state.toString();
} else if (state instanceof DecimalType) {
value = ((DecimalType) state).toBigDecimal();
} else if (state instanceof QuantityType<?>) {
* @return the state of the item represented by the itemName parameter, else the string value of
* the Object parameter
*/
- public static State objectToState(@Nullable Object value, String itemName, @Nullable ItemRegistry itemRegistry) {
- State state = null;
- if (itemRegistry != null) {
- try {
- Item item = itemRegistry.getItem(itemName);
- state = objectToState(value, item);
- } catch (ItemNotFoundException e) {
- logger.info("Could not find item '{}' in registry", itemName);
- }
- }
-
- if (state == null) {
- state = new StringType(String.valueOf(value));
+ public static State objectToState(Object value, String itemName, ItemRegistry itemRegistry) {
+ try {
+ Item item = itemRegistry.getItem(itemName);
+ return objectToState(value, item);
+ } catch (ItemNotFoundException e) {
+ LOGGER.info("Could not find item '{}' in registry", itemName);
}
- return state;
+ return new StringType(String.valueOf(value));
}
public static State objectToState(@Nullable Object value, Item itemToSetState) {
} else if (item instanceof DimmerItem) {
return new PercentType(valueStr);
} else if (item instanceof SwitchItem) {
- return toBoolean(valueStr) ? OnOffType.ON : OnOffType.OFF;
+ return OnOffType.from(toBoolean(valueStr));
} else if (item instanceof ContactItem) {
return toBoolean(valueStr) ? OpenClosedType.OPEN : OpenClosedType.CLOSED;
} else if (item instanceof RollershutterItem) {
if ("1".equals(object) || "1.0".equals(object)) {
return true;
} else {
- return Boolean.valueOf(String.valueOf(object));
+ return Boolean.parseBoolean(String.valueOf(object));
}
} else {
return false;
}
}
-
- private static String point2String(PointType point) {
- StringBuilder buf = new StringBuilder();
- buf.append(point.getLatitude().toString());
- buf.append(",");
- buf.append(point.getLongitude().toString());
- if (!point.getAltitude().equals(DecimalType.ZERO)) {
- buf.append(",");
- buf.append(point.getAltitude().toString());
- }
- return buf.toString(); // latitude, longitude, altitude
- }
}
*/
@NonNullByDefault({ DefaultLocation.PARAMETER })
public class InfluxPoint {
- private String measurementName;
- private Instant time;
- private Object value;
- private Map<String, String> tags;
+ private final String measurementName;
+ private final Instant time;
+ private final Object value;
+ private final Map<String, String> tags;
private InfluxPoint(Builder builder) {
measurementName = builder.measurementName;
}
public static final class Builder {
- private String measurementName;
+ private final String measurementName;
private Instant time;
private Object value;
- private Map<String, String> tags = new HashMap<>();
+ private final Map<String, String> tags = new HashMap<>();
private Builder(String measurementName) {
this.measurementName = measurementName;
return this;
}
- public Builder withTag(String name, String value) {
- tags.put(name, value);
+ public Builder withTag(String name, Object value) {
+ tags.put(name, value.toString());
return this;
}
+++ /dev/null
-/**
- * Copyright (c) 2010-2023 Contributors to the openHAB project
- *
- * See the NOTICE file(s) distributed with this work for additional
- * information.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License 2.0 which is available at
- * http://www.eclipse.org/legal/epl-2.0
- *
- * SPDX-License-Identifier: EPL-2.0
- */
-package org.openhab.persistence.influxdb.internal;
-
-import java.time.Instant;
-
-import org.eclipse.jdt.annotation.NonNullByDefault;
-import org.eclipse.jdt.annotation.Nullable;
-
-/**
- * Row data returned from database query
- *
- * @author Joan Pujol Espinar - Initial contribution
- */
-@NonNullByDefault
-public class InfluxRow {
- private final String itemName;
- private final Instant time;
- private final @Nullable Object value;
-
- public InfluxRow(Instant time, String itemName, @Nullable Object value) {
- this.time = time;
- this.itemName = itemName;
- this.value = value;
- }
-
- public Instant getTime() {
- return time;
- }
-
- public String getItemName() {
- return itemName;
- }
-
- public @Nullable Object getValue() {
- return value;
- }
-}
*/
package org.openhab.persistence.influxdb.internal;
-import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.*;
+import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.TAG_CATEGORY_NAME;
+import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.TAG_ITEM_NAME;
+import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.TAG_LABEL_NAME;
+import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.TAG_TYPE_NAME;
import java.time.Instant;
+import java.util.Objects;
import java.util.Optional;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.openhab.core.items.Item;
-import org.openhab.core.items.Metadata;
-import org.openhab.core.items.MetadataKey;
-import org.openhab.core.items.MetadataRegistry;
import org.openhab.core.types.State;
import org.openhab.core.types.UnDefType;
-import org.openhab.persistence.influxdb.InfluxDBPersistenceService;
/**
* Logic to create an InfluxDB {@link InfluxPoint} from an openHAB {@link Item}
@NonNullByDefault
public class ItemToStorePointCreator {
private final InfluxDBConfiguration configuration;
- private final @Nullable MetadataRegistry metadataRegistry;
+ private final InfluxDBMetadataService influxDBMetadataService;
- public ItemToStorePointCreator(InfluxDBConfiguration configuration, @Nullable MetadataRegistry metadataRegistry) {
+ public ItemToStorePointCreator(InfluxDBConfiguration configuration,
+ InfluxDBMetadataService influxDBMetadataService) {
this.configuration = configuration;
- this.metadataRegistry = metadataRegistry;
+ this.influxDBMetadataService = influxDBMetadataService;
}
public @Nullable InfluxPoint convert(Item item, @Nullable String storeAlias) {
Object value = InfluxDBStateConvertUtils.stateToObject(state);
- InfluxPoint.Builder point = InfluxPoint.newBuilder(measurementName).withTime(Instant.now()).withValue(value)
- .withTag(TAG_ITEM_NAME, itemName);
+ InfluxPoint.Builder pointBuilder = InfluxPoint.newBuilder(measurementName).withTime(Instant.now())
+ .withValue(value).withTag(TAG_ITEM_NAME, itemName);
- addPointTags(item, point);
+ addPointTags(item, pointBuilder);
- return point.build();
+ return pointBuilder.build();
}
private String calculateMeasurementName(Item item, @Nullable String storeAlias) {
String name = storeAlias != null && !storeAlias.isBlank() ? storeAlias : item.getName();
-
- name = InfluxDBMetadataUtils.calculateMeasurementNameFromMetadataIfPresent(metadataRegistry, name,
- item.getName());
+ name = influxDBMetadataService.getMeasurementNameOrDefault(item.getName(), name);
if (configuration.isReplaceUnderscore()) {
name = name.replace('_', '.');
}
private State getItemState(Item item) {
- final State state;
- final Optional<Class<? extends State>> desiredConversion = calculateDesiredTypeConversionToStore(item);
- if (desiredConversion.isPresent()) {
- State convertedState = item.getStateAs(desiredConversion.get());
- if (convertedState != null) {
- state = convertedState;
- } else {
- state = item.getState();
- }
- } else {
- state = item.getState();
- }
- return state;
+ return calculateDesiredTypeConversionToStore(item)
+ .map(desiredClass -> Objects.requireNonNullElseGet(item.getStateAs(desiredClass), item::getState))
+ .orElseGet(item::getState);
}
private Optional<Class<? extends State>> calculateDesiredTypeConversionToStore(Item item) {
.findFirst().map(commandType -> commandType.asSubclass(State.class));
}
- private void addPointTags(Item item, InfluxPoint.Builder point) {
+ private void addPointTags(Item item, InfluxPoint.Builder pointBuilder) {
if (configuration.isAddCategoryTag()) {
- String categoryName = item.getCategory();
- if (categoryName == null) {
- categoryName = "n/a";
- }
- point.withTag(TAG_CATEGORY_NAME, categoryName);
+ String categoryName = Objects.requireNonNullElse(item.getCategory(), "n/a");
+ pointBuilder.withTag(TAG_CATEGORY_NAME, categoryName);
}
if (configuration.isAddTypeTag()) {
- point.withTag(TAG_TYPE_NAME, item.getType());
+ pointBuilder.withTag(TAG_TYPE_NAME, item.getType());
}
if (configuration.isAddLabelTag()) {
- String labelName = item.getLabel();
- if (labelName == null) {
- labelName = "n/a";
- }
- point.withTag(TAG_LABEL_NAME, labelName);
+ String labelName = Objects.requireNonNullElse(item.getLabel(), "n/a");
+ pointBuilder.withTag(TAG_LABEL_NAME, labelName);
}
- final MetadataRegistry currentMetadataRegistry = metadataRegistry;
- if (currentMetadataRegistry != null) {
- MetadataKey key = new MetadataKey(InfluxDBPersistenceService.SERVICE_NAME, item.getName());
- Metadata metadata = currentMetadataRegistry.get(key);
- if (metadata != null) {
- metadata.getConfiguration().forEach((tagName, tagValue) -> {
- point.withTag(tagName, tagValue.toString());
- });
- }
- }
+ influxDBMetadataService.getMetaData(item.getName())
+ .ifPresent(metadata -> metadata.getConfiguration().forEach(pointBuilder::withTag));
}
}
+++ /dev/null
-/**
- * Copyright (c) 2010-2023 Contributors to the openHAB project
- *
- * See the NOTICE file(s) distributed with this work for additional
- * information.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License 2.0 which is available at
- * http://www.eclipse.org/legal/epl-2.0
- *
- * SPDX-License-Identifier: EPL-2.0
- */
-package org.openhab.persistence.influxdb.internal;
-
-import org.eclipse.jdt.annotation.NonNullByDefault;
-import org.openhab.core.items.MetadataRegistry;
-import org.openhab.persistence.influxdb.internal.influx1.Influx1FilterCriteriaQueryCreatorImpl;
-import org.openhab.persistence.influxdb.internal.influx1.InfluxDB1RepositoryImpl;
-import org.openhab.persistence.influxdb.internal.influx2.Influx2FilterCriteriaQueryCreatorImpl;
-import org.openhab.persistence.influxdb.internal.influx2.InfluxDB2RepositoryImpl;
-
-/**
- * Factory that returns {@link InfluxDBRepository} and
- * {@link FilterCriteriaQueryCreator} implementations depending on InfluxDB
- * version
- *
- * @author Joan Pujol Espinar - Initial contribution
- */
-@NonNullByDefault
-public class RepositoryFactory {
-
- public static InfluxDBRepository createRepository(InfluxDBConfiguration influxDBConfiguration) {
- switch (influxDBConfiguration.getVersion()) {
- case V1:
- return new InfluxDB1RepositoryImpl(influxDBConfiguration);
- case V2:
- return new InfluxDB2RepositoryImpl(influxDBConfiguration);
- default:
- throw new UnnexpectedConditionException("Not expected version " + influxDBConfiguration.getVersion());
- }
- }
-
- public static FilterCriteriaQueryCreator createQueryCreator(InfluxDBConfiguration influxDBConfiguration,
- MetadataRegistry metadataRegistry) {
- switch (influxDBConfiguration.getVersion()) {
- case V1:
- return new Influx1FilterCriteriaQueryCreatorImpl(influxDBConfiguration, metadataRegistry);
- case V2:
- return new Influx2FilterCriteriaQueryCreatorImpl(influxDBConfiguration, metadataRegistry);
- default:
- throw new UnnexpectedConditionException("Not expected version " + influxDBConfiguration.getVersion());
- }
- }
-}
--- /dev/null
+/**
+ * Copyright (c) 2010-2023 Contributors to the openHAB project
+ *
+ * See the NOTICE file(s) distributed with this work for additional
+ * information.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0
+ *
+ * SPDX-License-Identifier: EPL-2.0
+ */
+package org.openhab.persistence.influxdb.internal;
+
+import org.eclipse.jdt.annotation.NonNullByDefault;
+
+/**
+ * Throw to indicate an unexpected condition that should not have happened (a bug)
+ *
+ * @author Joan Pujol Espinar - Initial contribution
+ */
+@NonNullByDefault
+public class UnexpectedConditionException extends Exception {
+ private static final long serialVersionUID = 1128380327167959556L;
+
+ public UnexpectedConditionException(String message) {
+ super(message);
+ }
+}
+++ /dev/null
-/**
- * Copyright (c) 2010-2023 Contributors to the openHAB project
- *
- * See the NOTICE file(s) distributed with this work for additional
- * information.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License 2.0 which is available at
- * http://www.eclipse.org/legal/epl-2.0
- *
- * SPDX-License-Identifier: EPL-2.0
- */
-package org.openhab.persistence.influxdb.internal;
-
-import org.eclipse.jdt.annotation.NonNullByDefault;
-
-/**
- * Throw to indicate an unnexpected condition that should not have happened (a bug)
- *
- * @author Joan Pujol Espinar - Initial contribution
- */
-@NonNullByDefault
-public class UnnexpectedConditionException extends RuntimeException {
- private static final long serialVersionUID = 1128380327167959556L;
-
- public UnnexpectedConditionException(String message) {
- super(message);
- }
-
- public UnnexpectedConditionException(String message, Throwable cause) {
- super(message, cause);
- }
-}
+++ /dev/null
-/**
- * Copyright (c) 2010-2023 Contributors to the openHAB project
- *
- * See the NOTICE file(s) distributed with this work for additional
- * information.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License 2.0 which is available at
- * http://www.eclipse.org/legal/epl-2.0
- *
- * SPDX-License-Identifier: EPL-2.0
- */
-package org.openhab.persistence.influxdb.internal.influx1;
-
-import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.*;
-import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.*;
-import static org.openhab.persistence.influxdb.internal.InfluxDBStateConvertUtils.stateToObject;
-
-import org.eclipse.jdt.annotation.NonNullByDefault;
-import org.eclipse.jdt.annotation.Nullable;
-import org.influxdb.dto.Query;
-import org.influxdb.querybuilder.BuiltQuery;
-import org.influxdb.querybuilder.Select;
-import org.influxdb.querybuilder.Where;
-import org.influxdb.querybuilder.clauses.SimpleClause;
-import org.openhab.core.items.MetadataRegistry;
-import org.openhab.core.persistence.FilterCriteria;
-import org.openhab.persistence.influxdb.internal.FilterCriteriaQueryCreator;
-import org.openhab.persistence.influxdb.internal.InfluxDBConfiguration;
-import org.openhab.persistence.influxdb.internal.InfluxDBMetadataUtils;
-import org.openhab.persistence.influxdb.internal.InfluxDBVersion;
-
-/**
- * Implementation of {@link FilterCriteriaQueryCreator} for InfluxDB 1.0
- *
- * @author Joan Pujol Espinar - Initial contribution
- */
-@NonNullByDefault
-public class Influx1FilterCriteriaQueryCreatorImpl implements FilterCriteriaQueryCreator {
-
- private InfluxDBConfiguration configuration;
- private MetadataRegistry metadataRegistry;
-
- public Influx1FilterCriteriaQueryCreatorImpl(InfluxDBConfiguration configuration,
- MetadataRegistry metadataRegistry) {
- this.configuration = configuration;
- this.metadataRegistry = metadataRegistry;
- }
-
- @Override
- public String createQuery(FilterCriteria criteria, String retentionPolicy) {
- final String tableName;
- final String itemName = criteria.getItemName();
- boolean hasCriteriaName = itemName != null;
-
- tableName = calculateTableName(itemName);
-
- Select select = select().column("\"" + COLUMN_VALUE_NAME_V1 + "\"::field")
- .column("\"" + TAG_ITEM_NAME + "\"::tag")
- .fromRaw(null, fullQualifiedTableName(retentionPolicy, tableName, hasCriteriaName));
-
- Where where = select.where();
-
- if (itemName != null && !tableName.equals(itemName)) {
- where = where.and(BuiltQuery.QueryBuilder.eq(TAG_ITEM_NAME, itemName));
- }
-
- if (criteria.getBeginDate() != null) {
- where = where.and(
- BuiltQuery.QueryBuilder.gte(COLUMN_TIME_NAME_V1, criteria.getBeginDate().toInstant().toString()));
- }
- if (criteria.getEndDate() != null) {
- where = where.and(
- BuiltQuery.QueryBuilder.lte(COLUMN_TIME_NAME_V1, criteria.getEndDate().toInstant().toString()));
- }
-
- if (criteria.getState() != null && criteria.getOperator() != null) {
- where = where.and(new SimpleClause(COLUMN_VALUE_NAME_V1,
- getOperationSymbol(criteria.getOperator(), InfluxDBVersion.V1),
- stateToObject(criteria.getState())));
- }
-
- if (criteria.getOrdering() == FilterCriteria.Ordering.DESCENDING) {
- select = select.orderBy(desc());
- } else if (criteria.getOrdering() == FilterCriteria.Ordering.ASCENDING) {
- select = select.orderBy(asc());
- }
-
- if (criteria.getPageSize() != Integer.MAX_VALUE) {
- if (criteria.getPageNumber() != 0) {
- select = select.limit(criteria.getPageSize(), criteria.getPageSize() * criteria.getPageNumber());
- } else {
- select = select.limit(criteria.getPageSize());
- }
- }
-
- final Query query = (Query) select;
- return query.getCommand();
- }
-
- private String calculateTableName(@Nullable String itemName) {
- if (itemName == null) {
- return "/.*/";
- }
-
- String name = itemName;
-
- name = InfluxDBMetadataUtils.calculateMeasurementNameFromMetadataIfPresent(metadataRegistry, name, itemName);
-
- if (configuration.isReplaceUnderscore()) {
- name = name.replace('_', '.');
- }
-
- return name;
- }
-
- private String fullQualifiedTableName(String retentionPolicy, String tableName, boolean escapeTableName) {
- StringBuilder sb = new StringBuilder();
- sb.append('"').append(retentionPolicy).append('"');
- sb.append(".");
- if (escapeTableName) {
- sb.append('"').append(tableName).append('"');
- } else {
- sb.append(tableName);
- }
- return sb.toString();
- }
-}
--- /dev/null
+/**
+ * Copyright (c) 2010-2023 Contributors to the openHAB project
+ *
+ * See the NOTICE file(s) distributed with this work for additional
+ * information.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0
+ *
+ * SPDX-License-Identifier: EPL-2.0
+ */
+package org.openhab.persistence.influxdb.internal.influx1;
+
+import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.*;
+import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.*;
+import static org.openhab.persistence.influxdb.internal.InfluxDBStateConvertUtils.stateToObject;
+
+import org.eclipse.jdt.annotation.NonNullByDefault;
+import org.eclipse.jdt.annotation.Nullable;
+import org.influxdb.dto.Query;
+import org.influxdb.querybuilder.BuiltQuery;
+import org.influxdb.querybuilder.Select;
+import org.influxdb.querybuilder.Where;
+import org.influxdb.querybuilder.clauses.SimpleClause;
+import org.openhab.core.persistence.FilterCriteria;
+import org.openhab.persistence.influxdb.internal.FilterCriteriaQueryCreator;
+import org.openhab.persistence.influxdb.internal.InfluxDBConfiguration;
+import org.openhab.persistence.influxdb.internal.InfluxDBMetadataService;
+import org.openhab.persistence.influxdb.internal.InfluxDBVersion;
+
+/**
+ * Implementation of {@link FilterCriteriaQueryCreator} for InfluxDB 1.0
+ *
+ * @author Joan Pujol Espinar - Initial contribution
+ */
+@NonNullByDefault
+public class InfluxDB1FilterCriteriaQueryCreatorImpl implements FilterCriteriaQueryCreator {
+
+ private final InfluxDBConfiguration configuration;
+ private final InfluxDBMetadataService influxDBMetadataService;
+
+ public InfluxDB1FilterCriteriaQueryCreatorImpl(InfluxDBConfiguration configuration,
+ InfluxDBMetadataService influxDBMetadataService) {
+ this.configuration = configuration;
+ this.influxDBMetadataService = influxDBMetadataService;
+ }
+
+ @Override
+ public String createQuery(FilterCriteria criteria, String retentionPolicy) {
+ final String itemName = criteria.getItemName();
+ final String tableName = getTableName(itemName);
+ final boolean hasCriteriaName = itemName != null;
+
+ Select select = select().column("\"" + COLUMN_VALUE_NAME_V1 + "\"::field")
+ .column("\"" + TAG_ITEM_NAME + "\"::tag")
+ .fromRaw(null, fullQualifiedTableName(retentionPolicy, tableName, hasCriteriaName));
+
+ Where where = select.where();
+
+ if (itemName != null && !tableName.equals(itemName)) {
+ where.and(BuiltQuery.QueryBuilder.eq(TAG_ITEM_NAME, itemName));
+ }
+ if (criteria.getBeginDate() != null) {
+ where.and(BuiltQuery.QueryBuilder.gte(COLUMN_TIME_NAME_V1, criteria.getBeginDate().toInstant().toString()));
+ }
+ if (criteria.getEndDate() != null) {
+ where.and(BuiltQuery.QueryBuilder.lte(COLUMN_TIME_NAME_V1, criteria.getEndDate().toInstant().toString()));
+ }
+
+ if (criteria.getState() != null && criteria.getOperator() != null) {
+ where.and(new SimpleClause(COLUMN_VALUE_NAME_V1,
+ getOperationSymbol(criteria.getOperator(), InfluxDBVersion.V1),
+ stateToObject(criteria.getState())));
+ }
+
+ if (criteria.getOrdering() == FilterCriteria.Ordering.DESCENDING) {
+ select = select.orderBy(desc());
+ } else if (criteria.getOrdering() == FilterCriteria.Ordering.ASCENDING) {
+ select = select.orderBy(asc());
+ }
+
+ if (criteria.getPageSize() != Integer.MAX_VALUE) {
+ if (criteria.getPageNumber() != 0) {
+ select = select.limit(criteria.getPageSize(), (long) criteria.getPageSize() * criteria.getPageNumber());
+ } else {
+ select = select.limit(criteria.getPageSize());
+ }
+ }
+
+ return ((Query) select).getCommand();
+ }
+
+ private String getTableName(@Nullable String itemName) {
+ if (itemName == null) {
+ return "/.*/";
+ }
+
+ String name = influxDBMetadataService.getMeasurementNameOrDefault(itemName, itemName);
+
+ if (configuration.isReplaceUnderscore()) {
+ name = name.replace('_', '.');
+ }
+
+ return name;
+ }
+
+ private String fullQualifiedTableName(String retentionPolicy, String tableName, boolean escapeTableName) {
+ StringBuilder sb = new StringBuilder();
+ sb.append('"').append(retentionPolicy).append('"');
+ sb.append(".");
+ if (escapeTableName) {
+ sb.append('"').append(tableName).append('"');
+ } else {
+ sb.append(tableName);
+ }
+ return sb.toString();
+ }
+}
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.influxdb.dto.Pong;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
+import org.openhab.persistence.influxdb.internal.FilterCriteriaQueryCreator;
import org.openhab.persistence.influxdb.internal.InfluxDBConfiguration;
+import org.openhab.persistence.influxdb.internal.InfluxDBMetadataService;
import org.openhab.persistence.influxdb.internal.InfluxDBRepository;
import org.openhab.persistence.influxdb.internal.InfluxPoint;
-import org.openhab.persistence.influxdb.internal.InfluxRow;
-import org.openhab.persistence.influxdb.internal.UnnexpectedConditionException;
+import org.openhab.persistence.influxdb.internal.UnexpectedConditionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@NonNullByDefault
public class InfluxDB1RepositoryImpl implements InfluxDBRepository {
private final Logger logger = LoggerFactory.getLogger(InfluxDB1RepositoryImpl.class);
- private InfluxDBConfiguration configuration;
- @Nullable
- private InfluxDB client;
+ private final InfluxDBConfiguration configuration;
+ private final InfluxDBMetadataService influxDBMetadataService;
+ private @Nullable InfluxDB client;
- public InfluxDB1RepositoryImpl(InfluxDBConfiguration configuration) {
+ public InfluxDB1RepositoryImpl(InfluxDBConfiguration configuration,
+ InfluxDBMetadataService influxDBMetadataService) {
this.configuration = configuration;
+ this.influxDBMetadataService = influxDBMetadataService;
}
@Override
@Override
public void disconnect() {
+ final InfluxDB currentClient = client;
+ if (currentClient != null) {
+ currentClient.close();
+ }
this.client = null;
}
@Override
public boolean checkConnectionStatus() {
- boolean dbStatus = false;
final InfluxDB currentClient = client;
if (currentClient != null) {
try {
String version = pong.getVersion();
// may be check for version >= 0.9
if (version != null && !version.contains("unknown")) {
- dbStatus = true;
logger.debug("database status is OK, version is {}", version);
+ return true;
} else {
logger.warn("database ping error, version is: \"{}\" response time was \"{}\"", version,
pong.getResponseTime());
- dbStatus = false;
}
} catch (RuntimeException e) {
- dbStatus = false;
- logger.error("database connection failed", e);
- handleDatabaseException(e);
+ logger.warn("database error: {}", e.getMessage(), e);
}
} else {
logger.warn("checkConnection: database is not connected");
}
- return dbStatus;
- }
-
- private void handleDatabaseException(Exception e) {
- logger.warn("database error: {}", e.getMessage(), e);
+ return false;
}
@Override
- public void write(InfluxPoint point) {
+ public void write(InfluxPoint point) throws UnexpectedConditionException {
final InfluxDB currentClient = this.client;
if (currentClient != null) {
Point clientPoint = convertPointToClientFormat(point);
}
}
- private Point convertPointToClientFormat(InfluxPoint point) {
+ private Point convertPointToClientFormat(InfluxPoint point) throws UnexpectedConditionException {
Point.Builder clientPoint = Point.measurement(point.getMeasurementName()).time(point.getTime().toEpochMilli(),
TimeUnit.MILLISECONDS);
- setPointValue(point.getValue(), clientPoint);
- point.getTags().entrySet().forEach(e -> clientPoint.tag(e.getKey(), e.getValue()));
- return clientPoint.build();
- }
-
- private void setPointValue(@Nullable Object value, Point.Builder point) {
+ Object value = point.getValue();
if (value instanceof String) {
- point.addField(FIELD_VALUE_NAME, (String) value);
+ clientPoint.addField(FIELD_VALUE_NAME, (String) value);
} else if (value instanceof Number) {
- point.addField(FIELD_VALUE_NAME, (Number) value);
+ clientPoint.addField(FIELD_VALUE_NAME, (Number) value);
} else if (value instanceof Boolean) {
- point.addField(FIELD_VALUE_NAME, (Boolean) value);
+ clientPoint.addField(FIELD_VALUE_NAME, (Boolean) value);
} else if (value == null) {
- point.addField(FIELD_VALUE_NAME, (String) null);
+ clientPoint.addField(FIELD_VALUE_NAME, "null");
} else {
- throw new UnnexpectedConditionException("Not expected value type");
+ throw new UnexpectedConditionException("Not expected value type");
}
+ point.getTags().forEach(clientPoint::tag);
+ return clientPoint.build();
}
@Override
if (currentClient != null) {
Query parsedQuery = new Query(query, configuration.getDatabaseName());
List<QueryResult.Result> results = currentClient.query(parsedQuery, TimeUnit.MILLISECONDS).getResults();
- return convertClientResutToRepository(results);
+ return convertClientResultToRepository(results);
} else {
logger.warn("Returning empty list because queryAPI isn't present");
- return Collections.emptyList();
+ return List.of();
}
}
- private List<InfluxRow> convertClientResutToRepository(List<QueryResult.Result> results) {
+ private List<InfluxRow> convertClientResultToRepository(List<QueryResult.Result> results) {
List<InfluxRow> rows = new ArrayList<>();
for (QueryResult.Result result : results) {
- List<QueryResult.Series> seriess = result.getSeries();
+ List<QueryResult.Series> allSeries = result.getSeries();
if (result.getError() != null) {
logger.warn("{}", result.getError());
continue;
}
- if (seriess == null) {
+ if (allSeries == null) {
logger.debug("query returned no series");
} else {
- for (QueryResult.Series series : seriess) {
- logger.trace("series {}", series.toString());
- List<List<@Nullable Object>> valuess = series.getValues();
- if (valuess == null) {
+ for (QueryResult.Series series : allSeries) {
+ logger.trace("series {}", series);
+ String defaultItemName = series.getName();
+ List<List<Object>> allValues = series.getValues();
+ if (allValues == null) {
logger.debug("query returned no values");
} else {
List<String> columns = series.getColumns();
logger.trace("columns {}", columns);
if (columns != null) {
- Integer timestampColumn = null;
- Integer valueColumn = null;
- Integer itemNameColumn = null;
- for (int i = 0; i < columns.size(); i++) {
- String columnName = columns.get(i);
- if (columnName.equals(COLUMN_TIME_NAME_V1)) {
- timestampColumn = i;
- } else if (columnName.equals(COLUMN_VALUE_NAME_V1)) {
- valueColumn = i;
- } else if (columnName.equals(TAG_ITEM_NAME)) {
- itemNameColumn = i;
- }
- }
- if (valueColumn == null || timestampColumn == null) {
+ int timestampColumn = columns.indexOf(COLUMN_TIME_NAME_V1);
+ int valueColumn = columns.indexOf(COLUMN_VALUE_NAME_V1);
+ int itemNameColumn = columns.indexOf(TAG_ITEM_NAME);
+ if (valueColumn == -1 || timestampColumn == -1) {
throw new IllegalStateException("missing column");
}
- for (int i = 0; i < valuess.size(); i++) {
- Double rawTime = (Double) Objects.requireNonNull(valuess.get(i).get(timestampColumn));
+ for (List<Object> valueObject : allValues) {
+ Double rawTime = (Double) valueObject.get(timestampColumn);
Instant time = Instant.ofEpochMilli(rawTime.longValue());
- @Nullable
- Object value = valuess.get(i).get(valueColumn);
- var currentI = i;
- String itemName = Optional.ofNullable(itemNameColumn)
- .flatMap(inc -> Optional.ofNullable((String) valuess.get(currentI).get(inc)))
- .orElse(series.getName());
+ Object value = valueObject.get(valueColumn);
+ String itemName = itemNameColumn == -1 ? defaultItemName
+ : Objects.requireNonNullElse((String) valueObject.get(itemNameColumn),
+ defaultItemName);
logger.trace("adding historic item {}: time {} value {}", itemName, time, value);
rows.add(new InfluxRow(time, itemName, value));
}
public Map<String, Integer> getStoredItemsCount() {
return Collections.emptyMap();
}
+
+ @Override
+ public FilterCriteriaQueryCreator createQueryCreator() {
+ return new InfluxDB1FilterCriteriaQueryCreatorImpl(configuration, influxDBMetadataService);
+ }
}
+++ /dev/null
-/**
- * Copyright (c) 2010-2023 Contributors to the openHAB project
- *
- * See the NOTICE file(s) distributed with this work for additional
- * information.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License 2.0 which is available at
- * http://www.eclipse.org/legal/epl-2.0
- *
- * SPDX-License-Identifier: EPL-2.0
- */
-package org.openhab.persistence.influxdb.internal.influx2;
-
-import static com.influxdb.query.dsl.functions.restriction.Restrictions.measurement;
-import static com.influxdb.query.dsl.functions.restriction.Restrictions.tag;
-import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.*;
-import static org.openhab.persistence.influxdb.internal.InfluxDBStateConvertUtils.stateToObject;
-
-import java.time.temporal.ChronoUnit;
-
-import org.eclipse.jdt.annotation.NonNullByDefault;
-import org.openhab.core.items.MetadataRegistry;
-import org.openhab.core.persistence.FilterCriteria;
-import org.openhab.persistence.influxdb.internal.FilterCriteriaQueryCreator;
-import org.openhab.persistence.influxdb.internal.InfluxDBConfiguration;
-import org.openhab.persistence.influxdb.internal.InfluxDBMetadataUtils;
-import org.openhab.persistence.influxdb.internal.InfluxDBVersion;
-
-import com.influxdb.query.dsl.Flux;
-import com.influxdb.query.dsl.functions.RangeFlux;
-import com.influxdb.query.dsl.functions.restriction.Restrictions;
-
-/**
- * Implementation of {@link FilterCriteriaQueryCreator} for InfluxDB 2.0
- *
- * @author Joan Pujol Espinar - Initial contribution
- */
-@NonNullByDefault
-public class Influx2FilterCriteriaQueryCreatorImpl implements FilterCriteriaQueryCreator {
-
- private InfluxDBConfiguration configuration;
- private MetadataRegistry metadataRegistry;
-
- public Influx2FilterCriteriaQueryCreatorImpl(InfluxDBConfiguration configuration,
- MetadataRegistry metadataRegistry) {
- this.configuration = configuration;
- this.metadataRegistry = metadataRegistry;
- }
-
- @Override
- public String createQuery(FilterCriteria criteria, String retentionPolicy) {
- Flux flux = Flux.from(retentionPolicy);
-
- RangeFlux range = flux.range();
- if (criteria.getBeginDate() != null) {
- range = range.withStart(criteria.getBeginDate().toInstant());
- } else {
- range = flux.range(-100L, ChronoUnit.YEARS); // Flux needs a mandatory start range
- }
- if (criteria.getEndDate() != null) {
- range = range.withStop(criteria.getEndDate().toInstant());
- }
- flux = range;
-
- String itemName = criteria.getItemName();
- if (itemName != null) {
- String measurementName = calculateMeasurementName(itemName);
- boolean needsToUseItemTagName = !measurementName.equals(itemName);
-
- flux = flux.filter(measurement().equal(measurementName));
- if (needsToUseItemTagName) {
- flux = flux.filter(tag(TAG_ITEM_NAME).equal(itemName));
- }
-
- if (needsToUseItemTagName) {
- flux = flux.keep(new String[] { FIELD_MEASUREMENT_NAME, COLUMN_TIME_NAME_V2, COLUMN_VALUE_NAME_V2,
- TAG_ITEM_NAME });
- } else {
- flux = flux.keep(new String[] { FIELD_MEASUREMENT_NAME, COLUMN_TIME_NAME_V2, COLUMN_VALUE_NAME_V2 });
- }
- }
-
- if (criteria.getState() != null && criteria.getOperator() != null) {
- Restrictions restrictions = Restrictions.and(Restrictions.field().equal(FIELD_VALUE_NAME),
- Restrictions.value().custom(stateToObject(criteria.getState()),
- getOperationSymbol(criteria.getOperator(), InfluxDBVersion.V2)));
- flux = flux.filter(restrictions);
- }
-
- flux = applyOrderingAndPageSize(criteria, flux);
-
- return flux.toString();
- }
-
- private Flux applyOrderingAndPageSize(FilterCriteria criteria, Flux flux) {
- var lastOptimization = criteria.getOrdering() == FilterCriteria.Ordering.DESCENDING
- && criteria.getPageSize() == 1;
-
- if (lastOptimization) {
- flux = flux.last();
- } else {
- if (criteria.getOrdering() != null) {
- boolean desc = criteria.getOrdering() == FilterCriteria.Ordering.DESCENDING;
- flux = flux.sort().withDesc(desc).withColumns(new String[] { COLUMN_TIME_NAME_V2 });
- }
-
- if (criteria.getPageSize() != Integer.MAX_VALUE) {
- flux = flux.limit(criteria.getPageSize()).withPropertyValue("offset",
- criteria.getPageNumber() * criteria.getPageSize());
- }
- }
- return flux;
- }
-
- private String calculateMeasurementName(String itemName) {
- String name = itemName;
-
- name = InfluxDBMetadataUtils.calculateMeasurementNameFromMetadataIfPresent(metadataRegistry, name, itemName);
-
- if (configuration.isReplaceUnderscore()) {
- name = name.replace('_', '.');
- }
-
- return name;
- }
-}
--- /dev/null
+/**
+ * Copyright (c) 2010-2023 Contributors to the openHAB project
+ *
+ * See the NOTICE file(s) distributed with this work for additional
+ * information.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0
+ *
+ * SPDX-License-Identifier: EPL-2.0
+ */
+package org.openhab.persistence.influxdb.internal.influx2;
+
+import static com.influxdb.query.dsl.functions.restriction.Restrictions.measurement;
+import static com.influxdb.query.dsl.functions.restriction.Restrictions.tag;
+import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.*;
+import static org.openhab.persistence.influxdb.internal.InfluxDBStateConvertUtils.stateToObject;
+
+import java.time.temporal.ChronoUnit;
+
+import org.eclipse.jdt.annotation.NonNullByDefault;
+import org.openhab.core.persistence.FilterCriteria;
+import org.openhab.persistence.influxdb.internal.FilterCriteriaQueryCreator;
+import org.openhab.persistence.influxdb.internal.InfluxDBConfiguration;
+import org.openhab.persistence.influxdb.internal.InfluxDBMetadataService;
+import org.openhab.persistence.influxdb.internal.InfluxDBVersion;
+
+import com.influxdb.query.dsl.Flux;
+import com.influxdb.query.dsl.functions.RangeFlux;
+import com.influxdb.query.dsl.functions.restriction.Restrictions;
+
+/**
+ * Implementation of {@link FilterCriteriaQueryCreator} for InfluxDB 2.0
+ *
+ * @author Joan Pujol Espinar - Initial contribution
+ */
+@NonNullByDefault
+public class InfluxDB2FilterCriteriaQueryCreatorImpl implements FilterCriteriaQueryCreator {
+ private final InfluxDBConfiguration configuration;
+ private final InfluxDBMetadataService influxDBMetadataService;
+
+ public InfluxDB2FilterCriteriaQueryCreatorImpl(InfluxDBConfiguration configuration,
+ InfluxDBMetadataService influxDBMetadataService) {
+ this.configuration = configuration;
+ this.influxDBMetadataService = influxDBMetadataService;
+ }
+
+ @Override
+ public String createQuery(FilterCriteria criteria, String retentionPolicy) {
+ Flux flux = Flux.from(retentionPolicy);
+
+ RangeFlux range = flux.range();
+ if (criteria.getBeginDate() != null) {
+ range.withStart(criteria.getBeginDate().toInstant());
+ } else {
+ range = flux.range(-100L, ChronoUnit.YEARS); // Flux needs a mandatory start range
+ }
+ if (criteria.getEndDate() != null) {
+ range.withStop(criteria.getEndDate().toInstant());
+ }
+ flux = range;
+
+ String itemName = criteria.getItemName();
+ if (itemName != null) {
+ String name = influxDBMetadataService.getMeasurementNameOrDefault(itemName, itemName);
+ String measurementName = configuration.isReplaceUnderscore() ? name.replace('_', '.') : name;
+ flux = flux.filter(measurement().equal(measurementName));
+ if (!measurementName.equals(itemName)) {
+ flux = flux.filter(tag(TAG_ITEM_NAME).equal(itemName));
+ flux = flux.keep(new String[] { FIELD_MEASUREMENT_NAME, COLUMN_TIME_NAME_V2, COLUMN_VALUE_NAME_V2,
+ TAG_ITEM_NAME });
+ } else {
+ flux = flux.keep(new String[] { FIELD_MEASUREMENT_NAME, COLUMN_TIME_NAME_V2, COLUMN_VALUE_NAME_V2 });
+ }
+ }
+
+ if (criteria.getState() != null && criteria.getOperator() != null) {
+ Restrictions restrictions = Restrictions.and(Restrictions.field().equal(FIELD_VALUE_NAME),
+ Restrictions.value().custom(stateToObject(criteria.getState()),
+ getOperationSymbol(criteria.getOperator(), InfluxDBVersion.V2)));
+ flux = flux.filter(restrictions);
+ }
+
+ flux = applyOrderingAndPageSize(criteria, flux);
+
+ return flux.toString();
+ }
+
+ private Flux applyOrderingAndPageSize(FilterCriteria criteria, Flux flux) {
+ var lastOptimization = criteria.getOrdering() == FilterCriteria.Ordering.DESCENDING
+ && criteria.getPageSize() == 1;
+
+ if (lastOptimization) {
+ flux = flux.last();
+ } else {
+ if (criteria.getOrdering() != null) {
+ boolean desc = criteria.getOrdering() == FilterCriteria.Ordering.DESCENDING;
+ flux = flux.sort().withDesc(desc).withColumns(new String[] { COLUMN_TIME_NAME_V2 });
+ }
+
+ if (criteria.getPageSize() != Integer.MAX_VALUE) {
+ flux = flux.limit(criteria.getPageSize()).withPropertyValue("offset",
+ criteria.getPageNumber() * criteria.getPageSize());
+ }
+ }
+ return flux;
+ }
+}
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
+import java.util.Objects;
import java.util.stream.Stream;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
+import org.openhab.persistence.influxdb.internal.FilterCriteriaQueryCreator;
import org.openhab.persistence.influxdb.internal.InfluxDBConfiguration;
import org.openhab.persistence.influxdb.internal.InfluxDBConstants;
+import org.openhab.persistence.influxdb.internal.InfluxDBMetadataService;
import org.openhab.persistence.influxdb.internal.InfluxDBRepository;
import org.openhab.persistence.influxdb.internal.InfluxPoint;
-import org.openhab.persistence.influxdb.internal.InfluxRow;
-import org.openhab.persistence.influxdb.internal.UnnexpectedConditionException;
+import org.openhab.persistence.influxdb.internal.UnexpectedConditionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@NonNullByDefault
public class InfluxDB2RepositoryImpl implements InfluxDBRepository {
private final Logger logger = LoggerFactory.getLogger(InfluxDB2RepositoryImpl.class);
- private InfluxDBConfiguration configuration;
- @Nullable
- private InfluxDBClient client;
- @Nullable
- private QueryApi queryAPI;
- @Nullable
- private WriteApi writeAPI;
-
- public InfluxDB2RepositoryImpl(InfluxDBConfiguration configuration) {
+ private final InfluxDBConfiguration configuration;
+ private final InfluxDBMetadataService influxDBMetadataService;
+
+ private @Nullable InfluxDBClient client;
+ private @Nullable QueryApi queryAPI;
+ private @Nullable WriteApi writeAPI;
+
+ public InfluxDB2RepositoryImpl(InfluxDBConfiguration configuration,
+ InfluxDBMetadataService influxDBMetadataService) {
this.configuration = configuration;
+ this.influxDBMetadataService = influxDBMetadataService;
}
- /**
- * Returns if the client has been successfully connected to server
- *
- * @return True if it's connected, otherwise false
- */
@Override
public boolean isConnected() {
return client != null;
}
- /**
- * Connect to InfluxDB server
- *
- * @return True if successful, otherwise false
- */
@Override
public boolean connect() {
InfluxDBClientOptions.Builder optionsBuilder = InfluxDBClientOptions.builder().url(configuration.getUrl())
.org(configuration.getDatabaseName()).bucket(configuration.getRetentionPolicy());
- char[] token = configuration.getTokenAsCharArray();
+ char[] token = configuration.getToken().toCharArray();
if (token.length > 0) {
optionsBuilder.authenticateToken(token);
} else {
final InfluxDBClient createdClient = InfluxDBClientFactory.create(clientOptions);
this.client = createdClient;
- logger.debug("Succesfully connected to InfluxDB. Instance ready={}", createdClient.ready());
+
queryAPI = createdClient.getQueryApi();
writeAPI = createdClient.getWriteApi();
+ logger.debug("Successfully connected to InfluxDB. Instance ready={}", createdClient.ready());
+
return checkConnectionStatus();
}
- /**
- * Disconnect from InfluxDB server
- */
@Override
public void disconnect() {
final InfluxDBClient currentClient = this.client;
this.client = null;
}
- /**
- * Check if connection is currently ready
- *
- * @return True if its ready, otherwise false
- */
@Override
public boolean checkConnectionStatus() {
final InfluxDBClient currentClient = client;
}
}
- /**
- * Write point to database
- *
- * @param point
- */
@Override
- public void write(InfluxPoint point) {
+ public void write(InfluxPoint point) throws UnexpectedConditionException {
final WriteApi currentWriteAPI = writeAPI;
if (currentWriteAPI != null) {
currentWriteAPI.writePoint(convertPointToClientFormat(point));
}
}
- private Point convertPointToClientFormat(InfluxPoint point) {
+ private Point convertPointToClientFormat(InfluxPoint point) throws UnexpectedConditionException {
Point clientPoint = Point.measurement(point.getMeasurementName()).time(point.getTime(), WritePrecision.MS);
setPointValue(point.getValue(), clientPoint);
- point.getTags().entrySet().forEach(e -> clientPoint.addTag(e.getKey(), e.getValue()));
+ point.getTags().forEach(clientPoint::addTag);
return clientPoint;
}
- private void setPointValue(@Nullable Object value, Point point) {
+ private void setPointValue(@Nullable Object value, Point point) throws UnexpectedConditionException {
if (value instanceof String) {
point.addField(FIELD_VALUE_NAME, (String) value);
} else if (value instanceof Number) {
} else if (value == null) {
point.addField(FIELD_VALUE_NAME, (String) null);
} else {
- throw new UnnexpectedConditionException("Not expected value type");
+ throw new UnexpectedConditionException("Not expected value type");
}
}
- /**
- * Executes Flux query
- *
- * @param query Query
- * @return Query results
- */
@Override
public List<InfluxRow> query(String query) {
final QueryApi currentQueryAPI = queryAPI;
if (currentQueryAPI != null) {
List<FluxTable> clientResult = currentQueryAPI.query(query);
- return convertClientResutToRepository(clientResult);
+ return clientResult.stream().flatMap(this::mapRawResultToHistoric).toList();
} else {
logger.warn("Returning empty list because queryAPI isn't present");
- return Collections.emptyList();
+ return List.of();
}
}
- private List<InfluxRow> convertClientResutToRepository(List<FluxTable> clientResult) {
- return clientResult.stream().flatMap(this::mapRawResultToHistoric).collect(Collectors.toList());
- }
-
private Stream<InfluxRow> mapRawResultToHistoric(FluxTable rawRow) {
return rawRow.getRecords().stream().map(r -> {
String itemName = (String) r.getValueByKey(InfluxDBConstants.TAG_ITEM_NAME);
- if (itemName == null) { // use measurement name if item is not tagged
- itemName = r.getMeasurement();
- }
Object value = r.getValueByKey(COLUMN_VALUE_NAME_V2);
Instant time = (Instant) r.getValueByKey(COLUMN_TIME_NAME_V2);
return new InfluxRow(time, itemName, value);
});
}
- /**
- * Return all stored item names with it's count of stored points
- *
- * @return Map with <ItemName,ItemCount> entries
- */
@Override
public Map<String, Integer> getStoredItemsCount() {
final QueryApi currentQueryAPI = queryAPI;
+ " |> group()";
List<FluxTable> queryResult = currentQueryAPI.query(query);
- queryResult.stream().findFirst().orElse(new FluxTable()).getRecords().forEach(row -> {
- result.put((String) row.getValueByKey(TAG_ITEM_NAME), ((Number) row.getValue()).intValue());
- });
+ Objects.requireNonNull(queryResult.stream().findFirst().orElse(new FluxTable())).getRecords()
+ .forEach(row -> {
+ result.put((String) row.getValueByKey(TAG_ITEM_NAME), ((Number) row.getValue()).intValue());
+ });
return result;
} else {
logger.warn("Returning empty result because queryAPI isn't present");
return Collections.emptyMap();
}
}
+
+ @Override
+ public FilterCriteriaQueryCreator createQueryCreator() {
+ return new InfluxDB2FilterCriteriaQueryCreatorImpl(configuration, influxDBMetadataService);
+ }
}
+++ /dev/null
-/**
- * Copyright (c) 2010-2023 Contributors to the openHAB project
- *
- * See the NOTICE file(s) distributed with this work for additional
- * information.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License 2.0 which is available at
- * http://www.eclipse.org/legal/epl-2.0
- *
- * SPDX-License-Identifier: EPL-2.0
- */
-package org.openhab.persistence.influxdb.internal;
-
-import static org.openhab.persistence.influxdb.internal.InfluxDBConfiguration.*;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.eclipse.jdt.annotation.NonNullByDefault;
-
-/**
- * @author Joan Pujol Espinar - Initial contribution
- */
-@NonNullByDefault
-public class ConfigurationTestHelper {
-
- public static Map<String, Object> createValidConfigurationParameters() {
- Map<String, Object> config = new HashMap<>();
- config.put(URL_PARAM, "http://localhost:8086");
- config.put(VERSION_PARAM, InfluxDBVersion.V2.name());
- config.put(TOKEN_PARAM, "sampletoken");
- config.put(DATABASE_PARAM, "openhab");
- config.put(RETENTION_POLICY_PARAM, "default");
- return config;
- }
-
- public static InfluxDBConfiguration createValidConfiguration() {
- return new InfluxDBConfiguration(createValidConfigurationParameters());
- }
-
- public static Map<String, Object> createInvalidConfigurationParameters() {
- Map<String, Object> config = createValidConfigurationParameters();
- config.remove(TOKEN_PARAM);
- return config;
- }
-}
*/
package org.openhab.persistence.influxdb.internal;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;
+import static org.openhab.persistence.influxdb.internal.InfluxDBConfiguration.DATABASE_PARAM;
+import static org.openhab.persistence.influxdb.internal.InfluxDBConfiguration.PASSWORD_PARAM;
+import static org.openhab.persistence.influxdb.internal.InfluxDBConfiguration.RETENTION_POLICY_PARAM;
+import static org.openhab.persistence.influxdb.internal.InfluxDBConfiguration.TOKEN_PARAM;
+import static org.openhab.persistence.influxdb.internal.InfluxDBConfiguration.URL_PARAM;
+import static org.openhab.persistence.influxdb.internal.InfluxDBConfiguration.USER_PARAM;
+import static org.openhab.persistence.influxdb.internal.InfluxDBConfiguration.VERSION_PARAM;
import java.util.Map;
-import org.eclipse.jdt.annotation.DefaultLocation;
import org.eclipse.jdt.annotation.NonNullByDefault;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
* @author Joan Pujol Espinar - Initial contribution
*/
@ExtendWith(MockitoExtension.class)
-@NonNullByDefault(value = { DefaultLocation.PARAMETER, DefaultLocation.RETURN_TYPE })
+@NonNullByDefault
public class InfluxDBPersistenceServiceTest {
- private InfluxDBPersistenceService instance;
+ private static final Map<String, Object> VALID_V1_CONFIGURATION = Map.of( //
+ URL_PARAM, "http://localhost:8086", //
+ VERSION_PARAM, InfluxDBVersion.V1.name(), //
+ USER_PARAM, "user", PASSWORD_PARAM, "password", //
+ DATABASE_PARAM, "openhab", //
+ RETENTION_POLICY_PARAM, "default");
- private @Mock InfluxDBRepository influxDBRepository;
+ private static final Map<String, Object> VALID_V2_CONFIGURATION = Map.of( //
+ URL_PARAM, "http://localhost:8086", //
+ VERSION_PARAM, InfluxDBVersion.V2.name(), //
+ TOKEN_PARAM, "sampletoken", //
+ DATABASE_PARAM, "openhab", //
+ RETENTION_POLICY_PARAM, "default");
- private Map<String, Object> validConfig;
- private Map<String, Object> invalidConfig;
+ private static final Map<String, Object> INVALID_V1_CONFIGURATION = Map.of(//
+ URL_PARAM, "http://localhost:8086", //
+ VERSION_PARAM, InfluxDBVersion.V1.name(), //
+ USER_PARAM, "user", //
+ DATABASE_PARAM, "openhab", //
+ RETENTION_POLICY_PARAM, "default");
- @BeforeEach
- public void before() {
- instance = new InfluxDBPersistenceService(mock(ItemRegistry.class), mock(MetadataRegistry.class)) {
- @Override
- protected InfluxDBRepository createInfluxDBRepository() {
- return influxDBRepository;
- }
- };
+ private static final Map<String, Object> INVALID_V2_CONFIGURATION = Map.of( //
+ URL_PARAM, "http://localhost:8086", //
+ VERSION_PARAM, InfluxDBVersion.V2.name(), //
+ DATABASE_PARAM, "openhab", //
+ RETENTION_POLICY_PARAM, "default");
- validConfig = ConfigurationTestHelper.createValidConfigurationParameters();
- invalidConfig = ConfigurationTestHelper.createInvalidConfigurationParameters();
- }
+ private @Mock @NonNullByDefault({}) InfluxDBRepository influxDBRepositoryMock;
+
+ private final InfluxDBMetadataService influxDBMetadataService = new InfluxDBMetadataService(
+ mock(MetadataRegistry.class));
- @AfterEach
- public void after() {
- validConfig = null;
- invalidConfig = null;
- instance = null;
- influxDBRepository = null;
+ @Test
+ public void activateWithValidV1ConfigShouldConnectRepository() {
+ getService(VALID_V1_CONFIGURATION);
+ verify(influxDBRepositoryMock).connect();
}
@Test
- public void activateWithValidConfigShouldConnectRepository() {
- instance.activate(validConfig);
- verify(influxDBRepository).connect();
+ public void activateWithValidV2ConfigShouldConnectRepository() {
+ getService(VALID_V2_CONFIGURATION);
+ verify(influxDBRepositoryMock).connect();
}
@Test
- public void activateWithInvalidConfigShouldNotConnectRepository() {
- instance.activate(invalidConfig);
- verify(influxDBRepository, never()).connect();
+ public void activateWithInvalidV1ConfigShouldFail() {
+ assertThrows(IllegalArgumentException.class, () -> getService(INVALID_V1_CONFIGURATION));
}
@Test
- public void activateWithNullConfigShouldNotConnectRepository() {
- instance.activate(null);
- verify(influxDBRepository, never()).connect();
+ public void activateWithInvalidV2ShouldFail() {
+ assertThrows(IllegalArgumentException.class, () -> getService(INVALID_V2_CONFIGURATION));
}
@Test
public void deactivateShouldDisconnectRepository() {
- instance.activate(validConfig);
+ InfluxDBPersistenceService instance = getService(VALID_V2_CONFIGURATION);
instance.deactivate();
- verify(influxDBRepository).disconnect();
+ verify(influxDBRepositoryMock).disconnect();
}
@Test
- public void storeItemWithConnectedRepository() {
- instance.activate(validConfig);
- when(influxDBRepository.isConnected()).thenReturn(true);
+ public void storeItemWithConnectedRepository() throws UnexpectedConditionException {
+ InfluxDBPersistenceService instance = getService(VALID_V2_CONFIGURATION);
+ when(influxDBRepositoryMock.isConnected()).thenReturn(true);
instance.store(ItemTestHelper.createNumberItem("number", 5));
- verify(influxDBRepository).write(any());
+ verify(influxDBRepositoryMock).write(any());
}
@Test
- public void storeItemWithDisconnectedRepositoryIsIgnored() {
- instance.activate(validConfig);
- when(influxDBRepository.isConnected()).thenReturn(false);
+ public void storeItemWithDisconnectedRepositoryIsIgnored() throws UnexpectedConditionException {
+ InfluxDBPersistenceService instance = getService(VALID_V2_CONFIGURATION);
+ when(influxDBRepositoryMock.isConnected()).thenReturn(false);
instance.store(ItemTestHelper.createNumberItem("number", 5));
- verify(influxDBRepository, never()).write(any());
+ verify(influxDBRepositoryMock, never()).write(any());
+ }
+
+ private InfluxDBPersistenceService getService(Map<String, Object> config) {
+ return new InfluxDBPersistenceService(mock(ItemRegistry.class), influxDBMetadataService, config) {
+ @Override
+ protected InfluxDBRepository createInfluxDBRepository() {
+ return influxDBRepositoryMock;
+ }
+ };
}
}
import org.openhab.core.library.types.PercentType;
import org.openhab.core.persistence.FilterCriteria;
import org.openhab.persistence.influxdb.InfluxDBPersistenceService;
-import org.openhab.persistence.influxdb.internal.influx1.Influx1FilterCriteriaQueryCreatorImpl;
-import org.openhab.persistence.influxdb.internal.influx2.Influx2FilterCriteriaQueryCreatorImpl;
+import org.openhab.persistence.influxdb.internal.influx1.InfluxDB1FilterCriteriaQueryCreatorImpl;
+import org.openhab.persistence.influxdb.internal.influx2.InfluxDB2FilterCriteriaQueryCreatorImpl;
/**
* @author Joan Pujol Espinar - Initial contribution
private @Mock InfluxDBConfiguration influxDBConfiguration;
private @Mock MetadataRegistry metadataRegistry;
- private Influx1FilterCriteriaQueryCreatorImpl instanceV1;
- private Influx2FilterCriteriaQueryCreatorImpl instanceV2;
+ private InfluxDB1FilterCriteriaQueryCreatorImpl instanceV1;
+ private InfluxDB2FilterCriteriaQueryCreatorImpl instanceV2;
@BeforeEach
public void before() {
- instanceV1 = new Influx1FilterCriteriaQueryCreatorImpl(influxDBConfiguration, metadataRegistry);
- instanceV2 = new Influx2FilterCriteriaQueryCreatorImpl(influxDBConfiguration, metadataRegistry);
+ InfluxDBMetadataService influxDBMetadataService = new InfluxDBMetadataService(metadataRegistry);
+ instanceV1 = new InfluxDB1FilterCriteriaQueryCreatorImpl(influxDBConfiguration, influxDBMetadataService);
+ instanceV2 = new InfluxDB2FilterCriteriaQueryCreatorImpl(influxDBConfiguration, influxDBMetadataService);
}
@AfterEach
assertThat(queryV1, equalTo("SELECT \"value\"::field,\"item\"::tag FROM \"origin\".\"sampleItem\";"));
String queryV2 = instanceV2.createQuery(criteria, RETENTION_POLICY);
- assertThat(queryV2,
- equalTo("from(bucket:\"origin\")\n\t" + "|> range(start:-100y)\n\t"
- + "|> filter(fn: (r) => r[\"_measurement\"] == \"sampleItem\")\n\t"
- + "|> keep(columns:[\"_measurement\", \"_time\", \"_value\"])"));
+ assertThat(queryV2, equalTo("""
+ from(bucket:"origin")
+ \t|> range(start:-100y)
+ \t|> filter(fn: (r) => r["_measurement"] == "sampleItem")
+ \t|> keep(columns:["_measurement", "_time", "_value"])"""));
}
@Test
assertThat(queryV1, equalTo(expectedQueryV1));
String queryV2 = instanceV2.createQuery(criteria, RETENTION_POLICY);
- String expectedQueryV2 = String.format(
- "from(bucket:\"origin\")\n\t" + "|> range(start:%s, stop:%s)\n\t"
- + "|> filter(fn: (r) => r[\"_measurement\"] == \"sampleItem\")\n\t"
- + "|> keep(columns:[\"_measurement\", \"_time\", \"_value\"])",
+ String expectedQueryV2 = String.format("""
+ from(bucket:"origin")
+ \t|> range(start:%s, stop:%s)
+ \t|> filter(fn: (r) => r["_measurement"] == "sampleItem")
+ \t|> keep(columns:["_measurement", "_time", "_value"])""",
INFLUX2_DATE_FORMATTER.format(now.toInstant()), INFLUX2_DATE_FORMATTER.format(tomorrow.toInstant()));
assertThat(queryV2, equalTo(expectedQueryV2));
}
equalTo("SELECT \"value\"::field,\"item\"::tag FROM \"origin\".\"sampleItem\" WHERE value <= 90;"));
String queryV2 = instanceV2.createQuery(criteria, RETENTION_POLICY);
- assertThat(queryV2,
- equalTo("from(bucket:\"origin\")\n\t" + "|> range(start:-100y)\n\t"
- + "|> filter(fn: (r) => r[\"_measurement\"] == \"sampleItem\")\n\t"
- + "|> keep(columns:[\"_measurement\", \"_time\", \"_value\"])\n\t"
- + "|> filter(fn: (r) => (r[\"_field\"] == \"value\" and r[\"_value\"] <= 90))"));
+ assertThat(queryV2, equalTo("""
+ from(bucket:"origin")
+ \t|> range(start:-100y)
+ \t|> filter(fn: (r) => r["_measurement"] == "sampleItem")
+ \t|> keep(columns:["_measurement", "_time", "_value"])
+ \t|> filter(fn: (r) => (r["_field"] == "value" and r["_value"] <= 90))"""));
}
@Test
equalTo("SELECT \"value\"::field,\"item\"::tag FROM \"origin\".\"sampleItem\" LIMIT 10 OFFSET 20;"));
String queryV2 = instanceV2.createQuery(criteria, RETENTION_POLICY);
- assertThat(queryV2, equalTo("from(bucket:\"origin\")\n\t" + "|> range(start:-100y)\n\t"
- + "|> filter(fn: (r) => r[\"_measurement\"] == \"sampleItem\")\n\t"
- + "|> keep(columns:[\"_measurement\", \"_time\", \"_value\"])\n\t" + "|> limit(n:10, offset:20)"));
+ assertThat(queryV2, equalTo("""
+ from(bucket:"origin")
+ \t|> range(start:-100y)
+ \t|> filter(fn: (r) => r["_measurement"] == "sampleItem")
+ \t|> keep(columns:["_measurement", "_time", "_value"])
+ \t|> limit(n:10, offset:20)"""));
}
@Test
equalTo("SELECT \"value\"::field,\"item\"::tag FROM \"origin\".\"sampleItem\" ORDER BY time ASC;"));
String queryV2 = instanceV2.createQuery(criteria, RETENTION_POLICY);
- assertThat(queryV2,
- equalTo("from(bucket:\"origin\")\n\t" + "|> range(start:-100y)\n\t"
- + "|> filter(fn: (r) => r[\"_measurement\"] == \"sampleItem\")\n\t"
- + "|> keep(columns:[\"_measurement\", \"_time\", \"_value\"])\n\t"
- + "|> sort(desc:false, columns:[\"_time\"])"));
+ assertThat(queryV2, equalTo("""
+ from(bucket:"origin")
+ \t|> range(start:-100y)
+ \t|> filter(fn: (r) => r["_measurement"] == "sampleItem")
+ \t|> keep(columns:["_measurement", "_time", "_value"])
+ \t|> sort(desc:false, columns:["_time"])"""));
}
@Test
criteria.setOrdering(FilterCriteria.Ordering.DESCENDING);
criteria.setPageSize(1);
String queryV2 = instanceV2.createQuery(criteria, RETENTION_POLICY);
- assertThat(queryV2,
- equalTo("from(bucket:\"origin\")\n\t" + "|> range(start:-100y)\n\t"
- + "|> filter(fn: (r) => r[\"_measurement\"] == \"sampleItem\")\n\t"
- + "|> keep(columns:[\"_measurement\", \"_time\", \"_value\"])\n\t" + "|> last()"));
+ assertThat(queryV2, equalTo("""
+ from(bucket:"origin")
+ \t|> range(start:-100y)
+ \t|> filter(fn: (r) => r["_measurement"] == "sampleItem")
+ \t|> keep(columns:["_measurement", "_time", "_value"])
+ \t|> last()"""));
}
private FilterCriteria createBaseCriteria() {
- return createBaseCriteria(ITEM_NAME);
- }
-
- private FilterCriteria createBaseCriteria(String sampleItem) {
FilterCriteria criteria = new FilterCriteria();
- criteria.setItemName(sampleItem);
+ criteria.setItemName(ITEM_NAME);
criteria.setOrdering(null);
return criteria;
}
"SELECT \"value\"::field,\"item\"::tag FROM \"origin\".\"measurementName\" WHERE item = 'sampleItem';"));
String queryV2 = instanceV2.createQuery(criteria, RETENTION_POLICY);
- assertThat(queryV2,
- equalTo("from(bucket:\"origin\")\n\t" + "|> range(start:-100y)\n\t"
- + "|> filter(fn: (r) => r[\"_measurement\"] == \"measurementName\")\n\t"
- + "|> filter(fn: (r) => r[\"item\"] == \"sampleItem\")\n\t"
- + "|> keep(columns:[\"_measurement\", \"_time\", \"_value\", \"item\"])"));
-
+ assertThat(queryV2, equalTo("""
+ from(bucket:"origin")
+ \t|> range(start:-100y)
+ \t|> filter(fn: (r) => r["_measurement"] == "measurementName")
+ \t|> filter(fn: (r) => r["item"] == "sampleItem")
+ \t|> keep(columns:["_measurement", "_time", "_value", "item"])"""));
when(metadataRegistry.get(metadataKey))
.thenReturn(new Metadata(metadataKey, "", Map.of("key1", "val1", "key2", "val2")));
assertThat(queryV1, equalTo("SELECT \"value\"::field,\"item\"::tag FROM \"origin\".\"sampleItem\";"));
queryV2 = instanceV2.createQuery(criteria, RETENTION_POLICY);
- assertThat(queryV2,
- equalTo("from(bucket:\"origin\")\n\t" + "|> range(start:-100y)\n\t"
- + "|> filter(fn: (r) => r[\"_measurement\"] == \"sampleItem\")\n\t"
- + "|> keep(columns:[\"_measurement\", \"_time\", \"_value\"])"));
+ assertThat(queryV2, equalTo("""
+ from(bucket:"origin")
+ \t|> range(start:-100y)
+ \t|> filter(fn: (r) => r["_measurement"] == "sampleItem")
+ \t|> keep(columns:["_measurement", "_time", "_value"])"""));
}
}
import org.eclipse.jdt.annotation.DefaultLocation;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
* @author Joan Pujol Espinar - Initial contribution
*/
@ExtendWith(MockitoExtension.class)
-@SuppressWarnings("null") // In case of any NPE it will cause test fail that it's the expected result
@NonNullByDefault(value = { DefaultLocation.PARAMETER, DefaultLocation.RETURN_TYPE })
public class ItemToStorePointCreatorTest {
@BeforeEach
public void before() {
+ InfluxDBMetadataService influxDBMetadataService = new InfluxDBMetadataService(metadataRegistry);
when(influxDBConfiguration.isAddCategoryTag()).thenReturn(false);
when(influxDBConfiguration.isAddLabelTag()).thenReturn(false);
when(influxDBConfiguration.isAddTypeTag()).thenReturn(false);
when(influxDBConfiguration.isReplaceUnderscore()).thenReturn(false);
- instance = new ItemToStorePointCreator(influxDBConfiguration, metadataRegistry);
+ instance = new ItemToStorePointCreator(influxDBConfiguration, influxDBMetadataService);
}
@AfterEach
NumberItem item = ItemTestHelper.createNumberItem("myitem", number);
InfluxPoint point = instance.convert(item, null);
+ if (point == null) {
+ Assertions.fail("'point' is null");
+ return;
+ }
+
assertThat(point.getMeasurementName(), equalTo(item.getName()));
assertThat("Must Store item name", point.getTags(), hasEntry("item", item.getName()));
assertThat(point.getValue(), equalTo(new BigDecimal(number.toString())));
}
+ @SuppressWarnings("unused")
private static Stream<Number> convertBasicItem() {
return Stream.of(5, 5.5, 5L);
}
public void shouldUseAliasAsMeasurementNameIfProvided() {
NumberItem item = ItemTestHelper.createNumberItem("myitem", 5);
InfluxPoint point = instance.convert(item, "aliasName");
+
+ if (point == null) {
+ Assertions.fail("'point' is null");
+ return;
+ }
+
assertThat(point.getMeasurementName(), is("aliasName"));
}
when(influxDBConfiguration.isAddCategoryTag()).thenReturn(true);
InfluxPoint point = instance.convert(item, null);
+
+ if (point == null) {
+ Assertions.fail("'point' is null");
+ return;
+ }
+
assertThat(point.getTags(), hasEntry(InfluxDBConstants.TAG_CATEGORY_NAME, "categoryValue"));
when(influxDBConfiguration.isAddCategoryTag()).thenReturn(false);
point = instance.convert(item, null);
+
+ if (point == null) {
+ Assertions.fail("'point' is null");
+ return;
+ }
+
assertThat(point.getTags(), not(hasKey(InfluxDBConstants.TAG_CATEGORY_NAME)));
}
when(influxDBConfiguration.isAddTypeTag()).thenReturn(true);
InfluxPoint point = instance.convert(item, null);
+
+ if (point == null) {
+ Assertions.fail("'point' is null");
+ return;
+ }
+
assertThat(point.getTags(), hasEntry(InfluxDBConstants.TAG_TYPE_NAME, "Number"));
when(influxDBConfiguration.isAddTypeTag()).thenReturn(false);
point = instance.convert(item, null);
+
+ if (point == null) {
+ Assertions.fail("'point' is null");
+ return;
+ }
+
assertThat(point.getTags(), not(hasKey(InfluxDBConstants.TAG_TYPE_NAME)));
}
when(influxDBConfiguration.isAddLabelTag()).thenReturn(true);
InfluxPoint point = instance.convert(item, null);
+
+ if (point == null) {
+ Assertions.fail("'point' is null");
+ return;
+ }
+
assertThat(point.getTags(), hasEntry(InfluxDBConstants.TAG_LABEL_NAME, "ItemLabel"));
when(influxDBConfiguration.isAddLabelTag()).thenReturn(false);
point = instance.convert(item, null);
+
+ if (point == null) {
+ Assertions.fail("'point' is null");
+ return;
+ }
+
assertThat(point.getTags(), not(hasKey(InfluxDBConstants.TAG_LABEL_NAME)));
}
.thenReturn(new Metadata(metadataKey, "", Map.of("key1", "val1", "key2", "val2")));
InfluxPoint point = instance.convert(item, null);
+
+ if (point == null) {
+ Assertions.fail("'point' is null");
+ return;
+ }
+
assertThat(point.getTags(), hasEntry("key1", "val1"));
assertThat(point.getTags(), hasEntry("key2", "val2"));
}
MetadataKey metadataKey = new MetadataKey(InfluxDBPersistenceService.SERVICE_NAME, item.getName());
InfluxPoint point = instance.convert(item, null);
+ if (point == null) {
+ Assertions.fail();
+ return;
+ }
assertThat(point.getMeasurementName(), equalTo(item.getName()));
point = instance.convert(item, null);
+ if (point == null) {
+ Assertions.fail();
+ return;
+ }
assertThat(point.getMeasurementName(), equalTo(item.getName()));
assertThat(point.getTags(), hasEntry("item", item.getName()));
.thenReturn(new Metadata(metadataKey, "measurementName", Map.of("key1", "val1", "key2", "val2")));
point = instance.convert(item, null);
+ if (point == null) {
+ Assertions.fail();
+ return;
+ }
assertThat(point.getMeasurementName(), equalTo("measurementName"));
assertThat(point.getTags(), hasEntry("item", item.getName()));
.thenReturn(new Metadata(metadataKey, "", Map.of("key1", "val1", "key2", "val2")));
point = instance.convert(item, null);
+ if (point == null) {
+ Assertions.fail();
+ return;
+ }
assertThat(point.getMeasurementName(), equalTo(item.getName()));
assertThat(point.getTags(), hasEntry("item", item.getName()));
}