]> git.basschouten.com Git - openhab-addons.git/blob
2ae10db2cd76c398bece2179e98385f1b93f0733
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.persistence.dynamodb.internal;
14
15 import java.time.Duration;
16 import java.time.Instant;
17 import java.util.concurrent.CompletableFuture;
18 import java.util.concurrent.CompletionException;
19 import java.util.concurrent.ExecutorService;
20
21 import org.eclipse.jdt.annotation.NonNullByDefault;
22 import org.slf4j.Logger;
23 import org.slf4j.LoggerFactory;
24
25 import software.amazon.awssdk.core.internal.waiters.ResponseOrException;
26 import software.amazon.awssdk.enhanced.dynamodb.DynamoDbAsyncTable;
27 import software.amazon.awssdk.enhanced.dynamodb.model.CreateTableEnhancedRequest;
28 import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
29 import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
30 import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput;
31 import software.amazon.awssdk.services.dynamodb.model.ResourceInUseException;
32 import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
33
34 /**
35  * PutItem request which creates table if needed.
36  *
37  * Designed such that competing PutItem requests should complete successfully, only one of them
38  * 'winning the race' and creating the table.
39  *
40  *
41  * PutItem
42  * . |
43  * . \ (ERR: ResourceNotFoundException) (1)
44  * ....|
45  * ....CreateTable
46  * ....|.........\
47  * .... \ (OK)....\ (ERR: ResourceInUseException) (2)
48  * ......|..................|
49  * ..... |..................|
50  * ..... |...........Wait for table to become active
51  * ..... |......................\
52  * ..... |......................| (OK)
53  * ..... |......................|
54  * ..... |......................PutItem
55  * ..... |
56  * ..... |
57  * ..... Wait for table to become active
58  * ......|
59  * .......\
60  * ........| (OK)
61  * ........|
62  * ........\
63  * ....... Configure TTL (no-op with legacy schema)
64  * ..........|
65  * ...........\ (OK)
66  * ...........|
67  * ...........PutItem
68  *
69  *
70  * (1) Most likely table does not exist yet
71  * (2) Raised when Table created by someone else
72  *
73  * @author Sami Salonen - Initial contribution
74  *
75  */
76 @NonNullByDefault
77 public class TableCreatingPutItem<T extends DynamoDBItem<?>> {
78     private final Logger logger = LoggerFactory.getLogger(TableCreatingPutItem.class);
79
80     private final DynamoDBPersistenceService service;
81     private T dto;
82     private DynamoDbAsyncTable<T> table;
83     private CompletableFuture<Void> aggregateFuture = new CompletableFuture<Void>();
84     private Instant start = Instant.now();
85     private ExecutorService executor;
86     private DynamoDbAsyncClient lowLevelClient;
87     private DynamoDBConfig dbConfig;
88     private DynamoDBTableNameResolver tableNameResolver;
89
90     public TableCreatingPutItem(DynamoDBPersistenceService service, T dto, DynamoDbAsyncTable<T> table) {
91         this.service = service;
92         this.dto = dto;
93         this.table = table;
94         this.executor = this.service.getExecutor();
95         DynamoDbAsyncClient localLowLevelClient = this.service.getLowLevelClient();
96         DynamoDBConfig localDbConfig = this.service.getDbConfig();
97         DynamoDBTableNameResolver localTableNameResolver = this.service.getTableNameResolver();
98         if (localLowLevelClient == null || localDbConfig == null || localTableNameResolver == null) {
99             throw new IllegalStateException("Service is not ready");
100         }
101         lowLevelClient = localLowLevelClient;
102         dbConfig = localDbConfig;
103         tableNameResolver = localTableNameResolver;
104     }
105
106     public CompletableFuture<Void> putItemAsync() {
107         start = Instant.now();
108         return internalPutItemAsync(false, true);
109     }
110
111     private CompletableFuture<Void> internalPutItemAsync(boolean createTable, boolean recursionAllowed) {
112         if (createTable) {
113             // Try again, first creating the table
114             Instant tableCreationStart = Instant.now();
115             table.createTable(CreateTableEnhancedRequest.builder()
116                     .provisionedThroughput(
117                             ProvisionedThroughput.builder().readCapacityUnits(dbConfig.getReadCapacityUnits())
118                                     .writeCapacityUnits(dbConfig.getWriteCapacityUnits()).build())
119                     .build())//
120                     .whenCompleteAsync((resultTableCreation, exceptionTableCreation) -> {
121                         if (exceptionTableCreation == null) {
122                             logger.trace("PutItem: Table created in {} ms. Proceeding to TTL creation.",
123                                     Duration.between(tableCreationStart, Instant.now()).toMillis());
124                             //
125                             // Table creation OK. Configure TTL
126                             //
127                             boolean legacy = tableNameResolver.getTableSchema() == ExpectedTableSchema.LEGACY;
128                             waitForTableToBeActive().thenComposeAsync(_void -> {
129                                 if (legacy) {
130                                     // We have legacy table schema. TTL configuration is skipped
131                                     return CompletableFuture.completedFuture(null);
132                                 } else {
133                                     // We have the new table schema -> configure TTL
134                                     // for the newly created table
135                                     return lowLevelClient.updateTimeToLive(req -> req
136                                             .overrideConfiguration(this.service::overrideConfig)
137                                             .tableName(table.tableName()).timeToLiveSpecification(spec -> spec
138                                                     .attributeName(DynamoDBItem.ATTRIBUTE_NAME_EXPIRY).enabled(true)));
139                                 }
140                             }, executor)
141                                     //
142                                     // Table is ready and TTL configured (possibly with error)
143                                     //
144                                     .whenCompleteAsync((resultTTL, exceptionTTL) -> {
145                                         if (exceptionTTL == null) {
146                                             //
147                                             // TTL configuration OK, continue with PutItem
148                                             //
149                                             logger.trace("PutItem: TTL configured successfully");
150                                             internalPutItemAsync(false, false);
151                                         } else {
152                                             //
153                                             // TTL configuration failed, abort
154                                             //
155                                             logger.trace("PutItem: TTL configuration failed");
156                                             Throwable exceptionTTLCause = exceptionTTL.getCause();
157                                             aggregateFuture.completeExceptionally(
158                                                     exceptionTTLCause == null ? exceptionTTL : exceptionTTLCause);
159                                         }
160                                     }, executor);
161                         } else {
162                             // Table creation failed. We give up and complete the aggregate
163                             // future -- unless the error was ResourceInUseException, in which case wait for
164                             // table to become active and try again
165                             Throwable cause = exceptionTableCreation.getCause();
166                             if (cause instanceof ResourceInUseException) {
167                                 logger.trace(
168                                         "PutItem: table creation failed (will be retried) with {} {}. Perhaps tried to create table that already exists. Trying one more time without creating table.",
169                                         cause.getClass().getSimpleName(), cause.getMessage());
170                                 // Wait table to be active, then retry PutItem
171                                 waitForTableToBeActive().whenCompleteAsync((_tableWaitResponse, tableWaitException) -> {
172                                     if (tableWaitException != null) {
173                                         // error when waiting for table to become active
174                                         Throwable tableWaitExceptionCause = tableWaitException.getCause();
175                                         logger.warn(
176                                                 "PutItem: failed (final) with {} {} when waiting to become active. Aborting.",
177                                                 tableWaitExceptionCause == null
178                                                         ? tableWaitException.getClass().getSimpleName()
179                                                         : tableWaitExceptionCause.getClass().getSimpleName(),
180                                                 tableWaitExceptionCause == null ? tableWaitException.getMessage()
181                                                         : tableWaitExceptionCause.getMessage());
182                                         aggregateFuture.completeExceptionally(
183                                                 tableWaitExceptionCause == null ? tableWaitException
184                                                         : tableWaitExceptionCause);
185                                     }
186                                 }, executor)
187                                         // table wait OK, retry PutItem
188                                         .thenRunAsync(() -> internalPutItemAsync(false, false), executor);
189                             } else {
190                                 logger.warn("PutItem: failed (final) with {} {}. Aborting.",
191                                         cause == null ? exceptionTableCreation.getClass().getSimpleName()
192                                                 : cause.getClass().getSimpleName(),
193                                         cause == null ? exceptionTableCreation.getMessage() : cause.getMessage());
194                                 aggregateFuture.completeExceptionally(cause == null ? exceptionTableCreation : cause);
195                             }
196                         }
197                     }, executor);
198
199         } else {
200             // First try, optimistically assuming that table exists
201             table.putItem(dto).whenCompleteAsync((result, exception) -> {
202                 if (exception == null) {
203                     logger.trace("PutItem: DTO {} was successfully written in {} ms.", dto,
204                             Duration.between(start, Instant.now()).toMillis());
205                     aggregateFuture.complete(result);
206                 } else {
207                     // PutItem failed. We retry i failure was due to non-existing table. Retry is triggered by calling
208                     // this method again with createTable=true)
209                     // With other errors, we abort.
210                     if (!(exception instanceof CompletionException)) {
211                         logger.error("PutItem: Expecting only CompletionException, got {} {}. BUG",
212                                 exception.getClass().getName(), exception.getMessage());
213                         aggregateFuture.completeExceptionally(new IllegalStateException("unexpected exception"));
214                     }
215                     Throwable cause = exception.getCause();
216                     if (cause instanceof ResourceNotFoundException && recursionAllowed) {
217                         logger.trace(
218                                 "PutItem: Table '{}' was not present. Retrying, this time creating the table first",
219                                 table.tableName());
220                         internalPutItemAsync(true, true);
221                     } else {
222                         logger.warn("PutItem: failed (final) with {} {}. Aborting.",
223                                 cause == null ? exception.getClass().getSimpleName() : cause.getClass().getSimpleName(),
224                                 cause == null ? exception.getMessage() : cause.getMessage());
225                         aggregateFuture.completeExceptionally(cause == null ? exception : cause);
226                     }
227                 }
228             }, executor);
229         }
230         return aggregateFuture;
231     }
232
233     private CompletableFuture<Void> waitForTableToBeActive() {
234         return lowLevelClient.waiter()
235                 .waitUntilTableExists(
236                         req -> req.tableName(table.tableName()).overrideConfiguration(this.service::overrideConfig))
237                 .thenAcceptAsync(tableWaitResponse -> {
238                     // if waiter fails, the future is completed exceptionally (not entering this step)
239                     ResponseOrException<DescribeTableResponse> responseOrException = tableWaitResponse.matched();
240                     logger.trace("PutItem: Table wait completed successfully with {} attempts: {}",
241                             tableWaitResponse.attemptsExecuted(), toString(responseOrException));
242                 }, executor);
243     }
244
245     private String toString(ResponseOrException<?> responseOrException) {
246         if (responseOrException.response().isPresent()) {
247             return String.format("response=%s", responseOrException.response().get());
248         } else if (responseOrException.exception().isPresent()) {
249             Throwable exception = responseOrException.exception().get();
250             return String.format("exception=%s %s", exception.getClass().getSimpleName(), exception.getMessage());
251         } else {
252             return String.format("<N/A>");
253         }
254     }
255 }