]> git.basschouten.com Git - openhab-addons.git/blob
5e58c2e5d32578f6d250176a280ae87b81b55b72
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.persistence.dynamodb.internal;
14
15 import java.lang.reflect.InvocationTargetException;
16 import java.net.URI;
17 import java.time.Duration;
18 import java.time.Instant;
19 import java.time.ZonedDateTime;
20 import java.util.Collections;
21 import java.util.Iterator;
22 import java.util.List;
23 import java.util.Locale;
24 import java.util.Map;
25 import java.util.Set;
26 import java.util.concurrent.CompletableFuture;
27 import java.util.concurrent.ConcurrentHashMap;
28 import java.util.concurrent.ExecutionException;
29 import java.util.concurrent.ExecutorService;
30 import java.util.concurrent.ThreadPoolExecutor;
31 import java.util.stream.Collectors;
32
33 import javax.measure.Unit;
34
35 import org.eclipse.jdt.annotation.NonNullByDefault;
36 import org.eclipse.jdt.annotation.Nullable;
37 import org.openhab.core.common.ThreadPoolManager;
38 import org.openhab.core.config.core.ConfigurableService;
39 import org.openhab.core.items.GenericItem;
40 import org.openhab.core.items.GroupItem;
41 import org.openhab.core.items.Item;
42 import org.openhab.core.items.ItemNotFoundException;
43 import org.openhab.core.items.ItemRegistry;
44 import org.openhab.core.library.items.NumberItem;
45 import org.openhab.core.library.types.QuantityType;
46 import org.openhab.core.persistence.FilterCriteria;
47 import org.openhab.core.persistence.HistoricItem;
48 import org.openhab.core.persistence.PersistenceItemInfo;
49 import org.openhab.core.persistence.PersistenceService;
50 import org.openhab.core.persistence.QueryablePersistenceService;
51 import org.openhab.core.persistence.strategy.PersistenceStrategy;
52 import org.openhab.core.types.State;
53 import org.openhab.core.types.UnDefType;
54 import org.osgi.framework.BundleContext;
55 import org.osgi.framework.Constants;
56 import org.osgi.service.component.annotations.Activate;
57 import org.osgi.service.component.annotations.Component;
58 import org.osgi.service.component.annotations.Deactivate;
59 import org.osgi.service.component.annotations.Reference;
60 import org.reactivestreams.Subscriber;
61 import org.slf4j.Logger;
62 import org.slf4j.LoggerFactory;
63
64 import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
65 import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
66 import software.amazon.awssdk.awscore.defaultsmode.DefaultsMode;
67 import software.amazon.awssdk.core.async.SdkPublisher;
68 import software.amazon.awssdk.core.client.config.ClientAsyncConfiguration;
69 import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
70 import software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption;
71 import software.amazon.awssdk.enhanced.dynamodb.DynamoDbAsyncTable;
72 import software.amazon.awssdk.enhanced.dynamodb.DynamoDbEnhancedAsyncClient;
73 import software.amazon.awssdk.enhanced.dynamodb.TableSchema;
74 import software.amazon.awssdk.enhanced.dynamodb.model.QueryEnhancedRequest;
75 import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
76 import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
77 import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClientBuilder;
78 import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
79
80 /**
81  * This is the implementation of the DynamoDB {@link PersistenceService}. It persists item values
82  * using the <a href="https://aws.amazon.com/dynamodb/">Amazon DynamoDB</a> database. The states (
83  * {@link State}) of an {@link Item} are persisted in DynamoDB tables.
84  *
85  * The service creates tables automatically, one for numbers, and one for strings.
86  *
87  * @see AbstractDynamoDBItem.fromState for details how different items are persisted
88  *
89  * @author Sami Salonen - Initial contribution
90  * @author Kai Kreuzer - Migration to 3.x
91  *
92  */
93 @NonNullByDefault
94 @Component(service = { PersistenceService.class,
95         QueryablePersistenceService.class }, configurationPid = "org.openhab.dynamodb", //
96         property = Constants.SERVICE_PID + "=org.openhab.dynamodb")
97 @ConfigurableService(category = "persistence", label = "DynamoDB Persistence Service", description_uri = DynamoDBPersistenceService.CONFIG_URI)
98 public class DynamoDBPersistenceService implements QueryablePersistenceService {
99
100     private static final int MAX_CONCURRENCY = 100;
101
102     protected static final String CONFIG_URI = "persistence:dynamodb";
103
104     private static final String DYNAMODB_THREADPOOL_NAME = "dynamodbPersistenceService";
105
106     private ItemRegistry itemRegistry;
107     private @Nullable DynamoDbEnhancedAsyncClient client;
108     private @Nullable DynamoDbAsyncClient lowLevelClient;
109     private static final Logger logger = LoggerFactory.getLogger(DynamoDBPersistenceService.class);
110     private boolean isProperlyConfigured;
111     private @Nullable DynamoDBConfig dbConfig;
112     private @Nullable DynamoDBTableNameResolver tableNameResolver;
113     private final ExecutorService executor = ThreadPoolManager.getPool(DYNAMODB_THREADPOOL_NAME);
114     private static final Duration TIMEOUT_API_CALL = Duration.ofSeconds(60);
115     private static final Duration TIMEOUT_API_CALL_ATTEMPT = Duration.ofSeconds(5);
116     private Map<Class<? extends DynamoDBItem<?>>, DynamoDbAsyncTable<? extends DynamoDBItem<?>>> tableCache = new ConcurrentHashMap<>(
117             2);
118
119     private @Nullable URI endpointOverride;
120
121     void overrideConfig(AwsRequestOverrideConfiguration.Builder config) {
122         config.apiCallAttemptTimeout(TIMEOUT_API_CALL_ATTEMPT).apiCallTimeout(TIMEOUT_API_CALL);
123     }
124
125     void overrideConfig(ClientOverrideConfiguration.Builder config) {
126         DynamoDBConfig localDbConfig = dbConfig;
127         config.apiCallAttemptTimeout(TIMEOUT_API_CALL_ATTEMPT).apiCallTimeout(TIMEOUT_API_CALL);
128         if (localDbConfig != null) {
129             localDbConfig.getRetryPolicy().ifPresent(config::retryPolicy);
130         }
131     }
132
133     @Activate
134     public DynamoDBPersistenceService(final @Reference ItemRegistry itemRegistry) {
135         this.itemRegistry = itemRegistry;
136     }
137
138     /**
139      * For tests
140      */
141     DynamoDBPersistenceService(final ItemRegistry itemRegistry, @Nullable URI endpointOverride) {
142         this.itemRegistry = itemRegistry;
143         this.endpointOverride = endpointOverride;
144     }
145
146     /**
147      * For tests
148      */
149     @Nullable
150     URI getEndpointOverride() {
151         return endpointOverride;
152     }
153
154     @Nullable
155     DynamoDbAsyncClient getLowLevelClient() {
156         return lowLevelClient;
157     }
158
159     ExecutorService getExecutor() {
160         return executor;
161     }
162
163     @Nullable
164     DynamoDBTableNameResolver getTableNameResolver() {
165         return tableNameResolver;
166     }
167
168     @Nullable
169     DynamoDBConfig getDbConfig() {
170         return dbConfig;
171     }
172
173     @Activate
174     public void activate(final @Nullable BundleContext bundleContext, final Map<String, Object> config) {
175         disconnect();
176         DynamoDBConfig localDbConfig = dbConfig = DynamoDBConfig.fromConfig(config);
177         if (localDbConfig == null) {
178             // Configuration was invalid. Abort service activation.
179             // Error is already logger in fromConfig.
180             return;
181         }
182         tableNameResolver = new DynamoDBTableNameResolver(localDbConfig.getTableRevision(), localDbConfig.getTable(),
183                 localDbConfig.getTablePrefixLegacy());
184         try {
185             if (!ensureClient()) {
186                 logger.error("Error creating dynamodb database client. Aborting service activation.");
187                 return;
188             }
189         } catch (Exception e) {
190             logger.error("Error constructing dynamodb client", e);
191             return;
192         }
193
194         isProperlyConfigured = true;
195         logger.debug("dynamodb persistence service activated");
196     }
197
198     @Deactivate
199     public void deactivate() {
200         logger.debug("dynamodb persistence service deactivated");
201         logIfManyQueuedTasks();
202         disconnect();
203     }
204
205     /**
206      * Initializes Dynamo DB client and determines schema
207      *
208      * If construction fails, error is logged and false is returned.
209      *
210      * @return whether initialization was successful.
211      */
212     private boolean ensureClient() {
213         DynamoDBConfig localDbConfig = dbConfig;
214         if (localDbConfig == null) {
215             return false;
216         }
217         if (client == null) {
218             try {
219                 synchronized (this) {
220                     if (this.client != null) {
221                         return true;
222                     }
223                     DynamoDbAsyncClientBuilder lowlevelClientBuilder = DynamoDbAsyncClient.builder()
224                             .defaultsMode(DefaultsMode.STANDARD)
225                             .credentialsProvider(StaticCredentialsProvider.create(localDbConfig.getCredentials()))
226                             .httpClient(NettyNioAsyncHttpClient.builder().maxConcurrency(MAX_CONCURRENCY).build())
227                             .asyncConfiguration(
228                                     ClientAsyncConfiguration.builder()
229                                             .advancedOption(SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR,
230                                                     executor)
231                                             .build())
232                             .overrideConfiguration(this::overrideConfig).region(localDbConfig.getRegion());
233                     if (endpointOverride != null) {
234                         logger.debug("DynamoDB has been overriden to {}", endpointOverride);
235                         lowlevelClientBuilder.endpointOverride(endpointOverride);
236                     }
237                     DynamoDbAsyncClient lowlevelClient = lowlevelClientBuilder.build();
238                     client = DynamoDbEnhancedAsyncClient.builder().dynamoDbClient(lowlevelClient).build();
239                     this.lowLevelClient = lowlevelClient;
240                 }
241             } catch (Exception e) {
242                 logger.error("Error constructing dynamodb client", e);
243                 return false;
244             }
245         }
246         return true;
247     }
248
249     private CompletableFuture<Boolean> resolveTableSchema() {
250         DynamoDBTableNameResolver localTableNameResolver = tableNameResolver;
251         DynamoDbAsyncClient localLowLevelClient = lowLevelClient;
252         if (localTableNameResolver == null || localLowLevelClient == null) {
253             throw new IllegalStateException("tableNameResolver or localLowLevelClient not available");
254         }
255         if (localTableNameResolver.isFullyResolved()) {
256             return CompletableFuture.completedFuture(true);
257         } else {
258             synchronized (localTableNameResolver) {
259                 if (localTableNameResolver.isFullyResolved()) {
260                     return CompletableFuture.completedFuture(true);
261                 }
262                 return localTableNameResolver.resolveSchema(localLowLevelClient,
263                         b -> b.overrideConfiguration(this::overrideConfig), executor).thenApplyAsync(resolved -> {
264                             if (resolved && localTableNameResolver.getTableSchema() == ExpectedTableSchema.LEGACY) {
265                                 logger.warn(
266                                         "Using legacy table format. Is it recommended to migrate to the new table format: specify the 'table' parameter and unset the old 'tablePrefix' parameter.");
267                             }
268                             return resolved;
269                         }, executor);
270             }
271         }
272     }
273
274     private <T extends DynamoDBItem<?>> DynamoDbAsyncTable<T> getTable(Class<T> dtoClass) {
275         DynamoDbEnhancedAsyncClient localClient = client;
276         DynamoDBTableNameResolver localTableNameResolver = tableNameResolver;
277         if (!ensureClient() || localClient == null || localTableNameResolver == null) {
278             throw new IllegalStateException("Client not ready");
279         }
280         ExpectedTableSchema expectedTableSchemaRevision = localTableNameResolver.getTableSchema();
281         String tableName = localTableNameResolver.fromClass(dtoClass);
282         final TableSchema<T> schema = getDynamoDBTableSchema(dtoClass, expectedTableSchemaRevision);
283         @SuppressWarnings("unchecked") // OK since this is the only place tableCache is populated
284         DynamoDbAsyncTable<T> table = (DynamoDbAsyncTable<T>) tableCache.computeIfAbsent(dtoClass, clz -> {
285             return localClient.table(tableName, schema);
286         });
287         if (table == null) {
288             // Invariant. To make null checker happy
289             throw new IllegalStateException();
290         }
291         return table;
292     }
293
294     private static <T extends DynamoDBItem<?>> TableSchema<T> getDynamoDBTableSchema(Class<T> dtoClass,
295             ExpectedTableSchema expectedTableSchemaRevision) {
296         if (dtoClass.equals(DynamoDBBigDecimalItem.class)) {
297             @SuppressWarnings("unchecked") // OK thanks to above conditional
298             TableSchema<T> schema = (TableSchema<T>) (expectedTableSchemaRevision == ExpectedTableSchema.NEW
299                     ? DynamoDBBigDecimalItem.TABLE_SCHEMA_NEW
300                     : DynamoDBBigDecimalItem.TABLE_SCHEMA_LEGACY);
301             return schema;
302         } else if (dtoClass.equals(DynamoDBStringItem.class)) {
303             @SuppressWarnings("unchecked") // OK thanks to above conditional
304             TableSchema<T> schema = (TableSchema<T>) (expectedTableSchemaRevision == ExpectedTableSchema.NEW
305                     ? DynamoDBStringItem.TABLE_SCHEMA_NEW
306                     : DynamoDBStringItem.TABLE_SCHEMA_LEGACY);
307             return schema;
308         } else {
309             throw new IllegalStateException("Unknown DTO class. Bug");
310         }
311     }
312
313     private void disconnect() {
314         DynamoDbAsyncClient localLowLevelClient = lowLevelClient;
315         if (client == null || localLowLevelClient == null) {
316             return;
317         }
318         localLowLevelClient.close();
319         lowLevelClient = null;
320         client = null;
321         dbConfig = null;
322         tableNameResolver = null;
323         isProperlyConfigured = false;
324         tableCache.clear();
325     }
326
327     protected boolean isReadyToStore() {
328         return isProperlyConfigured && ensureClient();
329     }
330
331     @Override
332     public String getId() {
333         return "dynamodb";
334     }
335
336     @Override
337     public String getLabel(@Nullable Locale locale) {
338         return "DynamoDB";
339     }
340
341     @Override
342     public Set<PersistenceItemInfo> getItemInfo() {
343         return Collections.emptySet();
344     }
345
346     @Override
347     public Iterable<HistoricItem> query(FilterCriteria filter) {
348         logIfManyQueuedTasks();
349         Instant start = Instant.now();
350         String filterDescription = filterToString(filter);
351         logger.trace("Got a query with filter {}", filterDescription);
352         DynamoDbEnhancedAsyncClient localClient = client;
353         DynamoDBTableNameResolver localTableNameResolver = tableNameResolver;
354         if (!isProperlyConfigured) {
355             logger.debug("Configuration for dynamodb not yet loaded or broken. Returning empty query results.");
356             return Collections.emptyList();
357         }
358         if (!ensureClient() || localClient == null || localTableNameResolver == null) {
359             logger.warn("DynamoDB not connected. Returning empty query results.");
360             return Collections.emptyList();
361         }
362
363         //
364         // Resolve unclear table schema if needed
365         //
366         try {
367             Boolean resolved = resolveTableSchema().get();
368             if (!resolved) {
369                 logger.warn("Table schema not resolved, cannot query data.");
370                 return Collections.emptyList();
371             }
372         } catch (InterruptedException e) {
373             logger.warn("Table schema resolution interrupted, cannot query data");
374             return Collections.emptyList();
375         } catch (ExecutionException e) {
376             Throwable cause = e.getCause();
377             logger.warn("Table schema resolution errored, cannot query data: {} {}",
378                     cause == null ? e.getClass().getSimpleName() : cause.getClass().getSimpleName(),
379                     cause == null ? e.getMessage() : cause.getMessage());
380             return Collections.emptyList();
381         }
382         try {
383             //
384             // Proceed with query
385             //
386             String itemName = filter.getItemName();
387             if (itemName == null) {
388                 logger.warn("Item name is missing in filter {}", filter);
389                 return List.of();
390             }
391             Item item = getItemFromRegistry(itemName);
392             if (item == null) {
393                 logger.warn("Could not get item {} from registry! Returning empty query results.", itemName);
394                 return Collections.emptyList();
395             }
396             if (item instanceof GroupItem) {
397                 item = ((GroupItem) item).getBaseItem();
398                 logger.debug("Item is instanceof GroupItem '{}'", itemName);
399                 if (item == null) {
400                     logger.debug("BaseItem of GroupItem is null. Ignore and give up!");
401                     return Collections.emptyList();
402                 }
403                 if (item instanceof GroupItem) {
404                     logger.debug("BaseItem of GroupItem is a GroupItem too. Ignore and give up!");
405                     return Collections.emptyList();
406                 }
407             }
408             boolean legacy = localTableNameResolver.getTableSchema() == ExpectedTableSchema.LEGACY;
409             Class<? extends DynamoDBItem<?>> dtoClass = AbstractDynamoDBItem.getDynamoItemClass(item.getClass(),
410                     legacy);
411             String tableName = localTableNameResolver.fromClass(dtoClass);
412             DynamoDbAsyncTable<? extends DynamoDBItem<?>> table = getTable(dtoClass);
413             logger.debug("Item {} (of type {}) will be tried to query using DTO class {} from table {}", itemName,
414                     item.getClass().getSimpleName(), dtoClass.getSimpleName(), tableName);
415
416             QueryEnhancedRequest queryExpression = DynamoDBQueryUtils.createQueryExpression(dtoClass,
417                     localTableNameResolver.getTableSchema(), item, filter);
418
419             CompletableFuture<List<DynamoDBItem<?>>> itemsFuture = new CompletableFuture<>();
420             final SdkPublisher<? extends DynamoDBItem<?>> itemPublisher = table.query(queryExpression).items();
421             Subscriber<DynamoDBItem<?>> pageSubscriber = new PageOfInterestSubscriber<DynamoDBItem<?>>(itemsFuture,
422                     filter.getPageNumber(), filter.getPageSize());
423             itemPublisher.subscribe(pageSubscriber);
424             // NumberItem.getUnit() is expensive, we avoid calling it in the loop
425             // by fetching the unit here.
426             final Item localItem = item;
427             final Unit<?> itemUnit = localItem instanceof NumberItem ? ((NumberItem) localItem).getUnit() : null;
428             try {
429                 @SuppressWarnings("null")
430                 List<HistoricItem> results = itemsFuture.get().stream().map(dynamoItem -> {
431                     HistoricItem historicItem = dynamoItem.asHistoricItem(localItem, itemUnit);
432                     if (historicItem == null) {
433                         logger.warn(
434                                 "Dynamo item {} serialized state '{}' cannot be converted to item {} {}. Item type changed since persistence. Ignoring",
435                                 dynamoItem.getClass().getSimpleName(), dynamoItem.getState(),
436                                 localItem.getClass().getSimpleName(), localItem.getName());
437                         return null;
438                     }
439                     logger.trace("Dynamo item {} converted to historic item: {}", localItem, historicItem);
440                     return historicItem;
441                 }).filter(value -> value != null).collect(Collectors.toList());
442                 logger.debug("Query completed in {} ms. Filter was {}",
443                         Duration.between(start, Instant.now()).toMillis(), filterDescription);
444                 return results;
445             } catch (InterruptedException e) {
446                 logger.warn("Query interrupted. Filter was {}", filterDescription);
447                 return Collections.emptyList();
448             } catch (ExecutionException e) {
449                 Throwable cause = e.getCause();
450                 if (cause instanceof ResourceNotFoundException) {
451                     logger.trace("Query failed since the DynamoDB table '{}' does not exist. Filter was {}", tableName,
452                             filterDescription);
453                 } else if (logger.isTraceEnabled()) {
454                     logger.trace("Query failed. Filter was {}", filterDescription, e);
455                 } else {
456                     logger.warn("Query failed {} {}. Filter was {}",
457                             cause == null ? e.getClass().getSimpleName() : cause.getClass().getSimpleName(),
458                             cause == null ? e.getMessage() : cause.getMessage(), filterDescription);
459                 }
460                 return Collections.emptyList();
461             }
462         } catch (Exception e) {
463             logger.error("Unexpected error with query having filter {}: {} {}. Returning empty query results.",
464                     filterDescription, e.getClass().getSimpleName(), e.getMessage());
465             return Collections.emptyList();
466         }
467     }
468
469     /**
470      * Retrieves the item for the given name from the item registry
471      *
472      * @param itemName
473      * @return item with the given name, or null if no such item exists in item registry.
474      */
475     private @Nullable Item getItemFromRegistry(String itemName) {
476         try {
477             return itemRegistry.getItem(itemName);
478         } catch (ItemNotFoundException e1) {
479             return null;
480         }
481     }
482
483     @Override
484     public List<PersistenceStrategy> getDefaultStrategies() {
485         return List.of(PersistenceStrategy.Globals.RESTORE, PersistenceStrategy.Globals.CHANGE);
486     }
487
488     @Override
489     public void store(Item item) {
490         store(item, null);
491     }
492
493     @Override
494     public void store(Item item, @Nullable String alias) {
495         // Timestamp and capture state immediately as rest of the store is asynchronous (state might change in between)
496         ZonedDateTime time = ZonedDateTime.now();
497
498         logIfManyQueuedTasks();
499         if (!(item instanceof GenericItem)) {
500             return;
501         }
502         if (item.getState() instanceof UnDefType) {
503             logger.debug("Undefined item state received. Not storing item {}.", item.getName());
504             return;
505         }
506         if (!isReadyToStore()) {
507             logger.warn("Not ready to store (config error?), not storing item {}.", item.getName());
508             return;
509         }
510         // Get Item describing the real type of data
511         // With non-group items this is same as the argument item. With Group items, this is item describing the type of
512         // state stored in the group.
513         final Item itemTemplate;
514         try {
515             itemTemplate = getEffectiveItem(item);
516         } catch (IllegalStateException e) {
517             // Exception is raised when underlying item type cannot be determined with Group item
518             // Logged already
519             return;
520         }
521
522         String effectiveName = (alias != null) ? alias : item.getName();
523
524         // We do not want to rely item.state since async context below can execute much later.
525         // We 'copy' the item for local use. copyItem also normalizes the unit with NumberItems.
526         final GenericItem copiedItem = copyItem(itemTemplate, item, effectiveName, null);
527
528         resolveTableSchema().thenAcceptAsync(resolved -> {
529             if (!resolved) {
530                 logger.warn("Table schema not resolved, not storing item {}.", copiedItem.getName());
531                 return;
532             }
533
534             DynamoDbEnhancedAsyncClient localClient = client;
535             DynamoDbAsyncClient localLowlevelClient = lowLevelClient;
536             DynamoDBConfig localConfig = dbConfig;
537             DynamoDBTableNameResolver localTableNameResolver = tableNameResolver;
538             if (!isProperlyConfigured || localClient == null || localLowlevelClient == null || localConfig == null
539                     || localTableNameResolver == null) {
540                 logger.warn("Not ready to store (config error?), not storing item {}.", item.getName());
541                 return;
542             }
543
544             Integer expireDays = localConfig.getExpireDays();
545
546             final DynamoDBItem<?> dto;
547             switch (localTableNameResolver.getTableSchema()) {
548                 case NEW:
549                     dto = AbstractDynamoDBItem.fromStateNew(copiedItem, time, expireDays);
550                     break;
551                 case LEGACY:
552                     dto = AbstractDynamoDBItem.fromStateLegacy(copiedItem, time);
553                     break;
554                 default:
555                     throw new IllegalStateException("Unexpected. Bug");
556             }
557             logger.trace("store() called with item {} {} '{}', which was converted to DTO {}",
558                     copiedItem.getClass().getSimpleName(), effectiveName, copiedItem.getState(), dto);
559             dto.accept(new DynamoDBItemVisitor<TableCreatingPutItem<? extends DynamoDBItem<?>>>() {
560
561                 @Override
562                 public TableCreatingPutItem<? extends DynamoDBItem<?>> visit(
563                         DynamoDBBigDecimalItem dynamoBigDecimalItem) {
564                     return new TableCreatingPutItem<DynamoDBBigDecimalItem>(DynamoDBPersistenceService.this,
565                             dynamoBigDecimalItem, getTable(DynamoDBBigDecimalItem.class));
566                 }
567
568                 @Override
569                 public TableCreatingPutItem<? extends DynamoDBItem<?>> visit(DynamoDBStringItem dynamoStringItem) {
570                     return new TableCreatingPutItem<DynamoDBStringItem>(DynamoDBPersistenceService.this,
571                             dynamoStringItem, getTable(DynamoDBStringItem.class));
572                 }
573             }).putItemAsync();
574         }, executor).exceptionally(e -> {
575             logger.error("Unexcepted error", e);
576             return null;
577         });
578     }
579
580     private Item getEffectiveItem(Item item) {
581         final Item effectiveItem;
582         if (item instanceof GroupItem) {
583             Item baseItem = ((GroupItem) item).getBaseItem();
584             if (baseItem == null) {
585                 // if GroupItem:<ItemType> is not defined in
586                 // *.items using StringType
587                 logger.debug(
588                         "Cannot detect ItemType for {} because the GroupItems' base type isn't set in *.items File.",
589                         item.getName());
590                 Iterator<Item> firstGroupMemberItem = ((GroupItem) item).getMembers().iterator();
591                 if (firstGroupMemberItem.hasNext()) {
592                     effectiveItem = firstGroupMemberItem.next();
593                 } else {
594                     throw new IllegalStateException("GroupItem " + item.getName()
595                             + " does not have children nor base item set, cannot determine underlying item type. Aborting!");
596                 }
597             } else {
598                 effectiveItem = baseItem;
599             }
600         } else {
601             effectiveItem = item;
602         }
603         return effectiveItem;
604     }
605
606     /**
607      * Copy item and optionally override name and state
608      *
609      * State is normalized to source item's unit with Quantity NumberItems and QuantityTypes
610      *
611      * @param itemTemplate 'template item' to be used to construct the new copy. It is also used to determine UoM unit
612      *            and get GenericItem.type
613      * @param item item that is used to acquire name and state
614      * @param nameOverride name override for the resulting copy
615      * @param stateOverride state override for the resulting copy
616      * @throws IllegalArgumentException when state is QuantityType and not compatible with item
617      */
618     static GenericItem copyItem(Item itemTemplate, Item item, @Nullable String nameOverride,
619             @Nullable State stateOverride) {
620         final GenericItem copiedItem;
621         try {
622             if (itemTemplate instanceof NumberItem) {
623                 copiedItem = (GenericItem) itemTemplate.getClass().getDeclaredConstructor(String.class, String.class)
624                         .newInstance(itemTemplate.getType(), nameOverride == null ? item.getName() : nameOverride);
625             } else {
626                 copiedItem = (GenericItem) itemTemplate.getClass().getDeclaredConstructor(String.class)
627                         .newInstance(nameOverride == null ? item.getName() : nameOverride);
628             }
629
630         } catch (InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException
631                 | NoSuchMethodException | SecurityException e) {
632             throw new IllegalArgumentException(item.toString(), e);
633         }
634         State state = stateOverride == null ? item.getState() : stateOverride;
635         if (state instanceof QuantityType<?> && itemTemplate instanceof NumberItem) {
636             Unit<?> itemUnit = ((NumberItem) itemTemplate).getUnit();
637             if (itemUnit != null) {
638                 State convertedState = ((QuantityType<?>) state).toUnit(itemUnit);
639                 if (convertedState == null) {
640                     logger.error("Unexpected unit conversion failure: {} to item unit {}", state, itemUnit);
641                     throw new IllegalArgumentException(
642                             String.format("Unexpected unit conversion failure: %s to item unit %s", state, itemUnit));
643                 }
644                 state = convertedState;
645             }
646         }
647         copiedItem.setState(state);
648         return copiedItem;
649     }
650
651     private void logIfManyQueuedTasks() {
652         if (executor instanceof ThreadPoolExecutor) {
653             ThreadPoolExecutor localExecutor = (ThreadPoolExecutor) executor;
654             if (localExecutor.getQueue().size() >= 5) {
655                 logger.trace("executor queue size: {}, remaining space {}. Active threads {}",
656                         localExecutor.getQueue().size(), localExecutor.getQueue().remainingCapacity(),
657                         localExecutor.getActiveCount());
658             } else if (localExecutor.getQueue().size() >= 50) {
659                 logger.warn(
660                         "Many ({}) tasks queued in executor! This might be sign of bad design or bug in the addon code.",
661                         localExecutor.getQueue().size());
662             }
663         }
664     }
665
666     private String filterToString(FilterCriteria filter) {
667         return String.format(
668                 "FilterCriteria@%s(item=%s, pageNumber=%d, pageSize=%d, time=[%s, %s, %s], state=[%s, %s of %s] )",
669                 System.identityHashCode(filter), filter.getItemName(), filter.getPageNumber(), filter.getPageSize(),
670                 filter.getBeginDate(), filter.getEndDate(), filter.getOrdering(), filter.getOperator(),
671                 filter.getState(), filter.getState() == null ? "null" : filter.getState().getClass().getSimpleName());
672     }
673 }