/**
 * Copyright (c) 2010-2023 Contributors to the openHAB project
 *
 * See the NOTICE file(s) distributed with this work for additional
 * information.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License 2.0 which is available at
 * http://www.eclipse.org/legal/epl-2.0
 *
 * SPDX-License-Identifier: EPL-2.0
 */
13 package org.openhab.persistence.dynamodb.internal;
15 import java.time.Duration;
16 import java.time.Instant;
17 import java.util.concurrent.CompletableFuture;
18 import java.util.concurrent.CompletionException;
19 import java.util.concurrent.ExecutorService;
21 import org.eclipse.jdt.annotation.NonNullByDefault;
22 import org.slf4j.Logger;
23 import org.slf4j.LoggerFactory;
25 import software.amazon.awssdk.core.internal.waiters.ResponseOrException;
26 import software.amazon.awssdk.enhanced.dynamodb.DynamoDbAsyncTable;
27 import software.amazon.awssdk.enhanced.dynamodb.model.CreateTableEnhancedRequest;
28 import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
29 import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
30 import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput;
31 import software.amazon.awssdk.services.dynamodb.model.ResourceInUseException;
32 import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
35 * PutItem request which creates table if needed.
37 * Designed such that competing PutItem requests should complete successfully, only one of them
38 * 'winning the race' and creating the table.
43 * . \ (ERR: ResourceNotFoundException) (1)
47 * .... \ (OK)....\ (ERR: ResourceInUseException) (2)
48 * ......|..................|
49 * ..... |..................|
50 * ..... |...........Wait for table to become active
51 * ..... |......................\
52 * ..... |......................| (OK)
53 * ..... |......................|
54 * ..... |......................PutItem
57 * ..... Wait for table to become active
63 * ....... Configure TTL (no-op with legacy schema)
70 * (1) Most likely table does not exist yet
71 * (2) Raised when Table created by someone else
73 * @author Sami Salonen - Initial contribution
77 public class TableCreatingPutItem<T extends DynamoDBItem<?>> {
78 private final Logger logger = LoggerFactory.getLogger(TableCreatingPutItem.class);
80 private final DynamoDBPersistenceService service;
82 private DynamoDbAsyncTable<T> table;
83 private CompletableFuture<Void> aggregateFuture = new CompletableFuture<Void>();
84 private Instant start = Instant.now();
85 private ExecutorService executor;
86 private DynamoDbAsyncClient lowLevelClient;
87 private DynamoDBConfig dbConfig;
88 private DynamoDBTableNameResolver tableNameResolver;
90 public TableCreatingPutItem(DynamoDBPersistenceService service, T dto, DynamoDbAsyncTable<T> table) {
91 this.service = service;
94 this.executor = this.service.getExecutor();
95 DynamoDbAsyncClient localLowLevelClient = this.service.getLowLevelClient();
96 DynamoDBConfig localDbConfig = this.service.getDbConfig();
97 DynamoDBTableNameResolver localTableNameResolver = this.service.getTableNameResolver();
98 if (localLowLevelClient == null || localDbConfig == null || localTableNameResolver == null) {
99 throw new IllegalStateException("Service is not ready");
101 lowLevelClient = localLowLevelClient;
102 dbConfig = localDbConfig;
103 tableNameResolver = localTableNameResolver;
106 public CompletableFuture<Void> putItemAsync() {
107 start = Instant.now();
108 return internalPutItemAsync(false, true);
111 private CompletableFuture<Void> internalPutItemAsync(boolean createTable, boolean recursionAllowed) {
113 // Try again, first creating the table
114 Instant tableCreationStart = Instant.now();
115 table.createTable(CreateTableEnhancedRequest.builder()
116 .provisionedThroughput(
117 ProvisionedThroughput.builder().readCapacityUnits(dbConfig.getReadCapacityUnits())
118 .writeCapacityUnits(dbConfig.getWriteCapacityUnits()).build())
120 .whenCompleteAsync((resultTableCreation, exceptionTableCreation) -> {
121 if (exceptionTableCreation == null) {
122 logger.trace("PutItem: Table created in {} ms. Proceeding to TTL creation.",
123 Duration.between(tableCreationStart, Instant.now()).toMillis());
125 // Table creation OK. Configure TTL
127 boolean legacy = tableNameResolver.getTableSchema() == ExpectedTableSchema.LEGACY;
128 waitForTableToBeActive().thenComposeAsync(_void -> {
130 // We have legacy table schema. TTL configuration is skipped
131 return CompletableFuture.completedFuture(null);
133 // We have the new table schema -> configure TTL
134 // for the newly created table
135 return lowLevelClient.updateTimeToLive(req -> req
136 .overrideConfiguration(this.service::overrideConfig)
137 .tableName(table.tableName()).timeToLiveSpecification(spec -> spec
138 .attributeName(DynamoDBItem.ATTRIBUTE_NAME_EXPIRY).enabled(true)));
142 // Table is ready and TTL configured (possibly with error)
144 .whenCompleteAsync((resultTTL, exceptionTTL) -> {
145 if (exceptionTTL == null) {
147 // TTL configuration OK, continue with PutItem
149 logger.trace("PutItem: TTL configured successfully");
150 internalPutItemAsync(false, false);
153 // TTL configuration failed, abort
155 logger.trace("PutItem: TTL configuration failed");
156 Throwable exceptionTTLCause = exceptionTTL.getCause();
157 aggregateFuture.completeExceptionally(
158 exceptionTTLCause == null ? exceptionTTL : exceptionTTLCause);
162 // Table creation failed. We give up and complete the aggregate
163 // future -- unless the error was ResourceInUseException, in which case wait for
164 // table to become active and try again
165 Throwable cause = exceptionTableCreation.getCause();
166 if (cause instanceof ResourceInUseException) {
168 "PutItem: table creation failed (will be retried) with {} {}. Perhaps tried to create table that already exists. Trying one more time without creating table.",
169 cause.getClass().getSimpleName(), cause.getMessage());
170 // Wait table to be active, then retry PutItem
171 waitForTableToBeActive().whenCompleteAsync((_tableWaitResponse, tableWaitException) -> {
172 if (tableWaitException != null) {
173 // error when waiting for table to become active
174 Throwable tableWaitExceptionCause = tableWaitException.getCause();
176 "PutItem: failed (final) with {} {} when waiting to become active. Aborting.",
177 tableWaitExceptionCause == null
178 ? tableWaitException.getClass().getSimpleName()
179 : tableWaitExceptionCause.getClass().getSimpleName(),
180 tableWaitExceptionCause == null ? tableWaitException.getMessage()
181 : tableWaitExceptionCause.getMessage());
182 aggregateFuture.completeExceptionally(
183 tableWaitExceptionCause == null ? tableWaitException
184 : tableWaitExceptionCause);
187 // table wait OK, retry PutItem
188 .thenRunAsync(() -> internalPutItemAsync(false, false), executor);
190 logger.warn("PutItem: failed (final) with {} {}. Aborting.",
191 cause == null ? exceptionTableCreation.getClass().getSimpleName()
192 : cause.getClass().getSimpleName(),
193 cause == null ? exceptionTableCreation.getMessage() : cause.getMessage());
194 aggregateFuture.completeExceptionally(cause == null ? exceptionTableCreation : cause);
200 // First try, optimistically assuming that table exists
201 table.putItem(dto).whenCompleteAsync((result, exception) -> {
202 if (exception == null) {
203 logger.trace("PutItem: DTO {} was successfully written in {} ms.", dto,
204 Duration.between(start, Instant.now()).toMillis());
205 aggregateFuture.complete(result);
207 // PutItem failed. We retry i failure was due to non-existing table. Retry is triggered by calling
208 // this method again with createTable=true)
209 // With other errors, we abort.
210 if (!(exception instanceof CompletionException)) {
211 logger.error("PutItem: Expecting only CompletionException, got {} {}. BUG",
212 exception.getClass().getName(), exception.getMessage());
213 aggregateFuture.completeExceptionally(new IllegalStateException("unexpected exception"));
215 Throwable cause = exception.getCause();
216 if (cause instanceof ResourceNotFoundException && recursionAllowed) {
218 "PutItem: Table '{}' was not present. Retrying, this time creating the table first",
220 internalPutItemAsync(true, true);
222 logger.warn("PutItem: failed (final) with {} {}. Aborting.",
223 cause == null ? exception.getClass().getSimpleName() : cause.getClass().getSimpleName(),
224 cause == null ? exception.getMessage() : cause.getMessage());
225 aggregateFuture.completeExceptionally(cause == null ? exception : cause);
230 return aggregateFuture;
233 private CompletableFuture<Void> waitForTableToBeActive() {
234 return lowLevelClient.waiter()
235 .waitUntilTableExists(
236 req -> req.tableName(table.tableName()).overrideConfiguration(this.service::overrideConfig))
237 .thenAcceptAsync(tableWaitResponse -> {
238 // if waiter fails, the future is completed exceptionally (not entering this step)
239 ResponseOrException<DescribeTableResponse> responseOrException = tableWaitResponse.matched();
240 logger.trace("PutItem: Table wait completed successfully with {} attempts: {}",
241 tableWaitResponse.attemptsExecuted(), toString(responseOrException));
245 private String toString(ResponseOrException<?> responseOrException) {
246 if (responseOrException.response().isPresent()) {
247 return String.format("response=%s", responseOrException.response().get());
248 } else if (responseOrException.exception().isPresent()) {
249 Throwable exception = responseOrException.exception().get();
250 return String.format("exception=%s %s", exception.getClass().getSimpleName(), exception.getMessage());
252 return String.format("<N/A>");