]> git.basschouten.com Git - openhab-addons.git/blob
d1d90c792da1a0f5b9748b2911c0ddad17031f66
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2021 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.persistence.dynamodb.internal;
14
15 import java.lang.reflect.InvocationTargetException;
16 import java.net.URI;
17 import java.time.Duration;
18 import java.time.Instant;
19 import java.time.ZonedDateTime;
20 import java.util.Collections;
21 import java.util.Iterator;
22 import java.util.List;
23 import java.util.Locale;
24 import java.util.Map;
25 import java.util.Set;
26 import java.util.concurrent.CompletableFuture;
27 import java.util.concurrent.ConcurrentHashMap;
28 import java.util.concurrent.ExecutionException;
29 import java.util.concurrent.ExecutorService;
30 import java.util.concurrent.ThreadPoolExecutor;
31 import java.util.stream.Collectors;
32
33 import javax.measure.Unit;
34
35 import org.eclipse.jdt.annotation.NonNullByDefault;
36 import org.eclipse.jdt.annotation.Nullable;
37 import org.openhab.core.common.ThreadPoolManager;
38 import org.openhab.core.config.core.ConfigurableService;
39 import org.openhab.core.items.GenericItem;
40 import org.openhab.core.items.GroupItem;
41 import org.openhab.core.items.Item;
42 import org.openhab.core.items.ItemNotFoundException;
43 import org.openhab.core.items.ItemRegistry;
44 import org.openhab.core.library.items.NumberItem;
45 import org.openhab.core.library.types.QuantityType;
46 import org.openhab.core.persistence.FilterCriteria;
47 import org.openhab.core.persistence.HistoricItem;
48 import org.openhab.core.persistence.PersistenceItemInfo;
49 import org.openhab.core.persistence.PersistenceService;
50 import org.openhab.core.persistence.QueryablePersistenceService;
51 import org.openhab.core.persistence.strategy.PersistenceStrategy;
52 import org.openhab.core.types.State;
53 import org.openhab.core.types.UnDefType;
54 import org.osgi.framework.BundleContext;
55 import org.osgi.framework.Constants;
56 import org.osgi.service.component.annotations.Activate;
57 import org.osgi.service.component.annotations.Component;
58 import org.osgi.service.component.annotations.Deactivate;
59 import org.osgi.service.component.annotations.Reference;
60 import org.reactivestreams.Subscriber;
61 import org.slf4j.Logger;
62 import org.slf4j.LoggerFactory;
63
64 import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
65 import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
66 import software.amazon.awssdk.core.async.SdkPublisher;
67 import software.amazon.awssdk.core.client.config.ClientAsyncConfiguration;
68 import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
69 import software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption;
70 import software.amazon.awssdk.enhanced.dynamodb.DynamoDbAsyncTable;
71 import software.amazon.awssdk.enhanced.dynamodb.DynamoDbEnhancedAsyncClient;
72 import software.amazon.awssdk.enhanced.dynamodb.TableSchema;
73 import software.amazon.awssdk.enhanced.dynamodb.model.QueryEnhancedRequest;
74 import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
75 import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
76 import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClientBuilder;
77 import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
78
79 /**
80  * This is the implementation of the DynamoDB {@link PersistenceService}. It persists item values
81  * using the <a href="https://aws.amazon.com/dynamodb/">Amazon DynamoDB</a> database. The states (
82  * {@link State}) of an {@link Item} are persisted in DynamoDB tables.
83  *
84  * The service creates tables automatically, one for numbers, and one for strings.
85  *
86  * @see AbstractDynamoDBItem.fromState for details how different items are persisted
87  *
88  * @author Sami Salonen - Initial contribution
89  * @author Kai Kreuzer - Migration to 3.x
90  *
91  */
92 @NonNullByDefault
93 @Component(service = { PersistenceService.class,
94         QueryablePersistenceService.class }, configurationPid = "org.openhab.dynamodb", //
95         property = Constants.SERVICE_PID + "=org.openhab.dynamodb")
96 @ConfigurableService(category = "persistence", label = "DynamoDB Persistence Service", description_uri = DynamoDBPersistenceService.CONFIG_URI)
97 public class DynamoDBPersistenceService implements QueryablePersistenceService {
98
99     private static final int MAX_CONCURRENCY = 100;
100
101     protected static final String CONFIG_URI = "persistence:dynamodb";
102
103     private static final String DYNAMODB_THREADPOOL_NAME = "dynamodbPersistenceService";
104
105     private ItemRegistry itemRegistry;
106     private @Nullable DynamoDbEnhancedAsyncClient client;
107     private @Nullable DynamoDbAsyncClient lowLevelClient;
108     private final static Logger logger = LoggerFactory.getLogger(DynamoDBPersistenceService.class);
109     private boolean isProperlyConfigured;
110     private @Nullable DynamoDBConfig dbConfig;
111     private @Nullable DynamoDBTableNameResolver tableNameResolver;
112     private final ExecutorService executor = ThreadPoolManager.getPool(DYNAMODB_THREADPOOL_NAME);
113     private static final Duration TIMEOUT_API_CALL = Duration.ofSeconds(60);
114     private static final Duration TIMEOUT_API_CALL_ATTEMPT = Duration.ofSeconds(5);
115     private Map<Class<? extends DynamoDBItem<?>>, DynamoDbAsyncTable<? extends DynamoDBItem<?>>> tableCache = new ConcurrentHashMap<>(
116             2);
117
118     private @Nullable URI endpointOverride;
119
120     void overrideConfig(AwsRequestOverrideConfiguration.Builder config) {
121         config.apiCallAttemptTimeout(TIMEOUT_API_CALL_ATTEMPT).apiCallTimeout(TIMEOUT_API_CALL);
122     }
123
124     void overrideConfig(ClientOverrideConfiguration.Builder config) {
125         DynamoDBConfig localDbConfig = dbConfig;
126         config.apiCallAttemptTimeout(TIMEOUT_API_CALL_ATTEMPT).apiCallTimeout(TIMEOUT_API_CALL);
127         if (localDbConfig != null) {
128             config.retryPolicy(localDbConfig.getRetryPolicy());
129         }
130     }
131
132     @Activate
133     public DynamoDBPersistenceService(final @Reference ItemRegistry itemRegistry) {
134         this.itemRegistry = itemRegistry;
135     }
136
137     /**
138      * For tests
139      */
140     DynamoDBPersistenceService(final ItemRegistry itemRegistry, @Nullable URI endpointOverride) {
141         this.itemRegistry = itemRegistry;
142         this.endpointOverride = endpointOverride;
143     }
144
145     /**
146      * For tests
147      */
148     @Nullable
149     URI getEndpointOverride() {
150         return endpointOverride;
151     }
152
153     @Nullable
154     DynamoDbAsyncClient getLowLevelClient() {
155         return lowLevelClient;
156     }
157
158     ExecutorService getExecutor() {
159         return executor;
160     }
161
162     @Nullable
163     DynamoDBTableNameResolver getTableNameResolver() {
164         return tableNameResolver;
165     }
166
167     @Nullable
168     DynamoDBConfig getDbConfig() {
169         return dbConfig;
170     }
171
172     @Activate
173     public void activate(final @Nullable BundleContext bundleContext, final Map<String, Object> config) {
174         disconnect();
175         DynamoDBConfig localDbConfig = dbConfig = DynamoDBConfig.fromConfig(config);
176         if (localDbConfig == null) {
177             // Configuration was invalid. Abort service activation.
178             // Error is already logger in fromConfig.
179             return;
180         }
181         tableNameResolver = new DynamoDBTableNameResolver(localDbConfig.getTableRevision(), localDbConfig.getTable(),
182                 localDbConfig.getTablePrefixLegacy());
183         try {
184             if (!ensureClient()) {
185                 logger.error("Error creating dynamodb database client. Aborting service activation.");
186                 return;
187             }
188         } catch (Exception e) {
189             logger.error("Error constructing dynamodb client", e);
190             return;
191         }
192
193         isProperlyConfigured = true;
194         logger.debug("dynamodb persistence service activated");
195     }
196
197     @Deactivate
198     public void deactivate() {
199         logger.debug("dynamodb persistence service deactivated");
200         logIfManyQueuedTasks();
201         disconnect();
202     }
203
204     /**
205      * Initializes Dynamo DB client and determines schema
206      *
207      * If construction fails, error is logged and false is returned.
208      *
209      * @return whether initialization was successful.
210      */
211     private boolean ensureClient() {
212         DynamoDBConfig localDbConfig = dbConfig;
213         if (localDbConfig == null) {
214             return false;
215         }
216         if (client == null) {
217             try {
218                 synchronized (this) {
219                     if (this.client != null) {
220                         return true;
221                     }
222                     DynamoDbAsyncClientBuilder lowlevelClientBuilder = DynamoDbAsyncClient.builder()
223                             .credentialsProvider(StaticCredentialsProvider.create(localDbConfig.getCredentials()))
224                             .httpClient(NettyNioAsyncHttpClient.builder().maxConcurrency(MAX_CONCURRENCY).build())
225                             .asyncConfiguration(
226                                     ClientAsyncConfiguration.builder()
227                                             .advancedOption(SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR,
228                                                     executor)
229                                             .build())
230                             .overrideConfiguration(this::overrideConfig).region(localDbConfig.getRegion());
231                     if (endpointOverride != null) {
232                         logger.debug("DynamoDB has been overriden to {}", endpointOverride);
233                         lowlevelClientBuilder.endpointOverride(endpointOverride);
234                     }
235                     DynamoDbAsyncClient lowlevelClient = lowlevelClientBuilder.build();
236                     client = DynamoDbEnhancedAsyncClient.builder().dynamoDbClient(lowlevelClient).build();
237                     this.lowLevelClient = lowlevelClient;
238                 }
239             } catch (Exception e) {
240                 logger.error("Error constructing dynamodb client", e);
241                 return false;
242             }
243         }
244         return true;
245     }
246
247     private CompletableFuture<Boolean> resolveTableSchema() {
248         DynamoDBTableNameResolver localTableNameResolver = tableNameResolver;
249         DynamoDbAsyncClient localLowLevelClient = lowLevelClient;
250         if (localTableNameResolver == null || localLowLevelClient == null) {
251             throw new IllegalStateException("tableNameResolver or localLowLevelClient not available");
252         }
253         if (localTableNameResolver.isFullyResolved()) {
254             return CompletableFuture.completedFuture(true);
255         } else {
256             synchronized (localTableNameResolver) {
257                 if (localTableNameResolver.isFullyResolved()) {
258                     return CompletableFuture.completedFuture(true);
259                 }
260                 return localTableNameResolver.resolveSchema(localLowLevelClient,
261                         b -> b.overrideConfiguration(this::overrideConfig), executor).thenApplyAsync(resolved -> {
262                             if (resolved && localTableNameResolver.getTableSchema() == ExpectedTableSchema.LEGACY) {
263                                 logger.warn(
264                                         "Using legacy table format. Is it recommended to migrate to the new table format: specify the 'table' parameter and unset the old 'tablePrefix' parameter.");
265                             }
266                             return resolved;
267                         }, executor);
268             }
269         }
270     }
271
272     private <T extends DynamoDBItem<?>> DynamoDbAsyncTable<T> getTable(Class<T> dtoClass) {
273         DynamoDbEnhancedAsyncClient localClient = client;
274         DynamoDBTableNameResolver localTableNameResolver = tableNameResolver;
275         if (!ensureClient() || localClient == null || localTableNameResolver == null) {
276             throw new IllegalStateException("Client not ready");
277         }
278         ExpectedTableSchema expectedTableSchemaRevision = localTableNameResolver.getTableSchema();
279         String tableName = localTableNameResolver.fromClass(dtoClass);
280         final TableSchema<T> schema = getDynamoDBTableSchema(dtoClass, expectedTableSchemaRevision);
281         @SuppressWarnings("unchecked") // OK since this is the only place tableCache is populated
282         DynamoDbAsyncTable<T> table = (DynamoDbAsyncTable<T>) tableCache.computeIfAbsent(dtoClass, clz -> {
283             return localClient.table(tableName, schema);
284         });
285         if (table == null) {
286             // Invariant. To make null checker happy
287             throw new IllegalStateException();
288         }
289         return table;
290     }
291
292     private static <T extends DynamoDBItem<?>> TableSchema<T> getDynamoDBTableSchema(Class<T> dtoClass,
293             ExpectedTableSchema expectedTableSchemaRevision) {
294         if (dtoClass.equals(DynamoDBBigDecimalItem.class)) {
295             @SuppressWarnings("unchecked") // OK thanks to above conditional
296             TableSchema<T> schema = (TableSchema<T>) (expectedTableSchemaRevision == ExpectedTableSchema.NEW
297                     ? DynamoDBBigDecimalItem.TABLE_SCHEMA_NEW
298                     : DynamoDBBigDecimalItem.TABLE_SCHEMA_LEGACY);
299             return schema;
300         } else if (dtoClass.equals(DynamoDBStringItem.class)) {
301             @SuppressWarnings("unchecked") // OK thanks to above conditional
302             TableSchema<T> schema = (TableSchema<T>) (expectedTableSchemaRevision == ExpectedTableSchema.NEW
303                     ? DynamoDBStringItem.TABLE_SCHEMA_NEW
304                     : DynamoDBStringItem.TABLE_SCHEMA_LEGACY);
305             return schema;
306         } else {
307             throw new IllegalStateException("Unknown DTO class. Bug");
308         }
309     }
310
311     private void disconnect() {
312         DynamoDbAsyncClient localLowLevelClient = lowLevelClient;
313         if (client == null || localLowLevelClient == null) {
314             return;
315         }
316         localLowLevelClient.close();
317         lowLevelClient = null;
318         client = null;
319         dbConfig = null;
320         tableNameResolver = null;
321         isProperlyConfigured = false;
322         tableCache.clear();
323     }
324
325     protected boolean isReadyToStore() {
326         return isProperlyConfigured && ensureClient();
327     }
328
329     @Override
330     public String getId() {
331         return "dynamodb";
332     }
333
334     @Override
335     public String getLabel(@Nullable Locale locale) {
336         return "DynamoDB";
337     }
338
339     @Override
340     public Set<PersistenceItemInfo> getItemInfo() {
341         return Collections.emptySet();
342     }
343
344     @Override
345     public Iterable<HistoricItem> query(FilterCriteria filter) {
346         logIfManyQueuedTasks();
347         Instant start = Instant.now();
348         String filterDescription = filterToString(filter);
349         logger.trace("Got a query with filter {}", filterDescription);
350         DynamoDbEnhancedAsyncClient localClient = client;
351         DynamoDBTableNameResolver localTableNameResolver = tableNameResolver;
352         if (!isProperlyConfigured) {
353             logger.debug("Configuration for dynamodb not yet loaded or broken. Returning empty query results.");
354             return Collections.emptyList();
355         }
356         if (!ensureClient() || localClient == null || localTableNameResolver == null) {
357             logger.warn("DynamoDB not connected. Returning empty query results.");
358             return Collections.emptyList();
359         }
360
361         //
362         // Resolve unclear table schema if needed
363         //
364         try {
365             Boolean resolved = resolveTableSchema().get();
366             if (!resolved) {
367                 logger.warn("Table schema not resolved, cannot query data.");
368                 return Collections.emptyList();
369             }
370         } catch (InterruptedException e) {
371             logger.warn("Table schema resolution interrupted, cannot query data");
372             return Collections.emptyList();
373         } catch (ExecutionException e) {
374             Throwable cause = e.getCause();
375             logger.warn("Table schema resolution errored, cannot query data: {} {}",
376                     cause == null ? e.getClass().getSimpleName() : cause.getClass().getSimpleName(),
377                     cause == null ? e.getMessage() : cause.getMessage());
378             return Collections.emptyList();
379         }
380         try {
381             //
382             // Proceed with query
383             //
384             String itemName = filter.getItemName();
385             Item item = getItemFromRegistry(itemName);
386             if (item == null) {
387                 logger.warn("Could not get item {} from registry! Returning empty query results.", itemName);
388                 return Collections.emptyList();
389             }
390             if (item instanceof GroupItem) {
391                 item = ((GroupItem) item).getBaseItem();
392                 logger.debug("Item is instanceof GroupItem '{}'", itemName);
393                 if (item == null) {
394                     logger.debug("BaseItem of GroupItem is null. Ignore and give up!");
395                     return Collections.emptyList();
396                 }
397                 if (item instanceof GroupItem) {
398                     logger.debug("BaseItem of GroupItem is a GroupItem too. Ignore and give up!");
399                     return Collections.emptyList();
400                 }
401             }
402             boolean legacy = localTableNameResolver.getTableSchema() == ExpectedTableSchema.LEGACY;
403             Class<? extends DynamoDBItem<?>> dtoClass = AbstractDynamoDBItem.getDynamoItemClass(item.getClass(),
404                     legacy);
405             String tableName = localTableNameResolver.fromClass(dtoClass);
406             DynamoDbAsyncTable<? extends DynamoDBItem<?>> table = getTable(dtoClass);
407             logger.debug("Item {} (of type {}) will be tried to query using DTO class {} from table {}", itemName,
408                     item.getClass().getSimpleName(), dtoClass.getSimpleName(), tableName);
409
410             QueryEnhancedRequest queryExpression = DynamoDBQueryUtils.createQueryExpression(dtoClass,
411                     localTableNameResolver.getTableSchema(), item, filter);
412
413             CompletableFuture<List<DynamoDBItem<?>>> itemsFuture = new CompletableFuture<>();
414             final SdkPublisher<? extends DynamoDBItem<?>> itemPublisher = table.query(queryExpression).items();
415             Subscriber<DynamoDBItem<?>> pageSubscriber = new PageOfInterestSubscriber<DynamoDBItem<?>>(itemsFuture,
416                     filter.getPageNumber(), filter.getPageSize());
417             itemPublisher.subscribe(pageSubscriber);
418             // NumberItem.getUnit() is expensive, we avoid calling it in the loop
419             // by fetching the unit here.
420             final Item localItem = item;
421             final Unit<?> itemUnit = localItem instanceof NumberItem ? ((NumberItem) localItem).getUnit() : null;
422             try {
423                 @SuppressWarnings("null")
424                 List<HistoricItem> results = itemsFuture.get().stream().map(dynamoItem -> {
425                     HistoricItem historicItem = dynamoItem.asHistoricItem(localItem, itemUnit);
426                     if (historicItem == null) {
427                         logger.warn(
428                                 "Dynamo item {} serialized state '{}' cannot be converted to item {} {}. Item type changed since persistence. Ignoring",
429                                 dynamoItem.getClass().getSimpleName(), dynamoItem.getState(),
430                                 localItem.getClass().getSimpleName(), localItem.getName());
431                         return null;
432                     }
433                     logger.trace("Dynamo item {} converted to historic item: {}", localItem, historicItem);
434                     return historicItem;
435                 }).filter(value -> value != null).collect(Collectors.toList());
436                 logger.debug("Query completed in {} ms. Filter was {}",
437                         Duration.between(start, Instant.now()).toMillis(), filterDescription);
438                 return results;
439             } catch (InterruptedException e) {
440                 logger.warn("Query interrupted. Filter was {}", filterDescription);
441                 return Collections.emptyList();
442             } catch (ExecutionException e) {
443                 Throwable cause = e.getCause();
444                 if (cause instanceof ResourceNotFoundException) {
445                     logger.trace("Query failed since the DynamoDB table '{}' does not exist. Filter was {}", tableName,
446                             filterDescription);
447                 } else if (logger.isTraceEnabled()) {
448                     logger.trace("Query failed. Filter was {}", filterDescription, e);
449                 } else {
450                     logger.warn("Query failed {} {}. Filter was {}",
451                             cause == null ? e.getClass().getSimpleName() : cause.getClass().getSimpleName(),
452                             cause == null ? e.getMessage() : cause.getMessage(), filterDescription);
453                 }
454                 return Collections.emptyList();
455             }
456         } catch (Exception e) {
457             logger.error("Unexpected error with query having filter {}: {} {}. Returning empty query results.",
458                     filterDescription, e.getClass().getSimpleName(), e.getMessage());
459             return Collections.emptyList();
460         }
461     }
462
463     /**
464      * Retrieves the item for the given name from the item registry
465      *
466      * @param itemName
467      * @return item with the given name, or null if no such item exists in item registry.
468      */
469     private @Nullable Item getItemFromRegistry(String itemName) {
470         try {
471             return itemRegistry.getItem(itemName);
472         } catch (ItemNotFoundException e1) {
473             return null;
474         }
475     }
476
477     @Override
478     public List<PersistenceStrategy> getDefaultStrategies() {
479         return List.of(PersistenceStrategy.Globals.RESTORE, PersistenceStrategy.Globals.CHANGE);
480     }
481
482     @Override
483     public void store(Item item) {
484         store(item, null);
485     }
486
487     @Override
488     public void store(Item item, @Nullable String alias) {
489         // Timestamp and capture state immediately as rest of the store is asynchronous (state might change in between)
490         ZonedDateTime time = ZonedDateTime.now();
491
492         logIfManyQueuedTasks();
493         if (!(item instanceof GenericItem)) {
494             return;
495         }
496         if (item.getState() instanceof UnDefType) {
497             logger.debug("Undefined item state received. Not storing item {}.", item.getName());
498             return;
499         }
500         if (!isReadyToStore()) {
501             logger.warn("Not ready to store (config error?), not storing item {}.", item.getName());
502             return;
503         }
504         // Get Item describing the real type of data
505         // With non-group items this is same as the argument item. With Group items, this is item describing the type of
506         // state stored in the group.
507         final Item itemTemplate;
508         try {
509             itemTemplate = getEffectiveItem(item);
510         } catch (IllegalStateException e) {
511             // Exception is raised when underlying item type cannot be determined with Group item
512             // Logged already
513             return;
514         }
515
516         String effectiveName = (alias != null) ? alias : item.getName();
517
518         // We do not want to rely item.state since async context below can execute much later.
519         // We 'copy' the item for local use. copyItem also normalizes the unit with NumberItems.
520         final GenericItem copiedItem = copyItem(itemTemplate, item, effectiveName, null);
521
522         resolveTableSchema().thenAcceptAsync(resolved -> {
523             if (!resolved) {
524                 logger.warn("Table schema not resolved, not storing item {}.", copiedItem.getName());
525                 return;
526             }
527
528             DynamoDbEnhancedAsyncClient localClient = client;
529             DynamoDbAsyncClient localLowlevelClient = lowLevelClient;
530             DynamoDBConfig localConfig = dbConfig;
531             DynamoDBTableNameResolver localTableNameResolver = tableNameResolver;
532             if (!isProperlyConfigured || localClient == null || localLowlevelClient == null || localConfig == null
533                     || localTableNameResolver == null) {
534                 logger.warn("Not ready to store (config error?), not storing item {}.", item.getName());
535                 return;
536             }
537
538             Integer expireDays = localConfig.getExpireDays();
539
540             final DynamoDBItem<?> dto;
541             switch (localTableNameResolver.getTableSchema()) {
542                 case NEW:
543                     dto = AbstractDynamoDBItem.fromStateNew(copiedItem, time, expireDays);
544                     break;
545                 case LEGACY:
546                     dto = AbstractDynamoDBItem.fromStateLegacy(copiedItem, time);
547                     break;
548                 default:
549                     throw new IllegalStateException("Unexpected. Bug");
550             }
551             logger.trace("store() called with item {} {} '{}', which was converted to DTO {}",
552                     copiedItem.getClass().getSimpleName(), effectiveName, copiedItem.getState(), dto);
553             dto.accept(new DynamoDBItemVisitor<TableCreatingPutItem<? extends DynamoDBItem<?>>>() {
554
555                 @Override
556                 public TableCreatingPutItem<? extends DynamoDBItem<?>> visit(
557                         DynamoDBBigDecimalItem dynamoBigDecimalItem) {
558                     return new TableCreatingPutItem<DynamoDBBigDecimalItem>(DynamoDBPersistenceService.this,
559                             dynamoBigDecimalItem, getTable(DynamoDBBigDecimalItem.class));
560                 }
561
562                 @Override
563                 public TableCreatingPutItem<? extends DynamoDBItem<?>> visit(DynamoDBStringItem dynamoStringItem) {
564                     return new TableCreatingPutItem<DynamoDBStringItem>(DynamoDBPersistenceService.this,
565                             dynamoStringItem, getTable(DynamoDBStringItem.class));
566                 }
567             }).putItemAsync();
568         }, executor).exceptionally(e -> {
569             logger.error("Unexcepted error", e);
570             return null;
571         });
572     }
573
574     private Item getEffectiveItem(Item item) {
575         final Item effectiveItem;
576         if (item instanceof GroupItem) {
577             Item baseItem = ((GroupItem) item).getBaseItem();
578             if (baseItem == null) {
579                 // if GroupItem:<ItemType> is not defined in
580                 // *.items using StringType
581                 logger.debug(
582                         "Cannot detect ItemType for {} because the GroupItems' base type isn't set in *.items File.",
583                         item.getName());
584                 Iterator<Item> firstGroupMemberItem = ((GroupItem) item).getMembers().iterator();
585                 if (firstGroupMemberItem.hasNext()) {
586                     effectiveItem = firstGroupMemberItem.next();
587                 } else {
588                     throw new IllegalStateException("GroupItem " + item.getName()
589                             + " does not have children nor base item set, cannot determine underlying item type. Aborting!");
590                 }
591             } else {
592                 effectiveItem = baseItem;
593             }
594         } else {
595             effectiveItem = item;
596         }
597         return effectiveItem;
598     }
599
600     /**
601      * Copy item and optionally override name and state
602      *
603      * State is normalized to source item's unit with Quantity NumberItems and QuantityTypes
604      *
605      * @param itemTemplate 'template item' to be used to construct the new copy. It is also used to determine UoM unit
606      *            and get GenericItem.type
607      * @param item item that is used to acquire name and state
608      * @param nameOverride name override for the resulting copy
609      * @param stateOverride state override for the resulting copy
610      * @throws IllegalArgumentException when state is QuantityType and not compatible with item
611      */
612     static GenericItem copyItem(Item itemTemplate, Item item, @Nullable String nameOverride,
613             @Nullable State stateOverride) {
614         final GenericItem copiedItem;
615         try {
616             if (itemTemplate instanceof NumberItem) {
617                 copiedItem = (GenericItem) itemTemplate.getClass().getDeclaredConstructor(String.class, String.class)
618                         .newInstance(itemTemplate.getType(), nameOverride == null ? item.getName() : nameOverride);
619             } else {
620                 copiedItem = (GenericItem) itemTemplate.getClass().getDeclaredConstructor(String.class)
621                         .newInstance(nameOverride == null ? item.getName() : nameOverride);
622             }
623
624         } catch (InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException
625                 | NoSuchMethodException | SecurityException e) {
626             throw new IllegalArgumentException(item.toString(), e);
627         }
628         State state = stateOverride == null ? item.getState() : stateOverride;
629         if (state instanceof QuantityType<?> && itemTemplate instanceof NumberItem) {
630             Unit<?> itemUnit = ((NumberItem) itemTemplate).getUnit();
631             if (itemUnit != null) {
632                 State convertedState = ((QuantityType<?>) state).toUnit(itemUnit);
633                 if (convertedState == null) {
634                     logger.error("Unexpected unit conversion failure: {} to item unit {}", state, itemUnit);
635                     throw new IllegalArgumentException(
636                             String.format("Unexpected unit conversion failure: %s to item unit %s", state, itemUnit));
637                 }
638                 state = convertedState;
639             }
640         }
641         copiedItem.setState(state);
642         return copiedItem;
643     }
644
645     private void logIfManyQueuedTasks() {
646         if (executor instanceof ThreadPoolExecutor) {
647             ThreadPoolExecutor localExecutor = (ThreadPoolExecutor) executor;
648             if (localExecutor.getQueue().size() >= 5) {
649                 logger.trace("executor queue size: {}, remaining space {}. Active threads {}",
650                         localExecutor.getQueue().size(), localExecutor.getQueue().remainingCapacity(),
651                         localExecutor.getActiveCount());
652             } else if (localExecutor.getQueue().size() >= 50) {
653                 logger.warn(
654                         "Many ({}) tasks queued in executor! This might be sign of bad design or bug in the addon code.",
655                         localExecutor.getQueue().size());
656             }
657         }
658     }
659
660     private String filterToString(FilterCriteria filter) {
661         return String.format(
662                 "FilterCriteria@%s(item=%s, pageNumber=%d, pageSize=%d, time=[%s, %s, %s], state=[%s, %s of %s] )",
663                 System.identityHashCode(filter), filter.getItemName(), filter.getPageNumber(), filter.getPageSize(),
664                 filter.getBeginDate(), filter.getEndDate(), filter.getOrdering(), filter.getOperator(),
665                 filter.getState(), filter.getState() == null ? "null" : filter.getState().getClass().getSimpleName());
666     }
667 }