]> git.basschouten.com Git - openhab-addons.git/blob
f6d5dc4bf98d68cfed671a9beccbd85810d5970d
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2020 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.persistence.dynamodb.internal;
14
15 import java.time.ZonedDateTime;
16 import java.util.ArrayDeque;
17 import java.util.ArrayList;
18 import java.util.Collections;
19 import java.util.Deque;
20 import java.util.HashMap;
21 import java.util.List;
22 import java.util.Locale;
23 import java.util.Map;
24 import java.util.Map.Entry;
25 import java.util.Set;
26 import java.util.concurrent.Executors;
27 import java.util.concurrent.ScheduledExecutorService;
28 import java.util.concurrent.ScheduledFuture;
29 import java.util.concurrent.TimeUnit;
30 import java.util.function.Function;
31
32 import org.eclipse.jdt.annotation.NonNullByDefault;
33 import org.eclipse.jdt.annotation.Nullable;
34 import org.openhab.core.common.NamedThreadFactory;
35 import org.openhab.core.config.core.ConfigurableService;
36 import org.openhab.core.items.Item;
37 import org.openhab.core.items.ItemNotFoundException;
38 import org.openhab.core.items.ItemRegistry;
39 import org.openhab.core.persistence.FilterCriteria;
40 import org.openhab.core.persistence.HistoricItem;
41 import org.openhab.core.persistence.PersistenceItemInfo;
42 import org.openhab.core.persistence.PersistenceService;
43 import org.openhab.core.persistence.QueryablePersistenceService;
44 import org.openhab.core.persistence.strategy.PersistenceStrategy;
45 import org.openhab.core.types.State;
46 import org.osgi.framework.BundleContext;
47 import org.osgi.framework.Constants;
48 import org.osgi.service.component.annotations.Activate;
49 import org.osgi.service.component.annotations.Component;
50 import org.osgi.service.component.annotations.Deactivate;
51 import org.osgi.service.component.annotations.Reference;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
54
55 import com.amazonaws.AmazonClientException;
56 import com.amazonaws.AmazonServiceException;
57 import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBMapper;
58 import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBMapper.FailedBatch;
59 import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBMapperConfig;
60 import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBMapperConfig.PaginationLoadingStrategy;
61 import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBQueryExpression;
62 import com.amazonaws.services.dynamodbv2.datamodeling.PaginatedQueryList;
63 import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
64 import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
65 import com.amazonaws.services.dynamodbv2.model.GlobalSecondaryIndex;
66 import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
67 import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException;
68 import com.amazonaws.services.dynamodbv2.model.TableDescription;
69 import com.amazonaws.services.dynamodbv2.model.TableStatus;
70 import com.amazonaws.services.dynamodbv2.model.WriteRequest;
71
72 /**
73  * This is the implementation of the DynamoDB {@link PersistenceService}. It persists item values
74  * using the <a href="https://aws.amazon.com/dynamodb/">Amazon DynamoDB</a> database. The states (
75  * {@link State}) of an {@link Item} are persisted in DynamoDB tables.
76  *
77  * The service creates tables automatically, one for numbers, and one for strings.
78  *
79  * @see AbstractDynamoDBItem#fromState for details on how different items are persisted
80  *
81  * @author Sami Salonen - Initial contribution
82  * @author Kai Kreuzer - Migration to 3.x
83  *
84  */
85 @NonNullByDefault
86 @Component(service = { PersistenceService.class,
87         QueryablePersistenceService.class }, configurationPid = "org.openhab.dynamodb", //
88         property = Constants.SERVICE_PID + "=org.openhab.dynamodb")
89 @ConfigurableService(category = "persistence", label = "DynamoDB Persistence Service", description_uri = DynamoDBPersistenceService.CONFIG_URI)
90 public class DynamoDBPersistenceService extends AbstractBufferedPersistenceService<DynamoDBItem<?>>
91         implements QueryablePersistenceService {
92
93     protected static final String CONFIG_URI = "persistence:dynamodb";
94
95     private class ExponentialBackoffRetry implements Runnable {
96         private int retry;
97         private Map<String, List<WriteRequest>> unprocessedItems;
98         private @Nullable Exception lastException;
99
100         public ExponentialBackoffRetry(Map<String, List<WriteRequest>> unprocessedItems) {
101             this.unprocessedItems = unprocessedItems;
102         }
103
104         @Override
105         public void run() {
106             logger.debug("Error storing object to dynamo, unprocessed items: {}. Retrying with exponential back-off",
107                     unprocessedItems);
108             lastException = null;
109             while (!unprocessedItems.isEmpty() && retry < WAIT_MILLIS_IN_RETRIES.length) {
110                 if (!sleep()) {
111                     // Interrupted
112                     return;
113                 }
114                 retry++;
115                 try {
116                     BatchWriteItemOutcome outcome = DynamoDBPersistenceService.this.db.getDynamoDB()
117                             .batchWriteItemUnprocessed(unprocessedItems);
118                     unprocessedItems = outcome.getUnprocessedItems();
119                     lastException = null;
120                 } catch (AmazonServiceException e) {
121                     if (e instanceof ResourceNotFoundException) {
122                         logger.debug(
123                                 "DynamoDB query raised unexpected exception: {}. This might happen if table was recently created",
124                                 e.getMessage());
125                     } else {
126                         logger.debug("DynamoDB query raised unexpected exception: {}.", e.getMessage());
127                     }
128                     lastException = e;
129                     continue;
130                 }
131             }
132             if (unprocessedItems.isEmpty()) {
133                 logger.debug("After {} retries successfully wrote all unprocessed items", retry);
134             } else {
135                 logger.warn(
136                         "Even after retries failed to write some items. Last exception: {} {}, unprocessed items: {}",
137                         lastException == null ? "null" : lastException.getClass().getName(),
138                         lastException == null ? "null" : lastException.getMessage(), unprocessedItems);
139             }
140         }
141
142         private boolean sleep() {
143             try {
144                 long sleepTime;
145                 if (retry == 1 && lastException != null && lastException instanceof ResourceNotFoundException) {
146                     sleepTime = WAIT_ON_FIRST_RESOURCE_NOT_FOUND_MILLIS;
147                 } else {
148                     sleepTime = WAIT_MILLIS_IN_RETRIES[retry];
149                 }
150                 Thread.sleep(sleepTime);
151                 return true;
152             } catch (InterruptedException e) {
153                 logger.debug("Interrupted while writing data!");
154                 return false;
155             }
156         }
157
158         public Map<String, List<WriteRequest>> getUnprocessedItems() {
159             return unprocessedItems;
160         }
161     }
162
163     private static final int WAIT_ON_FIRST_RESOURCE_NOT_FOUND_MILLIS = 5000;
164     private static final int[] WAIT_MILLIS_IN_RETRIES = new int[] { 100, 100, 200, 300, 500 };
165     private static final String DYNAMODB_THREADPOOL_NAME = "dynamodbPersistenceService";
166
167     private final ItemRegistry itemRegistry;
168     private @Nullable DynamoDBClient db;
169     private final Logger logger = LoggerFactory.getLogger(DynamoDBPersistenceService.class);
170     private boolean isProperlyConfigured;
171     private @NonNullByDefault({}) DynamoDBConfig dbConfig;
172     private @NonNullByDefault({}) DynamoDBTableNameResolver tableNameResolver;
173     private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1,
174             new NamedThreadFactory(DYNAMODB_THREADPOOL_NAME));
175     private @Nullable ScheduledFuture<?> writeBufferedDataFuture;
176
177     @Activate
178     public DynamoDBPersistenceService(final @Reference ItemRegistry itemRegistry) {
179         this.itemRegistry = itemRegistry;
180     }
181
182     /**
183      * For testing. Allows access to underlying DynamoDBClient.
184      *
185      * @return DynamoDBClient connected to AWS Dyanamo DB.
186      */
187     @Nullable
188     DynamoDBClient getDb() {
189         return db;
190     }
191
192     @Activate
193     public void activate(final @Nullable BundleContext bundleContext, final Map<String, Object> config) {
194         resetClient();
195         dbConfig = DynamoDBConfig.fromConfig(config);
196         if (dbConfig == null) {
197             // Configuration was invalid. Abort service activation.
198             // Error is already logger in fromConfig.
199             return;
200         }
201
202         tableNameResolver = new DynamoDBTableNameResolver(dbConfig.getTablePrefix());
203         try {
204             if (!ensureClient()) {
205                 logger.error("Error creating dynamodb database client. Aborting service activation.");
206                 return;
207             }
208         } catch (Exception e) {
209             logger.error("Error constructing dynamodb client", e);
210             return;
211         }
212
213         writeBufferedDataFuture = null;
214         resetWithBufferSize(dbConfig.getBufferSize());
215         long commitIntervalMillis = dbConfig.getBufferCommitIntervalMillis();
216         if (commitIntervalMillis > 0) {
217             writeBufferedDataFuture = scheduler.scheduleWithFixedDelay(new Runnable() {
218                 @Override
219                 public void run() {
220                     try {
221                         DynamoDBPersistenceService.this.flushBufferedData();
222                     } catch (RuntimeException e) {
223                         // We want to catch all unexpected exceptions since all unhandled exceptions make
224                         // ScheduledExecutorService halt the regular running of the task.
225                         // It is better to print out the exception, and try again
226                         // (on next cycle)
227                         logger.warn(
228                                 "Execution of scheduled flushing of buffered data failed unexpectedly. Ignoring exception, trying again according to configured commit interval of {} ms.",
229                                 commitIntervalMillis, e);
230                     }
231                 }
232             }, 0, commitIntervalMillis, TimeUnit.MILLISECONDS);
233         }
234         isProperlyConfigured = true;
235         logger.debug("dynamodb persistence service activated");
236     }
237
238     @Deactivate
239     public void deactivate() {
240         logger.debug("dynamodb persistence service deactivated");
241         if (writeBufferedDataFuture != null) {
242             writeBufferedDataFuture.cancel(false);
243             writeBufferedDataFuture = null;
244         }
245         resetClient();
246     }
247
248     /**
249      * Initializes DynamoDBClient (db field)
250      *
251      * If DynamoDBClient constructor throws an exception, error is logged and false is returned.
252      *
253      * @return whether initialization was successful.
254      */
255     private boolean ensureClient() {
256         if (db == null) {
257             try {
258                 db = new DynamoDBClient(dbConfig);
259             } catch (Exception e) {
260                 logger.error("Error constructing dynamodb client", e);
261                 return false;
262             }
263         }
264         return true;
265     }
266
267     @Override
268     public DynamoDBItem<?> persistenceItemFromState(String name, State state, ZonedDateTime time) {
269         return AbstractDynamoDBItem.fromState(name, state, time);
270     }
271
272     /**
273      * Create table (if not present) and wait for table to become active.
274      *
275      * Synchronized in order to ensure that at most single thread is creating the table at a time
276      *
277      * @param mapper
278      * @param dtoClass
279      * @return whether table creation succeeded.
280      */
281     private synchronized boolean createTable(DynamoDBMapper mapper, Class<?> dtoClass) {
282         if (db == null) {
283             return false;
284         }
285         String tableName;
286         try {
287             ProvisionedThroughput provisionedThroughput = new ProvisionedThroughput(dbConfig.getReadCapacityUnits(),
288                     dbConfig.getWriteCapacityUnits());
289             CreateTableRequest request = mapper.generateCreateTableRequest(dtoClass);
290             request.setProvisionedThroughput(provisionedThroughput);
291             if (request.getGlobalSecondaryIndexes() != null) {
292                 for (GlobalSecondaryIndex index : request.getGlobalSecondaryIndexes()) {
293                     index.setProvisionedThroughput(provisionedThroughput);
294                 }
295             }
296             tableName = request.getTableName();
297             try {
298                 db.getDynamoClient().describeTable(tableName);
299             } catch (ResourceNotFoundException e) {
300                 // No table present, continue with creation
301                 db.getDynamoClient().createTable(request);
302             } catch (AmazonClientException e) {
303                 logger.error("Table creation failed due to error in describeTable operation", e);
304                 return false;
305             }
306
307             // table found or just created, wait
308             return waitForTableToBecomeActive(tableName);
309         } catch (AmazonClientException e) {
310             logger.error("Exception when creating table", e);
311             return false;
312         }
313     }
314
315     private boolean waitForTableToBecomeActive(String tableName) {
316         try {
317             logger.debug("Checking if table '{}' is created...", tableName);
318             final TableDescription tableDescription;
319             try {
320                 tableDescription = db.getDynamoDB().getTable(tableName).waitForActive();
321             } catch (IllegalArgumentException e) {
322                 logger.warn("Table '{}' is being deleted: {} {}", tableName, e.getClass().getSimpleName(),
323                         e.getMessage());
324                 return false;
325             } catch (ResourceNotFoundException e) {
326                 logger.warn("Table '{}' was deleted unexpectedly: {} {}", tableName, e.getClass().getSimpleName(),
327                         e.getMessage());
328                 return false;
329             }
330             boolean success = TableStatus.ACTIVE.equals(TableStatus.fromValue(tableDescription.getTableStatus()));
331             if (success) {
332                 logger.debug("Creation of table '{}' successful, table status is now {}", tableName,
333                         tableDescription.getTableStatus());
334             } else {
335                 logger.warn("Creation of table '{}' unsuccessful, table status is now {}", tableName,
336                         tableDescription.getTableStatus());
337             }
338             return success;
339         } catch (AmazonClientException e) {
340             logger.error("Exception when checking table status (describe): {}", e.getMessage());
341             return false;
342         } catch (InterruptedException e) {
343             logger.error("Interrupted while trying to check table status: {}", e.getMessage());
344             return false;
345         }
346     }
347
348     private void resetClient() {
349         if (db == null) {
350             return;
351         }
352         db.shutdown();
353         db = null;
354         dbConfig = null;
355         tableNameResolver = null;
356         isProperlyConfigured = false;
357     }
358
359     private DynamoDBMapper getDBMapper(String tableName) {
360         try {
361             DynamoDBMapperConfig mapperConfig = new DynamoDBMapperConfig.Builder()
362                     .withTableNameOverride(new DynamoDBMapperConfig.TableNameOverride(tableName))
363                     .withPaginationLoadingStrategy(PaginationLoadingStrategy.LAZY_LOADING).build();
364             return new DynamoDBMapper(db.getDynamoClient(), mapperConfig);
365         } catch (AmazonClientException e) {
366             logger.error("Error getting db mapper: {}", e.getMessage());
367             throw e;
368         }
369     }
370
371     @Override
372     protected boolean isReadyToStore() {
373         return isProperlyConfigured && ensureClient();
374     }
375
376     @Override
377     public String getId() {
378         return "dynamodb";
379     }
380
381     @Override
382     public String getLabel(@Nullable Locale locale) {
383         return "DynamoDB";
384     }
385
386     @Override
387     public Set<PersistenceItemInfo> getItemInfo() {
388         return Collections.emptySet();
389     }
390
391     @Override
392     protected void flushBufferedData() {
393         if (buffer != null && buffer.isEmpty()) {
394             return;
395         }
396         logger.debug("Writing buffered data. Buffer size: {}", buffer.size());
397
398         for (;;) {
399             Map<String, Deque<DynamoDBItem<?>>> itemsByTable = readBuffer();
400             // Write batch of data, one table at a time
401             for (Entry<String, Deque<DynamoDBItem<?>>> entry : itemsByTable.entrySet()) {
402                 String tableName = entry.getKey();
403                 Deque<DynamoDBItem<?>> batch = entry.getValue();
404                 if (!batch.isEmpty()) {
405                     flushBatch(getDBMapper(tableName), batch);
406                 }
407             }
408             if (buffer != null && buffer.isEmpty()) {
409                 break;
410             }
411         }
412     }
413
414     private Map<String, Deque<DynamoDBItem<?>>> readBuffer() {
415         Map<String, Deque<DynamoDBItem<?>>> batchesByTable = new HashMap<>(2);
416         // Get batch of data
417         while (!buffer.isEmpty()) {
418             DynamoDBItem<?> dynamoItem = buffer.poll();
419             if (dynamoItem == null) {
420                 break;
421             }
422             String tableName = tableNameResolver.fromItem(dynamoItem);
423             Deque<DynamoDBItem<?>> batch = batchesByTable.computeIfAbsent(tableName, new Function<>() {
424                 @Override
425                 public @Nullable Deque<DynamoDBItem<?>> apply(@Nullable String t) {
426                     return new ArrayDeque<>();
427                 }
428             });
429             batch.add(dynamoItem);
430         }
431         return batchesByTable;
432     }
433
434     /**
435      * Flush batch of data to DynamoDB
436      *
437      * @param mapper mapper associated with the batch
438      * @param batch batch of data to write to DynamoDB
439      */
440     private void flushBatch(DynamoDBMapper mapper, Deque<DynamoDBItem<?>> batch) {
441         long currentTimeMillis = System.currentTimeMillis();
442         List<FailedBatch> failed = mapper.batchSave(batch);
443         for (FailedBatch failedBatch : failed) {
444             if (failedBatch.getException() instanceof ResourceNotFoundException) {
445                 // Table did not exist. Try again after creating table
446                 retryFlushAfterCreatingTable(mapper, batch, failedBatch);
447             } else {
448                 logger.debug("Batch failed with {}. Retrying next with exponential back-off",
449                         failedBatch.getException().getMessage());
450                 new ExponentialBackoffRetry(failedBatch.getUnprocessedItems()).run();
451             }
452         }
453         if (failed.isEmpty()) {
454             logger.debug("flushBatch ended with {} items in {} ms: {}", batch.size(),
455                     System.currentTimeMillis() - currentTimeMillis, batch);
456         } else {
457             logger.warn(
458                     "flushBatch ended with {} items in {} ms: {}. There were some failed batches that were retried -- check logs for ERRORs to see if writes were successful",
459                     batch.size(), System.currentTimeMillis() - currentTimeMillis, batch);
460         }
461     }
462
463     /**
464      * Retry flushing data after creating table associated with mapper
465      *
466      * @param mapper mapper associated with the batch
467      * @param batch original batch of data. Used for logging and to determine table name
468      * @param failedBatch failed batch that should be retried
469      */
470     private void retryFlushAfterCreatingTable(DynamoDBMapper mapper, Deque<DynamoDBItem<?>> batch,
471             FailedBatch failedBatch) {
472         logger.debug("Table was not found. Trying to create table and try saving again");
473         if (createTable(mapper, batch.peek().getClass())) {
474             logger.debug("Table creation successful, trying to save again");
475             if (!failedBatch.getUnprocessedItems().isEmpty()) {
476                 ExponentialBackoffRetry retry = new ExponentialBackoffRetry(failedBatch.getUnprocessedItems());
477                 retry.run();
478                 if (retry.getUnprocessedItems().isEmpty()) {
479                     logger.debug("Successfully saved items after table creation");
480                 }
481             }
482         } else {
483             logger.warn("Table creation failed. Not storing some parts of batch: {}. Unprocessed items: {}", batch,
484                     failedBatch.getUnprocessedItems());
485         }
486     }
487
488     @Override
489     public Iterable<HistoricItem> query(FilterCriteria filter) {
490         logger.debug("got a query");
491         if (!isProperlyConfigured) {
492             logger.debug("Configuration for dynamodb not yet loaded or broken. Not storing item.");
493             return Collections.emptyList();
494         }
495         if (!ensureClient()) {
496             logger.warn("DynamoDB not connected. Not storing item.");
497             return Collections.emptyList();
498         }
499
500         String itemName = filter.getItemName();
501         Item item = getItemFromRegistry(itemName);
502         if (item == null) {
503             logger.warn("Could not get item {} from registry!", itemName);
504             return Collections.emptyList();
505         }
506
507         Class<DynamoDBItem<?>> dtoClass = AbstractDynamoDBItem.getDynamoItemClass(item.getClass());
508         String tableName = tableNameResolver.fromClass(dtoClass);
509         DynamoDBMapper mapper = getDBMapper(tableName);
510         logger.debug("item {} (class {}) will be tried to query using dto class {} from table {}", itemName,
511                 item.getClass(), dtoClass, tableName);
512
513         List<HistoricItem> historicItems = new ArrayList<>();
514
515         DynamoDBQueryExpression<DynamoDBItem<?>> queryExpression = DynamoDBQueryUtils.createQueryExpression(dtoClass,
516                 filter);
517         @SuppressWarnings("rawtypes")
518         final PaginatedQueryList<? extends DynamoDBItem> paginatedList;
519         try {
520             paginatedList = mapper.query(dtoClass, queryExpression);
521         } catch (AmazonServiceException e) {
522             logger.error(
523                     "DynamoDB query raised unexpected exception: {}. Returning empty collection. "
524                             + "Status code 400 (resource not found) might occur if table was just created.",
525                     e.getMessage());
526             return Collections.emptyList();
527         }
528         for (int itemIndexOnPage = 0; itemIndexOnPage < filter.getPageSize(); itemIndexOnPage++) {
529             int itemIndex = filter.getPageNumber() * filter.getPageSize() + itemIndexOnPage;
530             DynamoDBItem<?> dynamoItem;
531             try {
532                 dynamoItem = paginatedList.get(itemIndex);
533             } catch (IndexOutOfBoundsException e) {
534                 logger.debug("Index {} is out-of-bounds", itemIndex);
535                 break;
536             }
537             if (dynamoItem != null) {
538                 HistoricItem historicItem = dynamoItem.asHistoricItem(item);
539                 logger.trace("Dynamo item {} converted to historic item: {}", item, historicItem);
540                 historicItems.add(historicItem);
541             }
542
543         }
544         return historicItems;
545     }
546
547     /**
548      * Retrieves the item for the given name from the item registry
549      *
550      * @param itemName
551      * @return item with the given name, or null if no such item exists in item registry.
552      */
553     private @Nullable Item getItemFromRegistry(String itemName) {
554         Item item = null;
555         try {
556             if (itemRegistry != null) {
557                 item = itemRegistry.getItem(itemName);
558             }
559         } catch (ItemNotFoundException e1) {
560             logger.error("Unable to get item {} from registry", itemName);
561         }
562         return item;
563     }
564
565     @Override
566     public List<PersistenceStrategy> getDefaultStrategies() {
567         return List.of(PersistenceStrategy.Globals.RESTORE, PersistenceStrategy.Globals.CHANGE);
568     }
569 }