2 * Copyright (c) 2010-2020 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.persistence.dynamodb.internal;
15 import java.time.ZonedDateTime;
16 import java.util.ArrayDeque;
17 import java.util.ArrayList;
18 import java.util.Collections;
19 import java.util.Deque;
20 import java.util.HashMap;
21 import java.util.List;
22 import java.util.Locale;
24 import java.util.Map.Entry;
26 import java.util.concurrent.Executors;
27 import java.util.concurrent.ScheduledExecutorService;
28 import java.util.concurrent.ScheduledFuture;
29 import java.util.concurrent.TimeUnit;
30 import java.util.function.Function;
32 import org.eclipse.jdt.annotation.NonNullByDefault;
33 import org.eclipse.jdt.annotation.Nullable;
34 import org.openhab.core.common.NamedThreadFactory;
35 import org.openhab.core.config.core.ConfigurableService;
36 import org.openhab.core.items.Item;
37 import org.openhab.core.items.ItemNotFoundException;
38 import org.openhab.core.items.ItemRegistry;
39 import org.openhab.core.persistence.FilterCriteria;
40 import org.openhab.core.persistence.HistoricItem;
41 import org.openhab.core.persistence.PersistenceItemInfo;
42 import org.openhab.core.persistence.PersistenceService;
43 import org.openhab.core.persistence.QueryablePersistenceService;
44 import org.openhab.core.persistence.strategy.PersistenceStrategy;
45 import org.openhab.core.types.State;
46 import org.osgi.framework.BundleContext;
47 import org.osgi.framework.Constants;
48 import org.osgi.service.component.annotations.Activate;
49 import org.osgi.service.component.annotations.Component;
50 import org.osgi.service.component.annotations.Deactivate;
51 import org.osgi.service.component.annotations.Reference;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
55 import com.amazonaws.AmazonClientException;
56 import com.amazonaws.AmazonServiceException;
57 import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBMapper;
58 import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBMapper.FailedBatch;
59 import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBMapperConfig;
60 import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBMapperConfig.PaginationLoadingStrategy;
61 import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBQueryExpression;
62 import com.amazonaws.services.dynamodbv2.datamodeling.PaginatedQueryList;
63 import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
64 import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
65 import com.amazonaws.services.dynamodbv2.model.GlobalSecondaryIndex;
66 import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
67 import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException;
68 import com.amazonaws.services.dynamodbv2.model.TableDescription;
69 import com.amazonaws.services.dynamodbv2.model.TableStatus;
70 import com.amazonaws.services.dynamodbv2.model.WriteRequest;
73 * This is the implementation of the DynamoDB {@link PersistenceService}. It persists item values
74 * using the <a href="https://aws.amazon.com/dynamodb/">Amazon DynamoDB</a> database. The states (
75 * {@link State}) of an {@link Item} are persisted in DynamoDB tables.
77 * The service creates tables automatically, one for numbers, and one for strings.
79 * @see AbstractDynamoDBItem#fromState for details how different items are persisted
81 * @author Sami Salonen - Initial contribution
82 * @author Kai Kreuzer - Migration to 3.x
86 @Component(service = { PersistenceService.class,
87 QueryablePersistenceService.class }, configurationPid = "org.openhab.dynamodb", //
88 property = Constants.SERVICE_PID + "=org.openhab.dynamodb")
89 @ConfigurableService(category = "persistence", label = "DynamoDB Persistence Service", description_uri = DynamoDBPersistenceService.CONFIG_URI)
90 public class DynamoDBPersistenceService extends AbstractBufferedPersistenceService<DynamoDBItem<?>>
91 implements QueryablePersistenceService {
93 protected static final String CONFIG_URI = "persistence:dynamodb";
95 private class ExponentialBackoffRetry implements Runnable {
97 private Map<String, List<WriteRequest>> unprocessedItems;
98 private @Nullable Exception lastException;
100 public ExponentialBackoffRetry(Map<String, List<WriteRequest>> unprocessedItems) {
101 this.unprocessedItems = unprocessedItems;
106 logger.debug("Error storing object to dynamo, unprocessed items: {}. Retrying with exponential back-off",
108 lastException = null;
109 while (!unprocessedItems.isEmpty() && retry < WAIT_MILLIS_IN_RETRIES.length) {
116 BatchWriteItemOutcome outcome = DynamoDBPersistenceService.this.db.getDynamoDB()
117 .batchWriteItemUnprocessed(unprocessedItems);
118 unprocessedItems = outcome.getUnprocessedItems();
119 lastException = null;
120 } catch (AmazonServiceException e) {
121 if (e instanceof ResourceNotFoundException) {
123 "DynamoDB query raised unexpected exception: {}. This might happen if table was recently created",
126 logger.debug("DynamoDB query raised unexpected exception: {}.", e.getMessage());
132 if (unprocessedItems.isEmpty()) {
133 logger.debug("After {} retries successfully wrote all unprocessed items", retry);
136 "Even after retries failed to write some items. Last exception: {} {}, unprocessed items: {}",
137 lastException == null ? "null" : lastException.getClass().getName(),
138 lastException == null ? "null" : lastException.getMessage(), unprocessedItems);
142 private boolean sleep() {
145 if (retry == 1 && lastException != null && lastException instanceof ResourceNotFoundException) {
146 sleepTime = WAIT_ON_FIRST_RESOURCE_NOT_FOUND_MILLIS;
148 sleepTime = WAIT_MILLIS_IN_RETRIES[retry];
150 Thread.sleep(sleepTime);
152 } catch (InterruptedException e) {
153 logger.debug("Interrupted while writing data!");
158 public Map<String, List<WriteRequest>> getUnprocessedItems() {
159 return unprocessedItems;
163 private static final int WAIT_ON_FIRST_RESOURCE_NOT_FOUND_MILLIS = 5000;
164 private static final int[] WAIT_MILLIS_IN_RETRIES = new int[] { 100, 100, 200, 300, 500 };
165 private static final String DYNAMODB_THREADPOOL_NAME = "dynamodbPersistenceService";
167 private final ItemRegistry itemRegistry;
168 private @Nullable DynamoDBClient db;
169 private final Logger logger = LoggerFactory.getLogger(DynamoDBPersistenceService.class);
170 private boolean isProperlyConfigured;
171 private @NonNullByDefault({}) DynamoDBConfig dbConfig;
172 private @NonNullByDefault({}) DynamoDBTableNameResolver tableNameResolver;
173 private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1,
174 new NamedThreadFactory(DYNAMODB_THREADPOOL_NAME));
175 private @Nullable ScheduledFuture<?> writeBufferedDataFuture;
/**
 * Creates the service.
 *
 * @param itemRegistry registry used to look up items by name when handling queries
 */
@Activate
public DynamoDBPersistenceService(final @Reference ItemRegistry itemRegistry) {
    this.itemRegistry = itemRegistry;
}
183 * For testing. Allows access to underlying DynamoDBClient.
185 * @return DynamoDBClient connected to AWS DynamoDB.
188 DynamoDBClient getDb() {
193 public void activate(final @Nullable BundleContext bundleContext, final Map<String, Object> config) {
195 dbConfig = DynamoDBConfig.fromConfig(config);
196 if (dbConfig == null) {
197 // Configuration was invalid. Abort service activation.
198 // Error is already logger in fromConfig.
202 tableNameResolver = new DynamoDBTableNameResolver(dbConfig.getTablePrefix());
204 if (!ensureClient()) {
205 logger.error("Error creating dynamodb database client. Aborting service activation.");
208 } catch (Exception e) {
209 logger.error("Error constructing dynamodb client", e);
213 writeBufferedDataFuture = null;
214 resetWithBufferSize(dbConfig.getBufferSize());
215 long commitIntervalMillis = dbConfig.getBufferCommitIntervalMillis();
216 if (commitIntervalMillis > 0) {
217 writeBufferedDataFuture = scheduler.scheduleWithFixedDelay(new Runnable() {
221 DynamoDBPersistenceService.this.flushBufferedData();
222 } catch (RuntimeException e) {
223 // We want to catch all unexpected exceptions since all unhandled exceptions make
224 // ScheduledExecutorService halt the regular running of the task.
225 // It is better to print out the exception, and try again
228 "Execution of scheduled flushing of buffered data failed unexpectedly. Ignoring exception, trying again according to configured commit interval of {} ms.",
229 commitIntervalMillis, e);
232 }, 0, commitIntervalMillis, TimeUnit.MILLISECONDS);
234 isProperlyConfigured = true;
235 logger.debug("dynamodb persistence service activated");
239 public void deactivate() {
240 logger.debug("dynamodb persistence service deactivated");
241 if (writeBufferedDataFuture != null) {
242 writeBufferedDataFuture.cancel(false);
243 writeBufferedDataFuture = null;
249 * Initializes DynamoDBClient (db field)
251 * If DynamoDBClient constructor throws an exception, error is logged and false is returned.
253 * @return whether initialization was successful.
255 private boolean ensureClient() {
258 db = new DynamoDBClient(dbConfig);
259 } catch (Exception e) {
260 logger.error("Error constructing dynamodb client", e);
268 public DynamoDBItem<?> persistenceItemFromState(String name, State state, ZonedDateTime time) {
269 return AbstractDynamoDBItem.fromState(name, state, time);
273 * Create table (if not present) and wait for table to become active.
275 * Synchronized in order to ensure that at most single thread is creating the table at a time
279 * @return whether table creation succeeded.
281 private synchronized boolean createTable(DynamoDBMapper mapper, Class<?> dtoClass) {
287 ProvisionedThroughput provisionedThroughput = new ProvisionedThroughput(dbConfig.getReadCapacityUnits(),
288 dbConfig.getWriteCapacityUnits());
289 CreateTableRequest request = mapper.generateCreateTableRequest(dtoClass);
290 request.setProvisionedThroughput(provisionedThroughput);
291 if (request.getGlobalSecondaryIndexes() != null) {
292 for (GlobalSecondaryIndex index : request.getGlobalSecondaryIndexes()) {
293 index.setProvisionedThroughput(provisionedThroughput);
296 tableName = request.getTableName();
298 db.getDynamoClient().describeTable(tableName);
299 } catch (ResourceNotFoundException e) {
300 // No table present, continue with creation
301 db.getDynamoClient().createTable(request);
302 } catch (AmazonClientException e) {
303 logger.error("Table creation failed due to error in describeTable operation", e);
307 // table found or just created, wait
308 return waitForTableToBecomeActive(tableName);
309 } catch (AmazonClientException e) {
310 logger.error("Exception when creating table", e);
315 private boolean waitForTableToBecomeActive(String tableName) {
317 logger.debug("Checking if table '{}' is created...", tableName);
318 final TableDescription tableDescription;
320 tableDescription = db.getDynamoDB().getTable(tableName).waitForActive();
321 } catch (IllegalArgumentException e) {
322 logger.warn("Table '{}' is being deleted: {} {}", tableName, e.getClass().getSimpleName(),
325 } catch (ResourceNotFoundException e) {
326 logger.warn("Table '{}' was deleted unexpectedly: {} {}", tableName, e.getClass().getSimpleName(),
330 boolean success = TableStatus.ACTIVE.equals(TableStatus.fromValue(tableDescription.getTableStatus()));
332 logger.debug("Creation of table '{}' successful, table status is now {}", tableName,
333 tableDescription.getTableStatus());
335 logger.warn("Creation of table '{}' unsuccessful, table status is now {}", tableName,
336 tableDescription.getTableStatus());
339 } catch (AmazonClientException e) {
340 logger.error("Exception when checking table status (describe): {}", e.getMessage());
342 } catch (InterruptedException e) {
343 logger.error("Interrupted while trying to check table status: {}", e.getMessage());
348 private void resetClient() {
355 tableNameResolver = null;
356 isProperlyConfigured = false;
359 private DynamoDBMapper getDBMapper(String tableName) {
361 DynamoDBMapperConfig mapperConfig = new DynamoDBMapperConfig.Builder()
362 .withTableNameOverride(new DynamoDBMapperConfig.TableNameOverride(tableName))
363 .withPaginationLoadingStrategy(PaginationLoadingStrategy.LAZY_LOADING).build();
364 return new DynamoDBMapper(db.getDynamoClient(), mapperConfig);
365 } catch (AmazonClientException e) {
366 logger.error("Error getting db mapper: {}", e.getMessage());
372 protected boolean isReadyToStore() {
373 return isProperlyConfigured && ensureClient();
377 public String getId() {
382 public String getLabel(@Nullable Locale locale) {
387 public Set<PersistenceItemInfo> getItemInfo() {
388 return Collections.emptySet();
392 protected void flushBufferedData() {
393 if (buffer != null && buffer.isEmpty()) {
396 logger.debug("Writing buffered data. Buffer size: {}", buffer.size());
399 Map<String, Deque<DynamoDBItem<?>>> itemsByTable = readBuffer();
400 // Write batch of data, one table at a time
401 for (Entry<String, Deque<DynamoDBItem<?>>> entry : itemsByTable.entrySet()) {
402 String tableName = entry.getKey();
403 Deque<DynamoDBItem<?>> batch = entry.getValue();
404 if (!batch.isEmpty()) {
405 flushBatch(getDBMapper(tableName), batch);
408 if (buffer != null && buffer.isEmpty()) {
414 private Map<String, Deque<DynamoDBItem<?>>> readBuffer() {
415 Map<String, Deque<DynamoDBItem<?>>> batchesByTable = new HashMap<>(2);
417 while (!buffer.isEmpty()) {
418 DynamoDBItem<?> dynamoItem = buffer.poll();
419 if (dynamoItem == null) {
422 String tableName = tableNameResolver.fromItem(dynamoItem);
423 Deque<DynamoDBItem<?>> batch = batchesByTable.computeIfAbsent(tableName, new Function<>() {
425 public Deque<DynamoDBItem<?>> apply(@Nullable String t) {
426 return new ArrayDeque<>();
429 batch.add(dynamoItem);
431 return batchesByTable;
435 * Flush batch of data to DynamoDB
437 * @param mapper mapper associated with the batch
438 * @param batch batch of data to write to DynamoDB
440 private void flushBatch(DynamoDBMapper mapper, Deque<DynamoDBItem<?>> batch) {
441 long currentTimeMillis = System.currentTimeMillis();
442 List<FailedBatch> failed = mapper.batchSave(batch);
443 for (FailedBatch failedBatch : failed) {
444 if (failedBatch.getException() instanceof ResourceNotFoundException) {
445 // Table did not exist. Try again after creating table
446 retryFlushAfterCreatingTable(mapper, batch, failedBatch);
448 logger.debug("Batch failed with {}. Retrying next with exponential back-off",
449 failedBatch.getException().getMessage());
450 new ExponentialBackoffRetry(failedBatch.getUnprocessedItems()).run();
453 if (failed.isEmpty()) {
454 logger.debug("flushBatch ended with {} items in {} ms: {}", batch.size(),
455 System.currentTimeMillis() - currentTimeMillis, batch);
458 "flushBatch ended with {} items in {} ms: {}. There were some failed batches that were retried -- check logs for ERRORs to see if writes were successful",
459 batch.size(), System.currentTimeMillis() - currentTimeMillis, batch);
464 * Retry flushing data after creating table associated with mapper
466 * @param mapper mapper associated with the batch
467 * @param batch original batch of data. Used for logging and to determine table name
468 * @param failedBatch failed batch that should be retried
470 private void retryFlushAfterCreatingTable(DynamoDBMapper mapper, Deque<DynamoDBItem<?>> batch,
471 FailedBatch failedBatch) {
472 logger.debug("Table was not found. Trying to create table and try saving again");
473 if (createTable(mapper, batch.peek().getClass())) {
474 logger.debug("Table creation successful, trying to save again");
475 if (!failedBatch.getUnprocessedItems().isEmpty()) {
476 ExponentialBackoffRetry retry = new ExponentialBackoffRetry(failedBatch.getUnprocessedItems());
478 if (retry.getUnprocessedItems().isEmpty()) {
479 logger.debug("Successfully saved items after table creation");
483 logger.warn("Table creation failed. Not storing some parts of batch: {}. Unprocessed items: {}", batch,
484 failedBatch.getUnprocessedItems());
489 public Iterable<HistoricItem> query(FilterCriteria filter) {
490 logger.debug("got a query");
491 if (!isProperlyConfigured) {
492 logger.debug("Configuration for dynamodb not yet loaded or broken. Not storing item.");
493 return Collections.emptyList();
495 if (!ensureClient()) {
496 logger.warn("DynamoDB not connected. Not storing item.");
497 return Collections.emptyList();
500 String itemName = filter.getItemName();
501 Item item = getItemFromRegistry(itemName);
503 logger.warn("Could not get item {} from registry!", itemName);
504 return Collections.emptyList();
507 Class<DynamoDBItem<?>> dtoClass = AbstractDynamoDBItem.getDynamoItemClass(item.getClass());
508 String tableName = tableNameResolver.fromClass(dtoClass);
509 DynamoDBMapper mapper = getDBMapper(tableName);
510 logger.debug("item {} (class {}) will be tried to query using dto class {} from table {}", itemName,
511 item.getClass(), dtoClass, tableName);
513 List<HistoricItem> historicItems = new ArrayList<>();
515 DynamoDBQueryExpression<DynamoDBItem<?>> queryExpression = DynamoDBQueryUtils.createQueryExpression(dtoClass,
517 @SuppressWarnings("rawtypes")
518 final PaginatedQueryList<? extends DynamoDBItem> paginatedList;
520 paginatedList = mapper.query(dtoClass, queryExpression);
521 } catch (AmazonServiceException e) {
523 "DynamoDB query raised unexpected exception: {}. Returning empty collection. "
524 + "Status code 400 (resource not found) might occur if table was just created.",
526 return Collections.emptyList();
528 for (int itemIndexOnPage = 0; itemIndexOnPage < filter.getPageSize(); itemIndexOnPage++) {
529 int itemIndex = filter.getPageNumber() * filter.getPageSize() + itemIndexOnPage;
530 DynamoDBItem<?> dynamoItem;
532 dynamoItem = paginatedList.get(itemIndex);
533 } catch (IndexOutOfBoundsException e) {
534 logger.debug("Index {} is out-of-bounds", itemIndex);
537 if (dynamoItem != null) {
538 HistoricItem historicItem = dynamoItem.asHistoricItem(item);
539 logger.trace("Dynamo item {} converted to historic item: {}", item, historicItem);
540 historicItems.add(historicItem);
544 return historicItems;
548 * Retrieves the item for the given name from the item registry
551 * @return item with the given name, or null if no such item exists in item registry.
553 private @Nullable Item getItemFromRegistry(String itemName) {
556 if (itemRegistry != null) {
557 item = itemRegistry.getItem(itemName);
559 } catch (ItemNotFoundException e1) {
560 logger.error("Unable to get item {} from registry", itemName);
566 public List<PersistenceStrategy> getDefaultStrategies() {
567 return List.of(PersistenceStrategy.Globals.RESTORE, PersistenceStrategy.Globals.CHANGE);