2 * Copyright (c) 2010-2023 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.dbquery.internal.dbimpl.influx2;
15 import java.util.ArrayList;
16 import java.util.List;
17 import java.util.concurrent.CompletableFuture;
18 import java.util.concurrent.ExecutorService;
19 import java.util.concurrent.Executors;
21 import org.eclipse.jdt.annotation.NonNullByDefault;
22 import org.openhab.binding.dbquery.internal.config.InfluxDB2BridgeConfiguration;
23 import org.openhab.binding.dbquery.internal.domain.Database;
24 import org.openhab.binding.dbquery.internal.domain.Query;
25 import org.openhab.binding.dbquery.internal.domain.QueryFactory;
26 import org.openhab.binding.dbquery.internal.domain.QueryResult;
27 import org.openhab.binding.dbquery.internal.error.DatabaseException;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
31 import com.influxdb.query.FluxRecord;
34 * Influx2 implementation of {@link Database}
36 * @author Joan Pujol - Initial contribution
39 public class Influx2Database implements Database {
40 private final Logger logger = LoggerFactory.getLogger(Influx2Database.class);
41 private final ExecutorService executors;
42 private final InfluxDB2BridgeConfiguration config;
43 private final InfluxDBClientFacade client;
44 private final QueryFactory queryFactory;
46 public Influx2Database(InfluxDB2BridgeConfiguration config, InfluxDBClientFacade influxDBClientFacade) {
48 this.client = influxDBClientFacade;
49 executors = Executors.newSingleThreadScheduledExecutor();
50 queryFactory = new Influx2QueryFactory();
54 public boolean isConnected() {
55 return client.isConnected();
59 public CompletableFuture<Boolean> connect() {
60 return CompletableFuture.supplyAsync(() -> {
61 synchronized (Influx2Database.this) {
62 return client.connect();
68 public CompletableFuture<Boolean> disconnect() {
69 return CompletableFuture.supplyAsync(() -> {
70 synchronized (Influx2Database.this) {
71 return client.disconnect();
77 public QueryFactory queryFactory() throws DatabaseException {
82 public CompletableFuture<QueryResult> executeQuery(Query query) {
84 if (query instanceof Influx2QueryFactory.Influx2Query) {
85 Influx2QueryFactory.Influx2Query influxQuery = (Influx2QueryFactory.Influx2Query) query;
87 CompletableFuture<QueryResult> asyncResult = new CompletableFuture<>();
88 List<FluxRecord> records = new ArrayList<>();
89 client.query(influxQuery.getQuery(), (cancellable, record) -> { // onNext
91 }, error -> { // onError
92 logger.warn("Error executing query {}", query, error);
93 asyncResult.complete(QueryResult.ofIncorrectResult("Error executing query"));
94 }, () -> { // onComplete
95 asyncResult.complete(new Influx2QueryResultExtractor().apply(records));
99 return CompletableFuture
100 .completedFuture(QueryResult.ofIncorrectResult("Unnexpected query type " + query));
102 } catch (RuntimeException e) {
103 return CompletableFuture.failedFuture(e);
108 public String toString() {
109 return "Influx2Database{config=" + config + '}';