]> git.basschouten.com Git - openhab-addons.git/blob
35ad9f4c55c8f37f6467aa15fb4582d978dc38b2
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.persistence.dynamodb.internal;
14
15 import java.lang.reflect.InvocationTargetException;
16 import java.net.URI;
17 import java.time.Duration;
18 import java.time.Instant;
19 import java.time.ZonedDateTime;
20 import java.util.Collections;
21 import java.util.Iterator;
22 import java.util.List;
23 import java.util.Locale;
24 import java.util.Map;
25 import java.util.Set;
26 import java.util.concurrent.CompletableFuture;
27 import java.util.concurrent.ConcurrentHashMap;
28 import java.util.concurrent.ExecutionException;
29 import java.util.concurrent.ExecutorService;
30 import java.util.concurrent.ThreadPoolExecutor;
31 import java.util.stream.Collectors;
32
33 import javax.measure.Unit;
34
35 import org.eclipse.jdt.annotation.NonNullByDefault;
36 import org.eclipse.jdt.annotation.Nullable;
37 import org.openhab.core.common.ThreadPoolManager;
38 import org.openhab.core.config.core.ConfigurableService;
39 import org.openhab.core.i18n.UnitProvider;
40 import org.openhab.core.items.GenericItem;
41 import org.openhab.core.items.GroupItem;
42 import org.openhab.core.items.Item;
43 import org.openhab.core.items.ItemNotFoundException;
44 import org.openhab.core.items.ItemRegistry;
45 import org.openhab.core.library.items.NumberItem;
46 import org.openhab.core.library.types.QuantityType;
47 import org.openhab.core.persistence.FilterCriteria;
48 import org.openhab.core.persistence.HistoricItem;
49 import org.openhab.core.persistence.PersistenceItemInfo;
50 import org.openhab.core.persistence.PersistenceService;
51 import org.openhab.core.persistence.QueryablePersistenceService;
52 import org.openhab.core.persistence.strategy.PersistenceStrategy;
53 import org.openhab.core.types.State;
54 import org.openhab.core.types.UnDefType;
55 import org.osgi.framework.BundleContext;
56 import org.osgi.framework.Constants;
57 import org.osgi.service.component.annotations.Activate;
58 import org.osgi.service.component.annotations.Component;
59 import org.osgi.service.component.annotations.Deactivate;
60 import org.osgi.service.component.annotations.Reference;
61 import org.reactivestreams.Subscriber;
62 import org.slf4j.Logger;
63 import org.slf4j.LoggerFactory;
64
65 import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
66 import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
67 import software.amazon.awssdk.awscore.defaultsmode.DefaultsMode;
68 import software.amazon.awssdk.core.async.SdkPublisher;
69 import software.amazon.awssdk.core.client.config.ClientAsyncConfiguration;
70 import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
71 import software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption;
72 import software.amazon.awssdk.enhanced.dynamodb.DynamoDbAsyncTable;
73 import software.amazon.awssdk.enhanced.dynamodb.DynamoDbEnhancedAsyncClient;
74 import software.amazon.awssdk.enhanced.dynamodb.TableSchema;
75 import software.amazon.awssdk.enhanced.dynamodb.model.QueryEnhancedRequest;
76 import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
77 import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
78 import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClientBuilder;
79 import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
80
81 /**
82  * This is the implementation of the DynamoDB {@link PersistenceService}. It persists item values
83  * using the <a href="https://aws.amazon.com/dynamodb/">Amazon DynamoDB</a> database. The states (
84  * {@link State}) of an {@link Item} are persisted in DynamoDB tables.
85  *
86  * The service creates tables automatically, one for numbers, and one for strings.
87  *
88  * @see AbstractDynamoDBItem.fromState for details how different items are persisted
89  *
90  * @author Sami Salonen - Initial contribution
91  * @author Kai Kreuzer - Migration to 3.x
92  *
93  */
94 @NonNullByDefault
95 @Component(service = { PersistenceService.class,
96         QueryablePersistenceService.class }, configurationPid = "org.openhab.dynamodb", //
97         property = Constants.SERVICE_PID + "=org.openhab.dynamodb")
98 @ConfigurableService(category = "persistence", label = "DynamoDB Persistence Service", description_uri = DynamoDBPersistenceService.CONFIG_URI)
99 public class DynamoDBPersistenceService implements QueryablePersistenceService {
100
101     private static final int MAX_CONCURRENCY = 100;
102
103     protected static final String CONFIG_URI = "persistence:dynamodb";
104
105     private static final String DYNAMODB_THREADPOOL_NAME = "dynamodbPersistenceService";
106
107     private final ItemRegistry itemRegistry;
108     private final UnitProvider unitProvider;
109     private @Nullable DynamoDbEnhancedAsyncClient client;
110     private @Nullable DynamoDbAsyncClient lowLevelClient;
111     private static final Logger logger = LoggerFactory.getLogger(DynamoDBPersistenceService.class);
112     private boolean isProperlyConfigured;
113     private @Nullable DynamoDBConfig dbConfig;
114     private @Nullable DynamoDBTableNameResolver tableNameResolver;
115     private final ExecutorService executor = ThreadPoolManager.getPool(DYNAMODB_THREADPOOL_NAME);
116     private static final Duration TIMEOUT_API_CALL = Duration.ofSeconds(60);
117     private static final Duration TIMEOUT_API_CALL_ATTEMPT = Duration.ofSeconds(5);
118     private Map<Class<? extends DynamoDBItem<?>>, DynamoDbAsyncTable<? extends DynamoDBItem<?>>> tableCache = new ConcurrentHashMap<>(
119             2);
120
121     private @Nullable URI endpointOverride;
122
123     void overrideConfig(AwsRequestOverrideConfiguration.Builder config) {
124         config.apiCallAttemptTimeout(TIMEOUT_API_CALL_ATTEMPT).apiCallTimeout(TIMEOUT_API_CALL);
125     }
126
127     void overrideConfig(ClientOverrideConfiguration.Builder config) {
128         DynamoDBConfig localDbConfig = dbConfig;
129         config.apiCallAttemptTimeout(TIMEOUT_API_CALL_ATTEMPT).apiCallTimeout(TIMEOUT_API_CALL);
130         if (localDbConfig != null) {
131             localDbConfig.getRetryPolicy().ifPresent(config::retryPolicy);
132         }
133     }
134
135     @Activate
136     public DynamoDBPersistenceService(final @Reference ItemRegistry itemRegistry,
137             final @Reference UnitProvider unitProvider) {
138         this.itemRegistry = itemRegistry;
139         this.unitProvider = unitProvider;
140     }
141
142     /**
143      * For tests
144      */
145     DynamoDBPersistenceService(final ItemRegistry itemRegistry, final UnitProvider unitProvider,
146             @Nullable URI endpointOverride) {
147         this.itemRegistry = itemRegistry;
148         this.unitProvider = unitProvider;
149         this.endpointOverride = endpointOverride;
150     }
151
152     /**
153      * For tests
154      */
155     @Nullable
156     URI getEndpointOverride() {
157         return endpointOverride;
158     }
159
160     @Nullable
161     DynamoDbAsyncClient getLowLevelClient() {
162         return lowLevelClient;
163     }
164
165     ExecutorService getExecutor() {
166         return executor;
167     }
168
169     @Nullable
170     DynamoDBTableNameResolver getTableNameResolver() {
171         return tableNameResolver;
172     }
173
174     @Nullable
175     DynamoDBConfig getDbConfig() {
176         return dbConfig;
177     }
178
179     @Activate
180     public void activate(final @Nullable BundleContext bundleContext, final Map<String, Object> config) {
181         disconnect();
182         DynamoDBConfig localDbConfig = dbConfig = DynamoDBConfig.fromConfig(config);
183         if (localDbConfig == null) {
184             // Configuration was invalid. Abort service activation.
185             // Error is already logger in fromConfig.
186             return;
187         }
188         tableNameResolver = new DynamoDBTableNameResolver(localDbConfig.getTableRevision(), localDbConfig.getTable(),
189                 localDbConfig.getTablePrefixLegacy());
190         try {
191             if (!ensureClient()) {
192                 logger.error("Error creating dynamodb database client. Aborting service activation.");
193                 return;
194             }
195         } catch (Exception e) {
196             logger.error("Error constructing dynamodb client", e);
197             return;
198         }
199
200         isProperlyConfigured = true;
201         logger.debug("dynamodb persistence service activated");
202     }
203
204     @Deactivate
205     public void deactivate() {
206         logger.debug("dynamodb persistence service deactivated");
207         logIfManyQueuedTasks();
208         disconnect();
209     }
210
211     /**
212      * Initializes Dynamo DB client and determines schema
213      *
214      * If construction fails, error is logged and false is returned.
215      *
216      * @return whether initialization was successful.
217      */
218     private boolean ensureClient() {
219         DynamoDBConfig localDbConfig = dbConfig;
220         if (localDbConfig == null) {
221             return false;
222         }
223         if (client == null) {
224             try {
225                 synchronized (this) {
226                     if (this.client != null) {
227                         return true;
228                     }
229                     DynamoDbAsyncClientBuilder lowlevelClientBuilder = DynamoDbAsyncClient.builder()
230                             .defaultsMode(DefaultsMode.STANDARD)
231                             .credentialsProvider(StaticCredentialsProvider.create(localDbConfig.getCredentials()))
232                             .httpClient(NettyNioAsyncHttpClient.builder().maxConcurrency(MAX_CONCURRENCY).build())
233                             .asyncConfiguration(
234                                     ClientAsyncConfiguration.builder()
235                                             .advancedOption(SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR,
236                                                     executor)
237                                             .build())
238                             .overrideConfiguration(this::overrideConfig).region(localDbConfig.getRegion());
239                     if (endpointOverride != null) {
240                         logger.debug("DynamoDB has been overriden to {}", endpointOverride);
241                         lowlevelClientBuilder.endpointOverride(endpointOverride);
242                     }
243                     DynamoDbAsyncClient lowlevelClient = lowlevelClientBuilder.build();
244                     client = DynamoDbEnhancedAsyncClient.builder().dynamoDbClient(lowlevelClient).build();
245                     this.lowLevelClient = lowlevelClient;
246                 }
247             } catch (Exception e) {
248                 logger.error("Error constructing dynamodb client", e);
249                 return false;
250             }
251         }
252         return true;
253     }
254
255     private CompletableFuture<Boolean> resolveTableSchema() {
256         DynamoDBTableNameResolver localTableNameResolver = tableNameResolver;
257         DynamoDbAsyncClient localLowLevelClient = lowLevelClient;
258         if (localTableNameResolver == null || localLowLevelClient == null) {
259             throw new IllegalStateException("tableNameResolver or localLowLevelClient not available");
260         }
261         if (localTableNameResolver.isFullyResolved()) {
262             return CompletableFuture.completedFuture(true);
263         } else {
264             synchronized (localTableNameResolver) {
265                 if (localTableNameResolver.isFullyResolved()) {
266                     return CompletableFuture.completedFuture(true);
267                 }
268                 return localTableNameResolver.resolveSchema(localLowLevelClient,
269                         b -> b.overrideConfiguration(this::overrideConfig), executor).thenApplyAsync(resolved -> {
270                             if (resolved && localTableNameResolver.getTableSchema() == ExpectedTableSchema.LEGACY) {
271                                 logger.warn(
272                                         "Using legacy table format. Is it recommended to migrate to the new table format: specify the 'table' parameter and unset the old 'tablePrefix' parameter.");
273                             }
274                             return resolved;
275                         }, executor);
276             }
277         }
278     }
279
280     private <T extends DynamoDBItem<?>> DynamoDbAsyncTable<T> getTable(Class<T> dtoClass) {
281         DynamoDbEnhancedAsyncClient localClient = client;
282         DynamoDBTableNameResolver localTableNameResolver = tableNameResolver;
283         if (!ensureClient() || localClient == null || localTableNameResolver == null) {
284             throw new IllegalStateException("Client not ready");
285         }
286         ExpectedTableSchema expectedTableSchemaRevision = localTableNameResolver.getTableSchema();
287         String tableName = localTableNameResolver.fromClass(dtoClass);
288         final TableSchema<T> schema = getDynamoDBTableSchema(dtoClass, expectedTableSchemaRevision);
289         @SuppressWarnings("unchecked") // OK since this is the only place tableCache is populated
290         DynamoDbAsyncTable<T> table = (DynamoDbAsyncTable<T>) tableCache.computeIfAbsent(dtoClass, clz -> {
291             return localClient.table(tableName, schema);
292         });
293         if (table == null) {
294             // Invariant. To make null checker happy
295             throw new IllegalStateException();
296         }
297         return table;
298     }
299
300     private static <T extends DynamoDBItem<?>> TableSchema<T> getDynamoDBTableSchema(Class<T> dtoClass,
301             ExpectedTableSchema expectedTableSchemaRevision) {
302         if (dtoClass.equals(DynamoDBBigDecimalItem.class)) {
303             @SuppressWarnings("unchecked") // OK thanks to above conditional
304             TableSchema<T> schema = (TableSchema<T>) (expectedTableSchemaRevision == ExpectedTableSchema.NEW
305                     ? DynamoDBBigDecimalItem.TABLE_SCHEMA_NEW
306                     : DynamoDBBigDecimalItem.TABLE_SCHEMA_LEGACY);
307             return schema;
308         } else if (dtoClass.equals(DynamoDBStringItem.class)) {
309             @SuppressWarnings("unchecked") // OK thanks to above conditional
310             TableSchema<T> schema = (TableSchema<T>) (expectedTableSchemaRevision == ExpectedTableSchema.NEW
311                     ? DynamoDBStringItem.TABLE_SCHEMA_NEW
312                     : DynamoDBStringItem.TABLE_SCHEMA_LEGACY);
313             return schema;
314         } else {
315             throw new IllegalStateException("Unknown DTO class. Bug");
316         }
317     }
318
319     private void disconnect() {
320         DynamoDbAsyncClient localLowLevelClient = lowLevelClient;
321         if (client == null || localLowLevelClient == null) {
322             return;
323         }
324         localLowLevelClient.close();
325         lowLevelClient = null;
326         client = null;
327         dbConfig = null;
328         tableNameResolver = null;
329         isProperlyConfigured = false;
330         tableCache.clear();
331     }
332
333     protected boolean isReadyToStore() {
334         return isProperlyConfigured && ensureClient();
335     }
336
337     @Override
338     public String getId() {
339         return "dynamodb";
340     }
341
342     @Override
343     public String getLabel(@Nullable Locale locale) {
344         return "DynamoDB";
345     }
346
347     @Override
348     public Set<PersistenceItemInfo> getItemInfo() {
349         return Collections.emptySet();
350     }
351
352     @Override
353     public Iterable<HistoricItem> query(FilterCriteria filter) {
354         logIfManyQueuedTasks();
355         Instant start = Instant.now();
356         String filterDescription = filterToString(filter);
357         logger.trace("Got a query with filter {}", filterDescription);
358         DynamoDbEnhancedAsyncClient localClient = client;
359         DynamoDBTableNameResolver localTableNameResolver = tableNameResolver;
360         if (!isProperlyConfigured) {
361             logger.debug("Configuration for dynamodb not yet loaded or broken. Returning empty query results.");
362             return Collections.emptyList();
363         }
364         if (!ensureClient() || localClient == null || localTableNameResolver == null) {
365             logger.warn("DynamoDB not connected. Returning empty query results.");
366             return Collections.emptyList();
367         }
368
369         //
370         // Resolve unclear table schema if needed
371         //
372         try {
373             Boolean resolved = resolveTableSchema().get();
374             if (!resolved) {
375                 logger.warn("Table schema not resolved, cannot query data.");
376                 return Collections.emptyList();
377             }
378         } catch (InterruptedException e) {
379             logger.warn("Table schema resolution interrupted, cannot query data");
380             return Collections.emptyList();
381         } catch (ExecutionException e) {
382             Throwable cause = e.getCause();
383             logger.warn("Table schema resolution errored, cannot query data: {} {}",
384                     cause == null ? e.getClass().getSimpleName() : cause.getClass().getSimpleName(),
385                     cause == null ? e.getMessage() : cause.getMessage());
386             return Collections.emptyList();
387         }
388         try {
389             //
390             // Proceed with query
391             //
392             String itemName = filter.getItemName();
393             if (itemName == null) {
394                 logger.warn("Item name is missing in filter {}", filter);
395                 return List.of();
396             }
397             Item item = getItemFromRegistry(itemName);
398             if (item == null) {
399                 logger.warn("Could not get item {} from registry! Returning empty query results.", itemName);
400                 return Collections.emptyList();
401             }
402             if (item instanceof GroupItem) {
403                 item = ((GroupItem) item).getBaseItem();
404                 logger.debug("Item is instanceof GroupItem '{}'", itemName);
405                 if (item == null) {
406                     logger.debug("BaseItem of GroupItem is null. Ignore and give up!");
407                     return Collections.emptyList();
408                 }
409                 if (item instanceof GroupItem) {
410                     logger.debug("BaseItem of GroupItem is a GroupItem too. Ignore and give up!");
411                     return Collections.emptyList();
412                 }
413             }
414             boolean legacy = localTableNameResolver.getTableSchema() == ExpectedTableSchema.LEGACY;
415             Class<? extends DynamoDBItem<?>> dtoClass = AbstractDynamoDBItem.getDynamoItemClass(item.getClass(),
416                     legacy);
417             String tableName = localTableNameResolver.fromClass(dtoClass);
418             DynamoDbAsyncTable<? extends DynamoDBItem<?>> table = getTable(dtoClass);
419             logger.debug("Item {} (of type {}) will be tried to query using DTO class {} from table {}", itemName,
420                     item.getClass().getSimpleName(), dtoClass.getSimpleName(), tableName);
421
422             QueryEnhancedRequest queryExpression = DynamoDBQueryUtils.createQueryExpression(dtoClass,
423                     localTableNameResolver.getTableSchema(), item, filter, unitProvider);
424
425             CompletableFuture<List<DynamoDBItem<?>>> itemsFuture = new CompletableFuture<>();
426             final SdkPublisher<? extends DynamoDBItem<?>> itemPublisher = table.query(queryExpression).items();
427             Subscriber<DynamoDBItem<?>> pageSubscriber = new PageOfInterestSubscriber<DynamoDBItem<?>>(itemsFuture,
428                     filter.getPageNumber(), filter.getPageSize());
429             itemPublisher.subscribe(pageSubscriber);
430             // NumberItem.getUnit() is expensive, we avoid calling it in the loop
431             // by fetching the unit here.
432             final Item localItem = item;
433             final Unit<?> itemUnit = localItem instanceof NumberItem ? ((NumberItem) localItem).getUnit() : null;
434             try {
435                 @SuppressWarnings("null")
436                 List<HistoricItem> results = itemsFuture.get().stream().map(dynamoItem -> {
437                     HistoricItem historicItem = dynamoItem.asHistoricItem(localItem, itemUnit);
438                     if (historicItem == null) {
439                         logger.warn(
440                                 "Dynamo item {} serialized state '{}' cannot be converted to item {} {}. Item type changed since persistence. Ignoring",
441                                 dynamoItem.getClass().getSimpleName(), dynamoItem.getState(),
442                                 localItem.getClass().getSimpleName(), localItem.getName());
443                         return null;
444                     }
445                     logger.trace("Dynamo item {} converted to historic item: {}", localItem, historicItem);
446                     return historicItem;
447                 }).filter(value -> value != null).collect(Collectors.toList());
448                 logger.debug("Query completed in {} ms. Filter was {}",
449                         Duration.between(start, Instant.now()).toMillis(), filterDescription);
450                 return results;
451             } catch (InterruptedException e) {
452                 logger.warn("Query interrupted. Filter was {}", filterDescription);
453                 return Collections.emptyList();
454             } catch (ExecutionException e) {
455                 Throwable cause = e.getCause();
456                 if (cause instanceof ResourceNotFoundException) {
457                     logger.trace("Query failed since the DynamoDB table '{}' does not exist. Filter was {}", tableName,
458                             filterDescription);
459                 } else if (logger.isTraceEnabled()) {
460                     logger.trace("Query failed. Filter was {}", filterDescription, e);
461                 } else {
462                     logger.warn("Query failed {} {}. Filter was {}",
463                             cause == null ? e.getClass().getSimpleName() : cause.getClass().getSimpleName(),
464                             cause == null ? e.getMessage() : cause.getMessage(), filterDescription);
465                 }
466                 return Collections.emptyList();
467             }
468         } catch (Exception e) {
469             logger.error("Unexpected error with query having filter {}: {} {}. Returning empty query results.",
470                     filterDescription, e.getClass().getSimpleName(), e.getMessage());
471             return Collections.emptyList();
472         }
473     }
474
475     /**
476      * Retrieves the item for the given name from the item registry
477      *
478      * @param itemName
479      * @return item with the given name, or null if no such item exists in item registry.
480      */
481     private @Nullable Item getItemFromRegistry(String itemName) {
482         try {
483             return itemRegistry.getItem(itemName);
484         } catch (ItemNotFoundException e1) {
485             return null;
486         }
487     }
488
489     @Override
490     public List<PersistenceStrategy> getDefaultStrategies() {
491         return List.of(PersistenceStrategy.Globals.RESTORE, PersistenceStrategy.Globals.CHANGE);
492     }
493
494     @Override
495     public void store(Item item) {
496         store(item, null);
497     }
498
499     @Override
500     public void store(Item item, @Nullable String alias) {
501         // Timestamp and capture state immediately as rest of the store is asynchronous (state might change in between)
502         ZonedDateTime time = ZonedDateTime.now();
503
504         logIfManyQueuedTasks();
505         if (!(item instanceof GenericItem)) {
506             return;
507         }
508         if (item.getState() instanceof UnDefType) {
509             logger.debug("Undefined item state received. Not storing item {}.", item.getName());
510             return;
511         }
512         if (!isReadyToStore()) {
513             logger.warn("Not ready to store (config error?), not storing item {}.", item.getName());
514             return;
515         }
516         // Get Item describing the real type of data
517         // With non-group items this is same as the argument item. With Group items, this is item describing the type of
518         // state stored in the group.
519         final Item itemTemplate;
520         try {
521             itemTemplate = getEffectiveItem(item);
522         } catch (IllegalStateException e) {
523             // Exception is raised when underlying item type cannot be determined with Group item
524             // Logged already
525             return;
526         }
527
528         String effectiveName = (alias != null) ? alias : item.getName();
529
530         // We do not want to rely item.state since async context below can execute much later.
531         // We 'copy' the item for local use. copyItem also normalizes the unit with NumberItems.
532         final GenericItem copiedItem = copyItem(itemTemplate, item, effectiveName, null, unitProvider);
533
534         resolveTableSchema().thenAcceptAsync(resolved -> {
535             if (!resolved) {
536                 logger.warn("Table schema not resolved, not storing item {}.", copiedItem.getName());
537                 return;
538             }
539
540             DynamoDbEnhancedAsyncClient localClient = client;
541             DynamoDbAsyncClient localLowlevelClient = lowLevelClient;
542             DynamoDBConfig localConfig = dbConfig;
543             DynamoDBTableNameResolver localTableNameResolver = tableNameResolver;
544             if (!isProperlyConfigured || localClient == null || localLowlevelClient == null || localConfig == null
545                     || localTableNameResolver == null) {
546                 logger.warn("Not ready to store (config error?), not storing item {}.", item.getName());
547                 return;
548             }
549
550             Integer expireDays = localConfig.getExpireDays();
551
552             final DynamoDBItem<?> dto;
553             switch (localTableNameResolver.getTableSchema()) {
554                 case NEW:
555                     dto = AbstractDynamoDBItem.fromStateNew(copiedItem, time, expireDays);
556                     break;
557                 case LEGACY:
558                     dto = AbstractDynamoDBItem.fromStateLegacy(copiedItem, time);
559                     break;
560                 default:
561                     throw new IllegalStateException("Unexpected. Bug");
562             }
563             logger.trace("store() called with item {} {} '{}', which was converted to DTO {}",
564                     copiedItem.getClass().getSimpleName(), effectiveName, copiedItem.getState(), dto);
565             dto.accept(new DynamoDBItemVisitor<TableCreatingPutItem<? extends DynamoDBItem<?>>>() {
566
567                 @Override
568                 public TableCreatingPutItem<? extends DynamoDBItem<?>> visit(
569                         DynamoDBBigDecimalItem dynamoBigDecimalItem) {
570                     return new TableCreatingPutItem<DynamoDBBigDecimalItem>(DynamoDBPersistenceService.this,
571                             dynamoBigDecimalItem, getTable(DynamoDBBigDecimalItem.class));
572                 }
573
574                 @Override
575                 public TableCreatingPutItem<? extends DynamoDBItem<?>> visit(DynamoDBStringItem dynamoStringItem) {
576                     return new TableCreatingPutItem<DynamoDBStringItem>(DynamoDBPersistenceService.this,
577                             dynamoStringItem, getTable(DynamoDBStringItem.class));
578                 }
579             }).putItemAsync();
580         }, executor).exceptionally(e -> {
581             logger.error("Unexcepted error", e);
582             return null;
583         });
584     }
585
586     private Item getEffectiveItem(Item item) {
587         final Item effectiveItem;
588         if (item instanceof GroupItem) {
589             Item baseItem = ((GroupItem) item).getBaseItem();
590             if (baseItem == null) {
591                 // if GroupItem:<ItemType> is not defined in
592                 // *.items using StringType
593                 logger.debug(
594                         "Cannot detect ItemType for {} because the GroupItems' base type isn't set in *.items File.",
595                         item.getName());
596                 Iterator<Item> firstGroupMemberItem = ((GroupItem) item).getMembers().iterator();
597                 if (firstGroupMemberItem.hasNext()) {
598                     effectiveItem = firstGroupMemberItem.next();
599                 } else {
600                     throw new IllegalStateException("GroupItem " + item.getName()
601                             + " does not have children nor base item set, cannot determine underlying item type. Aborting!");
602                 }
603             } else {
604                 effectiveItem = baseItem;
605             }
606         } else {
607             effectiveItem = item;
608         }
609         return effectiveItem;
610     }
611
612     /**
613      * Copy item and optionally override name and state
614      *
615      * State is normalized to source item's unit with Quantity NumberItems and QuantityTypes
616      *
617      * @param itemTemplate 'template item' to be used to construct the new copy. It is also used to determine UoM unit
618      *            and get GenericItem.type
619      * @param item item that is used to acquire name and state
620      * @param nameOverride name override for the resulting copy
621      * @param stateOverride state override for the resulting copy
622      * @param unitProvider the unit provider for number with dimension
623      * @throws IllegalArgumentException when state is QuantityType and not compatible with item
624      */
625     static GenericItem copyItem(Item itemTemplate, Item item, @Nullable String nameOverride,
626             @Nullable State stateOverride, UnitProvider unitProvider) {
627         final GenericItem copiedItem;
628         try {
629             if (itemTemplate instanceof NumberItem) {
630                 copiedItem = (GenericItem) itemTemplate.getClass()
631                         .getDeclaredConstructor(String.class, String.class, UnitProvider.class)
632                         .newInstance(itemTemplate.getType(), nameOverride == null ? item.getName() : nameOverride,
633                                 unitProvider);
634             } else {
635                 copiedItem = (GenericItem) itemTemplate.getClass().getDeclaredConstructor(String.class)
636                         .newInstance(nameOverride == null ? item.getName() : nameOverride);
637             }
638
639         } catch (InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException
640                 | NoSuchMethodException | SecurityException e) {
641             throw new IllegalArgumentException(item.toString(), e);
642         }
643         State state = stateOverride == null ? item.getState() : stateOverride;
644         if (state instanceof QuantityType<?> && itemTemplate instanceof NumberItem) {
645             Unit<?> itemUnit = ((NumberItem) itemTemplate).getUnit();
646             if (itemUnit != null) {
647                 State convertedState = ((QuantityType<?>) state).toUnit(itemUnit);
648                 if (convertedState == null) {
649                     logger.error("Unexpected unit conversion failure: {} to item unit {}", state, itemUnit);
650                     throw new IllegalArgumentException(
651                             String.format("Unexpected unit conversion failure: %s to item unit %s", state, itemUnit));
652                 }
653                 state = convertedState;
654             }
655         }
656         copiedItem.setState(state);
657         return copiedItem;
658     }
659
660     private void logIfManyQueuedTasks() {
661         if (executor instanceof ThreadPoolExecutor) {
662             ThreadPoolExecutor localExecutor = (ThreadPoolExecutor) executor;
663             if (localExecutor.getQueue().size() >= 5) {
664                 logger.trace("executor queue size: {}, remaining space {}. Active threads {}",
665                         localExecutor.getQueue().size(), localExecutor.getQueue().remainingCapacity(),
666                         localExecutor.getActiveCount());
667             } else if (localExecutor.getQueue().size() >= 50) {
668                 logger.warn(
669                         "Many ({}) tasks queued in executor! This might be sign of bad design or bug in the addon code.",
670                         localExecutor.getQueue().size());
671             }
672         }
673     }
674
675     private String filterToString(FilterCriteria filter) {
676         return String.format(
677                 "FilterCriteria@%s(item=%s, pageNumber=%d, pageSize=%d, time=[%s, %s, %s], state=[%s, %s of %s] )",
678                 System.identityHashCode(filter), filter.getItemName(), filter.getPageNumber(), filter.getPageSize(),
679                 filter.getBeginDate(), filter.getEndDate(), filter.getOrdering(), filter.getOperator(),
680                 filter.getState(), filter.getState() == null ? "null" : filter.getState().getClass().getSimpleName());
681     }
682 }