import org.openhab.core.items.ItemUtil;
import org.openhab.core.persistence.FilterCriteria;
import org.openhab.core.persistence.HistoricItem;
+import org.openhab.core.persistence.ModifiablePersistenceService;
import org.openhab.core.persistence.PersistenceItemInfo;
import org.openhab.core.persistence.PersistenceService;
import org.openhab.core.persistence.QueryablePersistenceService;
QueryablePersistenceService.class }, configurationPid = "org.openhab.influxdb", //
property = Constants.SERVICE_PID + "=org.openhab.influxdb")
@ConfigurableService(category = "persistence", label = "InfluxDB Persistence Service", description_uri = InfluxDBPersistenceService.CONFIG_URI)
-public class InfluxDBPersistenceService implements QueryablePersistenceService {
+public class InfluxDBPersistenceService implements ModifiablePersistenceService {
public static final String SERVICE_NAME = "influxdb";
private final Logger logger = LoggerFactory.getLogger(InfluxDBPersistenceService.class);
@Override
public void store(Item item, @Nullable String alias) {
+ store(item, ZonedDateTime.now(), item.getState(), alias);
+ }
+
+ @Override
+ public void store(Item item, ZonedDateTime date, State state) {
+ store(item, date, state, null);
+ }
+
+ public void store(Item item, ZonedDateTime date, State state, @Nullable String alias) {
if (!serviceActivated) {
logger.warn("InfluxDB service not ready. Storing {} rejected.", item);
return;
}
- convert(item, alias).thenAccept(point -> {
+ convert(item, state, date.toInstant(), null).thenAccept(point -> {
if (point == null) {
logger.trace("Ignoring item {}, conversion to an InfluxDB point failed.", item.getName());
return;
});
}
+ @Override
+ public boolean remove(FilterCriteria filter) throws IllegalArgumentException {
+ if (serviceActivated && checkConnection()) {
+ if (filter.getItemName() == null) {
+ logger.warn("Item name is missing in filter {} when trying to remove data.", filter);
+ return false;
+ }
+ return influxDBRepository.remove(filter);
+ } else {
+ logger.debug("Remove query {} ignored, InfluxDB is not connected.", filter);
+ return false;
+ }
+ }
+
@Override
public Iterable<HistoricItem> query(FilterCriteria filter) {
if (serviceActivated && checkConnection()) {
filter.getItemName(), filter.getOrdering().toString(), filter.getState(), filter.getOperator(),
filter.getBeginDate(), filter.getEndDate(), filter.getPageSize(), filter.getPageNumber());
if (filter.getItemName() == null) {
- logger.warn("Item name is missing in filter {}", filter);
+ logger.warn("Item name is missing in filter {} when querying data.", filter);
return List.of();
}
- String query = influxDBRepository.createQueryCreator().createQuery(filter,
+
+ List<InfluxDBRepository.InfluxRow> results = influxDBRepository.query(filter,
configuration.getRetentionPolicy());
- logger.trace("Query {}", query);
- List<InfluxDBRepository.InfluxRow> results = influxDBRepository.query(query);
return results.stream().map(this::mapRowToHistoricItem).collect(Collectors.toList());
} else {
logger.debug("Query for persisted data ignored, InfluxDB is not connected");
* @return a {@link CompletableFuture} that contains either <code>null</code> for item states that cannot be
* converted or the corresponding {@link InfluxPoint}
*/
- CompletableFuture<@Nullable InfluxPoint> convert(Item item, @Nullable String storeAlias) {
+ CompletableFuture<@Nullable InfluxPoint> convert(Item item, State state, Instant timeStamp,
+ @Nullable String storeAlias) {
String itemName = item.getName();
String itemLabel = item.getLabel();
String category = item.getCategory();
- State state = item.getState();
String itemType = item.getType();
- Instant timeStamp = Instant.now();
if (state instanceof UnDefType) {
return CompletableFuture.completedFuture(null);
import java.util.Map;
import org.eclipse.jdt.annotation.NonNullByDefault;
+import org.openhab.core.persistence.FilterCriteria;
/**
* Manages InfluxDB server interaction maintaining client connection
/**
* Executes Flux query
*
- * @param query Query
+ * @param filter the query filter
* @return Query results
*
*/
- List<InfluxRow> query(String query);
+ List<InfluxRow> query(FilterCriteria filter, String retentionPolicy);
/**
* Write points to database
boolean write(List<InfluxPoint> influxPoints);
/**
- * create a query creator on this repository
+ * Execute delete query
*
- * @return the query creator for this repository
+ * @param filter the query filter
+ * @return <code>true</code> if query executed successfully, <code>false</code> otherwise
*/
- FilterCriteriaQueryCreator createQueryCreator();
+ boolean remove(FilterCriteria filter);
record InfluxRow(Instant time, String itemName, Object value) {
}
import org.influxdb.dto.Pong;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
+import org.openhab.core.persistence.FilterCriteria;
import org.openhab.persistence.influxdb.internal.FilterCriteriaQueryCreator;
import org.openhab.persistence.influxdb.internal.InfluxDBConfiguration;
import org.openhab.persistence.influxdb.internal.InfluxDBMetadataService;
private final Logger logger = LoggerFactory.getLogger(InfluxDB1RepositoryImpl.class);
private final InfluxDBConfiguration configuration;
private final InfluxDBMetadataService influxDBMetadataService;
+ private final FilterCriteriaQueryCreator queryCreator;
private @Nullable InfluxDB client;
public InfluxDB1RepositoryImpl(InfluxDBConfiguration configuration,
InfluxDBMetadataService influxDBMetadataService) {
this.configuration = configuration;
this.influxDBMetadataService = influxDBMetadataService;
+ this.queryCreator = new InfluxDB1FilterCriteriaQueryCreatorImpl(configuration, influxDBMetadataService);
}
@Override
return true;
}
+ @Override
+ public boolean remove(FilterCriteria filter) {
+ logger.warn("Removing data is not supported in InfluxDB v1.");
+ return false;
+ }
+
private Optional<Point> convertPointToClientFormat(InfluxPoint point) {
Point.Builder clientPoint = Point.measurement(point.getMeasurementName()).time(point.getTime().toEpochMilli(),
TimeUnit.MILLISECONDS);
}
@Override
- public List<InfluxRow> query(String query) {
+ public List<InfluxRow> query(FilterCriteria filter, String retentionPolicy) {
final InfluxDB currentClient = client;
if (currentClient != null) {
+ String query = queryCreator.createQuery(filter, retentionPolicy);
+ logger.trace("Query {}", query);
Query parsedQuery = new Query(query, configuration.getDatabaseName());
List<QueryResult.Result> results = currentClient.query(parsedQuery, TimeUnit.MILLISECONDS).getResults();
return convertClientResultToRepository(results);
public Map<String, Integer> getStoredItemsCount() {
return Collections.emptyMap();
}
-
- @Override
- public FilterCriteriaQueryCreator createQueryCreator() {
- return new InfluxDB1FilterCriteriaQueryCreatorImpl(configuration, influxDBMetadataService);
- }
}
import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.*;
import java.time.Instant;
+import java.time.OffsetDateTime;
+import java.time.ZonedDateTime;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
+import org.openhab.core.persistence.FilterCriteria;
import org.openhab.persistence.influxdb.internal.FilterCriteriaQueryCreator;
import org.openhab.persistence.influxdb.internal.InfluxDBConfiguration;
import org.openhab.persistence.influxdb.internal.InfluxDBConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.influxdb.client.DeleteApi;
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import com.influxdb.client.InfluxDBClientOptions;
private final Logger logger = LoggerFactory.getLogger(InfluxDB2RepositoryImpl.class);
private final InfluxDBConfiguration configuration;
private final InfluxDBMetadataService influxDBMetadataService;
+ private final FilterCriteriaQueryCreator queryCreator;
private @Nullable InfluxDBClient client;
private @Nullable QueryApi queryAPI;
private @Nullable WriteApi writeAPI;
+ private @Nullable DeleteApi deleteAPI;
public InfluxDB2RepositoryImpl(InfluxDBConfiguration configuration,
InfluxDBMetadataService influxDBMetadataService) {
this.configuration = configuration;
this.influxDBMetadataService = influxDBMetadataService;
+ this.queryCreator = new InfluxDB2FilterCriteriaQueryCreatorImpl(configuration, influxDBMetadataService);
}
@Override
queryAPI = createdClient.getQueryApi();
writeAPI = createdClient.getWriteApi();
+ deleteAPI = createdClient.getDeleteApi();
logger.debug("Successfully connected to InfluxDB. Instance ready={}", createdClient.ready());
return checkConnectionStatus();
return true;
}
+ @Override
+ public boolean remove(FilterCriteria filter) {
+ final DeleteApi currentDeleteApi = deleteAPI;
+ if (currentDeleteApi == null) {
+ return false;
+ }
+
+ if (filter.getState() != null) {
+ logger.warn("Deleting by value is not supported in InfluxDB v2.");
+ return false;
+ }
+ OffsetDateTime start = Objects.requireNonNullElse(filter.getBeginDate(), ZonedDateTime.now().minusYears(100))
+ .toOffsetDateTime();
+ OffsetDateTime stop = Objects.requireNonNullElse(filter.getEndDate(), ZonedDateTime.now().plusYears(100))
+ .toOffsetDateTime();
+
+ // create predicate
+ String predicate = "";
+ String itemName = filter.getItemName();
+ if (itemName != null) {
+ String name = influxDBMetadataService.getMeasurementNameOrDefault(itemName, itemName);
+ String measurementName = configuration.isReplaceUnderscore() ? name.replace('_', '.') : name;
+ predicate = "(_measurement=\"" + measurementName + "\")";
+ }
+
+ try {
+ deleteAPI.delete(start, stop, predicate, configuration.getRetentionPolicy(),
+ configuration.getDatabaseName());
+ } catch (InfluxException e) {
+ logger.debug("Deleting from database failed", e);
+ return false;
+ }
+
+ return true;
+ }
+
private Optional<Point> convertPointToClientFormat(InfluxPoint point) {
Point clientPoint = Point.measurement(point.getMeasurementName()).time(point.getTime(), WritePrecision.MS);
@Nullable
}
@Override
- public List<InfluxRow> query(String query) {
+ public List<InfluxRow> query(FilterCriteria filter, String retentionPolicy) {
final QueryApi currentQueryAPI = queryAPI;
if (currentQueryAPI != null) {
+ String query = queryCreator.createQuery(filter, retentionPolicy);
+ logger.trace("Query {}", query);
List<FluxTable> clientResult = currentQueryAPI.query(query);
return clientResult.stream().flatMap(this::mapRawResultToHistoric).toList();
} else {
return Collections.emptyMap();
}
}
-
- @Override
- public FilterCriteriaQueryCreator createQueryCreator() {
- return new InfluxDB2FilterCriteriaQueryCreatorImpl(configuration, influxDBMetadataService);
- }
}
import static org.openhab.persistence.influxdb.internal.InfluxDBConfiguration.*;
import java.math.BigDecimal;
+import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
@MethodSource
public void convertBasicItem(Number number) throws ExecutionException, InterruptedException {
NumberItem item = ItemTestHelper.createNumberItem("myitem", number);
- InfluxPoint point = instance.convert(item, null).get();
+ InfluxPoint point = instance.convert(item, item.getState(), Instant.now(), null).get();
if (point == null) {
Assertions.fail("'point' is null");
@Test
public void shouldUseAliasAsMeasurementNameIfProvided() throws ExecutionException, InterruptedException {
NumberItem item = ItemTestHelper.createNumberItem("myitem", 5);
- InfluxPoint point = instance.convert(item, "aliasName").get();
+ InfluxPoint point = instance.convert(item, item.getState(), Instant.now(), "aliasName").get();
if (point == null) {
Assertions.fail("'point' is null");
item.setCategory("categoryValue");
instance = getService(false, true, false, false);
- InfluxPoint point = instance.convert(item, null).get();
+ InfluxPoint point = instance.convert(item, item.getState(), Instant.now(), null).get();
if (point == null) {
Assertions.fail("'point' is null");
assertThat(point.getTags(), hasEntry(InfluxDBConstants.TAG_CATEGORY_NAME, "categoryValue"));
instance = getService(false, false, false, false);
- point = instance.convert(item, null).get();
+ point = instance.convert(item, item.getState(), Instant.now(), null).get();
if (point == null) {
Assertions.fail("'point' is null");
NumberItem item = ItemTestHelper.createNumberItem("myitem", 5);
instance = getService(false, false, false, true);
- InfluxPoint point = instance.convert(item, null).get();
+ InfluxPoint point = instance.convert(item, item.getState(), Instant.now(), null).get();
if (point == null) {
Assertions.fail("'point' is null");
assertThat(point.getTags(), hasEntry(InfluxDBConstants.TAG_TYPE_NAME, "Number"));
instance = getService(false, false, false, false);
- point = instance.convert(item, null).get();
+ point = instance.convert(item, item.getState(), Instant.now(), null).get();
if (point == null) {
Assertions.fail("'point' is null");
item.setLabel("ItemLabel");
instance = getService(false, false, true, false);
- InfluxPoint point = instance.convert(item, null).get();
+ InfluxPoint point = instance.convert(item, item.getState(), Instant.now(), null).get();
if (point == null) {
Assertions.fail("'point' is null");
assertThat(point.getTags(), hasEntry(InfluxDBConstants.TAG_LABEL_NAME, "ItemLabel"));
instance = getService(false, false, false, false);
- point = instance.convert(item, null).get();
+ point = instance.convert(item, item.getState(), Instant.now(), null).get();
if (point == null) {
Assertions.fail("'point' is null");
when(metadataRegistry.get(metadataKey))
.thenReturn(new Metadata(metadataKey, "", Map.of("key1", "val1", "key2", "val2")));
- InfluxPoint point = instance.convert(item, null).get();
+ InfluxPoint point = instance.convert(item, item.getState(), Instant.now(), null).get();
if (point == null) {
Assertions.fail("'point' is null");
NumberItem item = ItemTestHelper.createNumberItem("myitem", 5);
MetadataKey metadataKey = new MetadataKey(InfluxDBPersistenceService.SERVICE_NAME, item.getName());
- InfluxPoint point = instance.convert(item, null).get();
+ InfluxPoint point = instance.convert(item, item.getState(), Instant.now(), null).get();
if (point == null) {
Assertions.fail();
return;
}
assertThat(point.getMeasurementName(), equalTo(item.getName()));
- point = instance.convert(item, null).get();
+ point = instance.convert(item, item.getState(), Instant.now(), null).get();
if (point == null) {
Assertions.fail();
return;
when(metadataRegistry.get(metadataKey))
.thenReturn(new Metadata(metadataKey, "measurementName", Map.of("key1", "val1", "key2", "val2")));
- point = instance.convert(item, null).get();
+ point = instance.convert(item, item.getState(), Instant.now(), null).get();
if (point == null) {
Assertions.fail();
return;
when(metadataRegistry.get(metadataKey))
.thenReturn(new Metadata(metadataKey, "", Map.of("key1", "val1", "key2", "val2")));
- point = instance.convert(item, null).get();
+ point = instance.convert(item, item.getState(), Instant.now(), null).get();
if (point == null) {
Assertions.fail();
return;