Skip to content

Commit

Permalink
[influxdb] Implement ModifiablePersistenceService (openhab#14959)
Browse files Browse the repository at this point in the history
* [influxdb] Implement ModifiablePersistenceService

Signed-off-by: Jan N. Klug <[email protected]>
Signed-off-by: Matt Myers <[email protected]>
  • Loading branch information
J-N-K authored and matchews committed Aug 9, 2023
1 parent 0220d11 commit 0f31808
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.openhab.core.items.ItemUtil;
import org.openhab.core.persistence.FilterCriteria;
import org.openhab.core.persistence.HistoricItem;
import org.openhab.core.persistence.ModifiablePersistenceService;
import org.openhab.core.persistence.PersistenceItemInfo;
import org.openhab.core.persistence.PersistenceService;
import org.openhab.core.persistence.QueryablePersistenceService;
Expand Down Expand Up @@ -92,7 +93,7 @@
QueryablePersistenceService.class }, configurationPid = "org.openhab.influxdb", //
property = Constants.SERVICE_PID + "=org.openhab.influxdb")
@ConfigurableService(category = "persistence", label = "InfluxDB Persistence Service", description_uri = InfluxDBPersistenceService.CONFIG_URI)
public class InfluxDBPersistenceService implements QueryablePersistenceService {
public class InfluxDBPersistenceService implements ModifiablePersistenceService {
public static final String SERVICE_NAME = "influxdb";

private final Logger logger = LoggerFactory.getLogger(InfluxDBPersistenceService.class);
Expand Down Expand Up @@ -190,11 +191,20 @@ public void store(Item item) {

@Override
public void store(Item item, @Nullable String alias) {
store(item, ZonedDateTime.now(), item.getState(), alias);
}

@Override
public void store(Item item, ZonedDateTime date, State state) {
store(item, date, state, null);
}

public void store(Item item, ZonedDateTime date, State state, @Nullable String alias) {
if (!serviceActivated) {
logger.warn("InfluxDB service not ready. Storing {} rejected.", item);
return;
}
convert(item, alias).thenAccept(point -> {
convert(item, state, date.toInstant(), null).thenAccept(point -> {
if (point == null) {
logger.trace("Ignoring item {}, conversion to an InfluxDB point failed.", item.getName());
return;
Expand All @@ -207,6 +217,20 @@ public void store(Item item, @Nullable String alias) {
});
}

@Override
public boolean remove(FilterCriteria filter) throws IllegalArgumentException {
if (serviceActivated && checkConnection()) {
if (filter.getItemName() == null) {
logger.warn("Item name is missing in filter {} when trying to remove data.", filter);
return false;
}
return influxDBRepository.remove(filter);
} else {
logger.debug("Remove query {} ignored, InfluxDB is not connected.", filter);
return false;
}
}

@Override
public Iterable<HistoricItem> query(FilterCriteria filter) {
if (serviceActivated && checkConnection()) {
Expand All @@ -215,13 +239,12 @@ public Iterable<HistoricItem> query(FilterCriteria filter) {
filter.getItemName(), filter.getOrdering().toString(), filter.getState(), filter.getOperator(),
filter.getBeginDate(), filter.getEndDate(), filter.getPageSize(), filter.getPageNumber());
if (filter.getItemName() == null) {
logger.warn("Item name is missing in filter {}", filter);
logger.warn("Item name is missing in filter {} when querying data.", filter);
return List.of();
}
String query = influxDBRepository.createQueryCreator().createQuery(filter,

List<InfluxDBRepository.InfluxRow> results = influxDBRepository.query(filter,
configuration.getRetentionPolicy());
logger.trace("Query {}", query);
List<InfluxDBRepository.InfluxRow> results = influxDBRepository.query(query);
return results.stream().map(this::mapRowToHistoricItem).collect(Collectors.toList());
} else {
logger.debug("Query for persisted data ignored, InfluxDB is not connected");
Expand Down Expand Up @@ -279,13 +302,12 @@ private void commit() {
* @return a {@link CompletableFuture} that contains either <code>null</code> for item states that cannot be
* converted or the corresponding {@link InfluxPoint}
*/
CompletableFuture<@Nullable InfluxPoint> convert(Item item, @Nullable String storeAlias) {
CompletableFuture<@Nullable InfluxPoint> convert(Item item, State state, Instant timeStamp,
@Nullable String storeAlias) {
String itemName = item.getName();
String itemLabel = item.getLabel();
String category = item.getCategory();
State state = item.getState();
String itemType = item.getType();
Instant timeStamp = Instant.now();

if (state instanceof UnDefType) {
return CompletableFuture.completedFuture(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.util.Map;

import org.eclipse.jdt.annotation.NonNullByDefault;
import org.openhab.core.persistence.FilterCriteria;

/**
* Manages InfluxDB server interaction maintaining client connection
Expand Down Expand Up @@ -61,11 +62,11 @@ public interface InfluxDBRepository {
/**
* Executes Flux query
*
* @param query Query
* @param filter the query filter
* @return Query results
*
*/
List<InfluxRow> query(String query);
List<InfluxRow> query(FilterCriteria filter, String retentionPolicy);

/**
* Write points to database
Expand All @@ -76,11 +77,12 @@ public interface InfluxDBRepository {
boolean write(List<InfluxPoint> influxPoints);

/**
* create a query creator on this repository
* Execute delete query
*
* @return the query creator for this repository
* @param filter the query filter
* @return <code>true</code> if query executed successfully, <code>false</code> otherwise
*/
FilterCriteriaQueryCreator createQueryCreator();
boolean remove(FilterCriteria filter);

record InfluxRow(Instant time, String itemName, Object value) {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.influxdb.dto.Pong;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import org.openhab.core.persistence.FilterCriteria;
import org.openhab.persistence.influxdb.internal.FilterCriteriaQueryCreator;
import org.openhab.persistence.influxdb.internal.InfluxDBConfiguration;
import org.openhab.persistence.influxdb.internal.InfluxDBMetadataService;
Expand All @@ -58,12 +59,14 @@ public class InfluxDB1RepositoryImpl implements InfluxDBRepository {
private final Logger logger = LoggerFactory.getLogger(InfluxDB1RepositoryImpl.class);
private final InfluxDBConfiguration configuration;
private final InfluxDBMetadataService influxDBMetadataService;
private final FilterCriteriaQueryCreator queryCreator;
private @Nullable InfluxDB client;

public InfluxDB1RepositoryImpl(InfluxDBConfiguration configuration,
InfluxDBMetadataService influxDBMetadataService) {
this.configuration = configuration;
this.influxDBMetadataService = influxDBMetadataService;
this.queryCreator = new InfluxDB1FilterCriteriaQueryCreatorImpl(configuration, influxDBMetadataService);
}

@Override
Expand Down Expand Up @@ -134,6 +137,12 @@ public boolean write(List<InfluxPoint> influxPoints) {
return true;
}

@Override
public boolean remove(FilterCriteria filter) {
logger.warn("Removing data is not supported in InfluxDB v1.");
return false;
}

private Optional<Point> convertPointToClientFormat(InfluxPoint point) {
Point.Builder clientPoint = Point.measurement(point.getMeasurementName()).time(point.getTime().toEpochMilli(),
TimeUnit.MILLISECONDS);
Expand All @@ -155,9 +164,11 @@ private Optional<Point> convertPointToClientFormat(InfluxPoint point) {
}

@Override
public List<InfluxRow> query(String query) {
public List<InfluxRow> query(FilterCriteria filter, String retentionPolicy) {
final InfluxDB currentClient = client;
if (currentClient != null) {
String query = queryCreator.createQuery(filter, retentionPolicy);
logger.trace("Query {}", query);
Query parsedQuery = new Query(query, configuration.getDatabaseName());
List<QueryResult.Result> results = currentClient.query(parsedQuery, TimeUnit.MILLISECONDS).getResults();
return convertClientResultToRepository(results);
Expand Down Expand Up @@ -216,9 +227,4 @@ private List<InfluxRow> convertClientResultToRepository(List<QueryResult.Result>
public Map<String, Integer> getStoredItemsCount() {
return Collections.emptyMap();
}

@Override
public FilterCriteriaQueryCreator createQueryCreator() {
return new InfluxDB1FilterCriteriaQueryCreatorImpl(configuration, influxDBMetadataService);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.*;

import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZonedDateTime;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
Expand All @@ -25,6 +27,7 @@

import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.openhab.core.persistence.FilterCriteria;
import org.openhab.persistence.influxdb.internal.FilterCriteriaQueryCreator;
import org.openhab.persistence.influxdb.internal.InfluxDBConfiguration;
import org.openhab.persistence.influxdb.internal.InfluxDBConstants;
Expand All @@ -34,6 +37,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.influxdb.client.DeleteApi;
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import com.influxdb.client.InfluxDBClientOptions;
Expand All @@ -55,15 +59,18 @@ public class InfluxDB2RepositoryImpl implements InfluxDBRepository {
private final Logger logger = LoggerFactory.getLogger(InfluxDB2RepositoryImpl.class);
private final InfluxDBConfiguration configuration;
private final InfluxDBMetadataService influxDBMetadataService;
private final FilterCriteriaQueryCreator queryCreator;

private @Nullable InfluxDBClient client;
private @Nullable QueryApi queryAPI;
private @Nullable WriteApi writeAPI;
private @Nullable DeleteApi deleteAPI;

public InfluxDB2RepositoryImpl(InfluxDBConfiguration configuration,
InfluxDBMetadataService influxDBMetadataService) {
this.configuration = configuration;
this.influxDBMetadataService = influxDBMetadataService;
this.queryCreator = new InfluxDB2FilterCriteriaQueryCreatorImpl(configuration, influxDBMetadataService);
}

@Override
Expand All @@ -88,6 +95,7 @@ public boolean connect() {

queryAPI = createdClient.getQueryApi();
writeAPI = createdClient.getWriteApi();
deleteAPI = createdClient.getDeleteApi();
logger.debug("Successfully connected to InfluxDB. Instance ready={}", createdClient.ready());

return checkConnectionStatus();
Expand Down Expand Up @@ -137,6 +145,42 @@ public boolean write(List<InfluxPoint> influxPoints) {
return true;
}

@Override
public boolean remove(FilterCriteria filter) {
final DeleteApi currentDeleteApi = deleteAPI;
if (currentDeleteApi == null) {
return false;
}

if (filter.getState() != null) {
logger.warn("Deleting by value is not supported in InfluxDB v2.");
return false;
}
OffsetDateTime start = Objects.requireNonNullElse(filter.getBeginDate(), ZonedDateTime.now().minusYears(100))
.toOffsetDateTime();
OffsetDateTime stop = Objects.requireNonNullElse(filter.getEndDate(), ZonedDateTime.now().plusYears(100))
.toOffsetDateTime();

// create predicate
String predicate = "";
String itemName = filter.getItemName();
if (itemName != null) {
String name = influxDBMetadataService.getMeasurementNameOrDefault(itemName, itemName);
String measurementName = configuration.isReplaceUnderscore() ? name.replace('_', '.') : name;
predicate = "(_measurement=\"" + measurementName + "\")";
}

try {
deleteAPI.delete(start, stop, predicate, configuration.getRetentionPolicy(),
configuration.getDatabaseName());
} catch (InfluxException e) {
logger.debug("Deleting from database failed", e);
return false;
}

return true;
}

private Optional<Point> convertPointToClientFormat(InfluxPoint point) {
Point clientPoint = Point.measurement(point.getMeasurementName()).time(point.getTime(), WritePrecision.MS);
@Nullable
Expand All @@ -158,9 +202,11 @@ private Optional<Point> convertPointToClientFormat(InfluxPoint point) {
}

@Override
public List<InfluxRow> query(String query) {
public List<InfluxRow> query(FilterCriteria filter, String retentionPolicy) {
final QueryApi currentQueryAPI = queryAPI;
if (currentQueryAPI != null) {
String query = queryCreator.createQuery(filter, retentionPolicy);
logger.trace("Query {}", query);
List<FluxTable> clientResult = currentQueryAPI.query(query);
return clientResult.stream().flatMap(this::mapRawResultToHistoric).toList();
} else {
Expand Down Expand Up @@ -204,9 +250,4 @@ public Map<String, Integer> getStoredItemsCount() {
return Collections.emptyMap();
}
}

@Override
public FilterCriteriaQueryCreator createQueryCreator() {
return new InfluxDB2FilterCriteriaQueryCreatorImpl(configuration, influxDBMetadataService);
}
}
Loading

0 comments on commit 0f31808

Please sign in to comment.