* Add some utility classes that will be used by other bluetooth bindings.
* Add handle field to BluetoothDescriptor
Signed-off-by: Connor Petty <mistercpp2000+gitsignoff@gmail.com>
for (BluetoothGattDescriptor dBusBlueZDescriptor : dBusBlueZCharacteristic.getGattDescriptors()) {
BluetoothDescriptor descriptor = new BluetoothDescriptor(characteristic,
- UUID.fromString(dBusBlueZDescriptor.getUuid()));
+ UUID.fromString(dBusBlueZDescriptor.getUuid()), 0);
characteristic.addDescriptor(descriptor);
}
service.addCharacteristic(characteristic);
*/
package org.openhab.binding.bluetooth.bluez.internal;
-import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.freedesktop.dbus.exceptions.DBusException;
+import org.openhab.binding.bluetooth.util.RetryException;
+import org.openhab.binding.bluetooth.util.RetryFuture;
import org.openhab.core.common.ThreadPoolManager;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
public void initialize() {
logger.debug("initializing DeviceManagerFactory");
- var stage1 = this.deviceManagerFuture = callAsync(() -> {
+ var stage1 = this.deviceManagerFuture = RetryFuture.callWithRetry(() -> {
try {
// if this is the first call to the library, this call
// should throw an exception (that we are catching)
}
}, scheduler);
- stage1.thenCompose(devManager -> {
+ this.deviceManagerWrapperFuture = stage1.thenCompose(devManager -> {
// lambdas can't modify outside variables due to scoping, so instead we use an AtomicInteger.
AtomicInteger tryCount = new AtomicInteger();
- // We need to set deviceManagerWrapperFuture here since we want to be able to cancel the underlying
- // AsyncCompletableFuture instance
- return this.deviceManagerWrapperFuture = callAsync(() -> {
+ return RetryFuture.callWithRetry(() -> {
int count = tryCount.incrementAndGet();
try {
logger.debug("Registering property handler attempt: {}", count);
}
this.deviceManagerWrapperFuture = null;
}
-
- private static <T> CompletableFuture<T> callAsync(Callable<T> callable, ScheduledExecutorService scheduler) {
- return new AsyncCompletableFuture<>(callable, scheduler);
- }
-
- // this is a utility class that allows use of Callable with CompletableFutures in a way such that the
- // async future is cancellable thru this CompletableFuture instance.
- private static class AsyncCompletableFuture<T> extends CompletableFuture<T> implements Runnable {
-
- private final Callable<T> callable;
- private final ScheduledExecutorService scheduler;
- private final Object futureLock = new Object();
- private Future<?> future;
-
- public AsyncCompletableFuture(Callable<T> callable, ScheduledExecutorService scheduler) {
- this.callable = callable;
- this.scheduler = scheduler;
- future = scheduler.submit(this);
- }
-
- @Override
- public boolean cancel(boolean mayInterruptIfRunning) {
- synchronized (futureLock) {
- future.cancel(mayInterruptIfRunning);
- }
- return super.cancel(mayInterruptIfRunning);
- }
-
- @Override
- public void run() {
- try {
- complete(callable.call());
- } catch (RetryException e) {
- synchronized (futureLock) {
- if (!future.isCancelled()) {
- future = scheduler.schedule(this, e.delay, e.unit);
- }
- }
- } catch (Exception e) {
- completeExceptionally(e);
- }
- }
- }
-
- // this is a special exception to indicate to a AsyncCompletableFuture that the task needs to be retried.
- private static class RetryException extends Exception {
-
- private static final long serialVersionUID = 8512275408512109328L;
- private long delay;
- private TimeUnit unit;
-
- public RetryException(long delay, TimeUnit unit) {
- this.delay = delay;
- this.unit = unit;
- }
- }
}
public static final long BLUETOOTH_BASE_UUID = 0x800000805f9b34fbL;
+ public static UUID createBluetoothUUID(long uuid16) {
+ return new UUID((uuid16 << 32) | 0x1000, BluetoothBindingConstants.BLUETOOTH_BASE_UUID);
+ }
+
// Bluetooth profile UUID definitions
- public static final UUID PROFILE_GATT = UUID.fromString("00001801-0000-1000-8000-00805f9b34fb");
- public static final UUID PROFILE_A2DP_SOURCE = UUID.fromString("0000110a-0000-1000-8000-00805f9b34fb");
- public static final UUID PROFILE_A2DP_SINK = UUID.fromString("0000110b-0000-1000-8000-00805f9b34fb");
- public static final UUID PROFILE_A2DP = UUID.fromString("0000110d-0000-1000-8000-00805f9b34fb");
- public static final UUID PROFILE_AVRCP_REMOTE = UUID.fromString("0000110c-0000-1000-8000-00805f9b34fb");
- public static final UUID PROFILE_CORDLESS_TELEPHONE = UUID.fromString("00001109-0000-1000-8000-00805f9b34fb");
- public static final UUID PROFILE_DID_PNPINFO = UUID.fromString("00001200-0000-1000-8000-00805f9b34fb");
- public static final UUID PROFILE_HEADSET = UUID.fromString("00001108-0000-1000-8000-00805f9b34fb");
- public static final UUID PROFILE_HFP = UUID.fromString("0000111e-0000-1000-8000-00805f9b34fb");
- public static final UUID PROFILE_HFP_AUDIOGATEWAY = UUID.fromString("0000111f-0000-1000-8000-00805f9b34fb");
+ public static final UUID PROFILE_GATT = createBluetoothUUID(0x1801);
+ public static final UUID PROFILE_A2DP_SOURCE = createBluetoothUUID(0x110a);
+ public static final UUID PROFILE_A2DP_SINK = createBluetoothUUID(0x110b);
+ public static final UUID PROFILE_A2DP = createBluetoothUUID(0x110d);
+ public static final UUID PROFILE_AVRCP_REMOTE = createBluetoothUUID(0x110c);
+ public static final UUID PROFILE_CORDLESS_TELEPHONE = createBluetoothUUID(0x1109);
+ public static final UUID PROFILE_DID_PNPINFO = createBluetoothUUID(0x1200);
+ public static final UUID PROFILE_HEADSET = createBluetoothUUID(0x1108);
+ public static final UUID PROFILE_HFP = createBluetoothUUID(0x111e);
+ public static final UUID PROFILE_HFP_AUDIOGATEWAY = createBluetoothUUID(0x111f);
+
+ public static final UUID ATTR_CHARACTERISTIC_DECLARATION = createBluetoothUUID(0x2803);
}
private UUID uuid;
private GattCharacteristic(long key) {
- this.uuid = new UUID((key << 32) | 0x1000, BluetoothBindingConstants.BLUETOOTH_BASE_UUID);
+ this.uuid = BluetoothBindingConstants.createBluetoothUUID(key);
}
private static void initMapping() {
protected final BluetoothCharacteristic characteristic;
protected final UUID uuid;
+ protected final int handle;
protected byte[] value;
/**
* @param characteristic the characteristic that this class describes
* @param uuid the uuid of the descriptor
*/
- public BluetoothDescriptor(BluetoothCharacteristic characteristic, UUID uuid) {
+ public BluetoothDescriptor(BluetoothCharacteristic characteristic, UUID uuid, int handle) {
this.characteristic = characteristic;
this.uuid = uuid;
+ this.handle = handle;
}
/**
return uuid;
}
+ /**
+ * Returns the handle for this descriptor
+ *
+ * @return the handle for the descriptor
+ */
+ public int getHandle() {
+ return handle;
+ }
+
/**
* Returns the stored value for this descriptor. It doesn't read remote data.
*
private final UUID uuid;
private GattDescriptor(long key) {
- this.uuid = new UUID((key << 32) | 0x1000, BluetoothBindingConstants.BLUETOOTH_BASE_UUID);
+ this.uuid = BluetoothBindingConstants.createBluetoothUUID(key);
}
private static void initMapping() {
private UUID uuid;
private GattService(long key) {
- this.uuid = new UUID((key << 32) | 0x1000, BluetoothBindingConstants.BLUETOOTH_BASE_UUID);
+ this.uuid = BluetoothBindingConstants.createBluetoothUUID(key);
}
private static void initMapping() {
--- /dev/null
+/**
+ * Copyright (c) 2010-2020 Contributors to the openHAB project
+ *
+ * See the NOTICE file(s) distributed with this work for additional
+ * information.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0
+ *
+ * SPDX-License-Identifier: EPL-2.0
+ */
+package org.openhab.binding.bluetooth.util;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+import org.eclipse.jdt.annotation.NonNullByDefault;
+import org.eclipse.jdt.annotation.Nullable;
+
+/**
+ * The {@code HeritableFuture} class extends {@link CompletableFuture} and adds the ability
+ * to cancel upstream CompletableFuture tasks. Normally when a CompletableFuture
+ * is cancelled only dependent futures cancel. This class will also cancel the parent
+ * HeritableFuture instances as well. All of the {@code CompletionStage} methods will
+ * return HeritableFuture children and thus by only maintaining a reference to the final future
+ * in the task chain it would be possible to cancel the entire chain by calling {@code cancel}.
+ * <p>
+ * Due to child futures now having a link to their parent futures, it is no longer possible
+ * for HeritableFuture to be garbage collected as upstream futures complete. It is highly
+ * advisable to only use a HeritableFuture for defining finite (preferably small) task trees. Do not use
+ * HeritableFuture in situations where you would endlessly append new tasks otherwise you will eventually
+ * cause an OutOfMemoryError.
+ *
+ * @author Connor Petty - Initial contribution
+ *
+ */
+@NonNullByDefault
+public class HeritableFuture<T> extends CompletableFuture<T> {
+
+ protected final Object futureLock = new Object();
+ protected @Nullable Future<?> parentFuture;
+
+ public HeritableFuture() {
+ }
+
+ public HeritableFuture(Future<?> parent) {
+ this.parentFuture = parent;
+ }
+
+ /**
+ *
+ * {@inheritDoc}
+ *
+ * @implSpec
+ * This implementation returns a new HeritableFuture instance that uses
+ * the current instance as a parent. Cancellation of the child will result in
+ * cancellation of the parent.
+ */
+ @Override
+ public <U> CompletableFuture<U> newIncompleteFuture() {
+ return new HeritableFuture<>(this);
+ }
+
+ protected void setParentFuture(Supplier<@Nullable Future<?>> futureSupplier) {
+ synchronized (futureLock) {
+ var future = futureSupplier.get();
+ if (future != this) {
+ if (isCancelled() && future != null) {
+ future.cancel(true);
+ } else {
+ parentFuture = future;
+ }
+ }
+ }
+ }
+
+ /**
+ *
+ * {@inheritDoc}
+ *
+ * @implSpec
+ * This implementation cancels this future first, then cancels the parent future.
+ */
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ if (completeExceptionally(new CancellationException())) {
+ synchronized (futureLock) {
+ var future = parentFuture;
+ parentFuture = null;
+ if (future != null) {
+ future.cancel(mayInterruptIfRunning);
+ }
+ }
+ return true;
+ }
+ return isCancelled();
+ }
+
+ /**
+ *
+ * {@inheritDoc}
+ *
+ * @implSpec
+ * This implementation will treat the future returned by the function as a parent future.
+ */
+ @Override
+ @NonNullByDefault({}) // the generics here don't play well with the null checker
+ public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn) {
+ return new ComposeFunctionWrapper<>(fn, false, null).returnedFuture;
+ }
+
+ /**
+ *
+ * {@inheritDoc}
+ *
+ * @implSpec
+ * This implementation will treat the future returned by the function as a parent future.
+ */
+ @Override
+ @NonNullByDefault({}) // the generics here don't play well with the null checker
+ public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) {
+ return new ComposeFunctionWrapper<>(fn, true, null).returnedFuture;
+ }
+
+ /**
+ *
+ * {@inheritDoc}
+ *
+ * @implSpec
+ * This implementation will treat the future returned by the function as a parent future.
+ */
+ @Override
+ @NonNullByDefault({}) // the generics here don't play well with the null checker
+ public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn,
+ Executor executor) {
+ return new ComposeFunctionWrapper<>(fn, true, executor).returnedFuture;
+ }
+
+ /**
+ * This class is responsible for wrapping the supplied compose function.
+ * The instant the function returns the next CompletionStage, the parentFuture of the downstream HeritableFuture
+ * will be reassigned to the completion stage. This way cancellations of
+ * downstream futures will be able to reach the future returned by the supplied function.
+ *
+ * Most of the complexity going on in this class is due to the fact that the apply function might be
+ * called while calling `super.thenCompose`. This would happen if the current future is already complete
+ * since the next stage would be started immediately either on the current thread or asynchronously.
+ *
+ * @param <U> the type to be returned by the composed future
+ */
+ private class ComposeFunctionWrapper<U> implements Function<T, CompletionStage<U>> {
+
+ private final Object fieldsLock = new Object();
+ private final Function<? super T, ? extends CompletionStage<U>> fn;
+ private @Nullable HeritableFuture<U> composedFuture;
+ private @Nullable CompletionStage<U> innerStage;
+ // The final composed future to be used by users of this wrapper class
+ final HeritableFuture<U> returnedFuture;
+
+ public ComposeFunctionWrapper(Function<? super T, ? extends CompletionStage<U>> fn, boolean async,
+ @Nullable Executor executor) {
+ this.fn = fn;
+
+ var f = (HeritableFuture<U>) thenCompose(async, executor);
+ synchronized (fieldsLock) {
+ this.composedFuture = f;
+ var stage = innerStage;
+ if (stage != null) {
+ // getting here means that the `apply` function was run before `composedFuture` was initialized.
+ f.setParentFuture(stage::toCompletableFuture);
+ }
+ }
+ this.returnedFuture = f;
+ }
+
+ private CompletableFuture<U> thenCompose(boolean async, @Nullable Executor executor) {
+ if (!async) {
+ return HeritableFuture.super.thenCompose(this);
+ }
+ if (executor == null) {
+ return HeritableFuture.super.thenComposeAsync(this);
+ }
+ return HeritableFuture.super.thenComposeAsync(this, executor);
+ }
+
+ @Override
+ public CompletionStage<U> apply(T t) {
+ CompletionStage<U> stage = fn.apply(t);
+ synchronized (fieldsLock) {
+ var f = composedFuture;
+ if (f == null) {
+ // We got here before the wrapper finished initializing, so that
+ // means that the enclosing future was already complete at the time `super.thenCompose` was called.
+ // In which case the best we can do is save this stage so that the constructor can finish the job.
+ innerStage = stage;
+ } else {
+ f.setParentFuture(stage::toCompletableFuture);
+ }
+ }
+ return stage;
+ }
+ }
+}
--- /dev/null
+/**
+ * Copyright (c) 2010-2020 Contributors to the openHAB project
+ *
+ * See the NOTICE file(s) distributed with this work for additional
+ * information.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0
+ *
+ * SPDX-License-Identifier: EPL-2.0
+ */
+package org.openhab.binding.bluetooth.util;
+
+import java.util.concurrent.TimeUnit;
+
+import org.eclipse.jdt.annotation.NonNullByDefault;
+
+/**
+ * This is a special exception that can be thrown by Callable instances
+ * used by a RetryFuture.
+ *
+ * @author Connor Petty - Initial contribution
+ *
+ */
+@NonNullByDefault
+public class RetryException extends Exception {
+
+ private static final long serialVersionUID = 8512275408512109328L;
+ final long delay;
+ final TimeUnit unit;
+
+ public RetryException(long delay, TimeUnit unit) {
+ this.delay = delay;
+ this.unit = unit;
+ }
+}
--- /dev/null
+/**
+ * Copyright (c) 2010-2020 Contributors to the openHAB project
+ *
+ * See the NOTICE file(s) distributed with this work for additional
+ * information.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0
+ *
+ * SPDX-License-Identifier: EPL-2.0
+ */
+package org.openhab.binding.bluetooth.util;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import org.eclipse.jdt.annotation.NonNullByDefault;
+
+/**
+ * This is a utility class that allows adding {@link CompletableFuture} capabilities to a {@link Callable}.
+ * The provided callable will be executed asynchronously and the result will be used
+ * to complete the {@code RetryFuture} instance. As per its namesake, the RetryFuture allows
+ * the callable to reschedule itself by throwing a {@link RetryException}. Any other exception
+ * will simply complete the RetryFuture exceptionally as per {@link CompletableFuture#completeExceptionally(Throwable)}.
+ *
+ * @author Connor Petty - Initial contribution
+ *
+ */
+@NonNullByDefault
+public class RetryFuture<T> extends HeritableFuture<T> {
+
+ private final ScheduledExecutorService scheduler;
+
+ public RetryFuture(Callable<T> callable, ScheduledExecutorService scheduler, long delay, TimeUnit unit) {
+ this.scheduler = scheduler;
+ setParentFuture(() -> scheduler.schedule(new CallableTask(callable), delay, unit));
+ }
+
+ public RetryFuture(Supplier<CompletableFuture<T>> supplier, ScheduledExecutorService scheduler, long delay,
+ TimeUnit unit) {
+ this.scheduler = scheduler;
+ setParentFuture(() -> scheduler.schedule(new ComposeTask(supplier), delay, unit));
+ }
+
+ @Override
+ public Executor defaultExecutor() {
+ return scheduler;
+ }
+
+ private class CallableTask implements Runnable {
+
+ private final Callable<T> callable;
+
+ public CallableTask(Callable<T> callable) {
+ this.callable = callable;
+ }
+
+ @Override
+ public void run() {
+ try {
+ complete(callable.call());
+ } catch (RetryException e) {
+ setParentFuture(() -> {
+ if (!isDone()) {
+ return scheduler.schedule(this, e.delay, e.unit);
+ }
+ return null;
+ });
+ } catch (Exception e) {
+ completeExceptionally(e);
+ }
+ }
+ }
+
+ private class ComposeTask implements Runnable {
+
+ private final Supplier<CompletableFuture<T>> supplier;
+
+ public ComposeTask(Supplier<CompletableFuture<T>> supplier) {
+ this.supplier = supplier;
+ }
+
+ @Override
+ public void run() {
+ CompletableFuture<T> future = supplier.get();
+ setParentFuture(() -> future);
+ future.whenComplete((result, th) -> {
+ if (th instanceof CompletionException) {
+ th = th.getCause();
+ }
+ if (th instanceof RetryException) {
+ RetryException e = (RetryException) th;
+ setParentFuture(() -> {
+ if (!isDone()) {
+ return scheduler.schedule(this, e.delay, e.unit);
+ }
+ return null;
+ });
+ } else if (th != null) {
+ completeExceptionally(th);
+ } else {
+ complete(result);
+ }
+ });
+ }
+ }
+
+ /**
+ * This is a convinience method for calling {@code new RetryFuture<>(callable, scheduler)}
+ *
+ * @param <T> the result type of the callable task.
+ * @param callable the task to execute
+ * @param scheduler the scheduler to use
+ * @return a CompletableFuture that will return the result of the callable.
+ */
+ public static <T> CompletableFuture<T> callWithRetry(Callable<T> callable, ScheduledExecutorService scheduler) {
+ return new RetryFuture<>(callable, scheduler, 0, TimeUnit.NANOSECONDS);
+ }
+
+ public static <T> CompletableFuture<T> scheduleWithRetry(Callable<T> callable, ScheduledExecutorService scheduler,
+ long delay, TimeUnit unit) {
+ return new RetryFuture<>(callable, scheduler, delay, unit);
+ }
+
+ @SafeVarargs
+ public static <T> CompletableFuture<T> scheduleWithRetryForExceptions(Callable<T> callable,
+ ScheduledExecutorService scheduler, long initDelay, long retryDelay, TimeUnit unit,
+ Class<? extends Exception>... exceptions) {
+ Callable<T> task = () -> {
+ try {
+ return callable.call();
+ } catch (RetryException ex) {
+ throw ex;
+ } catch (Exception ex) {
+ for (Class<? extends Exception> exClass : exceptions) {
+ if (exClass.isInstance(ex)) {
+ throw new RetryException(retryDelay, unit);
+ }
+ }
+ throw ex;
+ }
+ };
+ return new RetryFuture<>(task, scheduler, initDelay, unit);
+ }
+
+ public static <T> CompletableFuture<T> composeWithRetry(Supplier<CompletableFuture<T>> supplier,
+ ScheduledExecutorService scheduler) {
+ return new RetryFuture<>(supplier, scheduler, 0, TimeUnit.NANOSECONDS);
+ }
+
+ public static <T> CompletableFuture<T> composeWithRetry(Supplier<CompletableFuture<T>> supplier,
+ ScheduledExecutorService scheduler, long initDelay, TimeUnit unit) {
+ return new RetryFuture<>(supplier, scheduler, initDelay, unit);
+ }
+}
--- /dev/null
+/**
+ * Copyright (c) 2010-2020 Contributors to the openHAB project
+ *
+ * See the NOTICE file(s) distributed with this work for additional
+ * information.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0
+ *
+ * SPDX-License-Identifier: EPL-2.0
+ */
+package org.openhab.binding.bluetooth.util;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.openhab.core.common.NamedThreadFactory;
+
+/**
+ * @author Connor Petty - Initial contribution
+ *
+ */
+class RetryFutureTest {
+
+ private ScheduledExecutorService scheduler;
+
+ @BeforeEach
+ public void init() {
+ ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1,
+ new NamedThreadFactory("RetryFutureTest", true));
+ scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+ scheduler.setRemoveOnCancelPolicy(true);
+ this.scheduler = scheduler;
+ }
+
+ @AfterEach
+ public void cleanup() {
+ scheduler.shutdownNow();
+ }
+
+ @Test
+ void callWithRetryNormal() throws InterruptedException {
+ Future<String> retryFuture = RetryFuture.callWithRetry(() -> "test", scheduler);
+ try {
+ assertEquals("test", retryFuture.get(100, TimeUnit.MILLISECONDS));
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ fail(e);
+ }
+ }
+
+ @Test
+ void callWithRetry1() throws InterruptedException {
+ AtomicInteger visitCount = new AtomicInteger();
+ Future<String> retryFuture = RetryFuture.callWithRetry(() -> {
+ if (visitCount.getAndIncrement() == 0) {
+ throw new RetryException(0, TimeUnit.SECONDS);
+ }
+ return "test";
+ }, scheduler);
+ try {
+ assertEquals("test", retryFuture.get(100, TimeUnit.MILLISECONDS));
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ fail(e);
+ }
+ }
+
+ @Test
+ void composeWithRetryNormal() throws InterruptedException {
+ CompletableFuture<?> composedFuture = new CompletableFuture<>();
+
+ Future<?> retryFuture = RetryFuture.composeWithRetry(() -> {
+ composedFuture.complete(null);
+ return composedFuture;
+ }, scheduler);
+
+ try {
+ retryFuture.get(100, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ fail(e);
+ }
+ assertTrue(composedFuture.isDone());
+ }
+
+ @Test
+ void composeWithRetryThrow() throws InterruptedException {
+ CompletableFuture<?> composedFuture = new CompletableFuture<>();
+
+ Future<?> retryFuture = RetryFuture.composeWithRetry(() -> {
+ composedFuture.completeExceptionally(new DummyException());
+ return composedFuture;
+ }, scheduler);
+
+ try {
+ retryFuture.get(100, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException | TimeoutException e) {
+ fail(e);
+ } catch (ExecutionException ex) {
+ assertTrue(ex.getCause() instanceof DummyException);
+ }
+ assertTrue(composedFuture.isDone());
+ }
+
+ @Test
+ void composeWithRetry1() throws InterruptedException {
+ AtomicInteger visitCount = new AtomicInteger();
+ CompletableFuture<String> composedFuture = new CompletableFuture<>();
+ Future<String> retryFuture = RetryFuture.composeWithRetry(() -> {
+ if (visitCount.getAndIncrement() == 0) {
+ return CompletableFuture.failedFuture(new RetryException(0, TimeUnit.SECONDS));
+ }
+ composedFuture.complete("test");
+ return composedFuture;
+ }, scheduler);
+
+ try {
+ assertEquals("test", retryFuture.get(100, TimeUnit.MILLISECONDS));
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ fail(e);
+ }
+ assertEquals(2, visitCount.get());
+ assertTrue(composedFuture.isDone());
+ }
+
+ @Test
+ void composeWithRetry1Cancel() throws InterruptedException {
+ CountDownLatch latch = new CountDownLatch(1);
+ AtomicInteger visitCount = new AtomicInteger();
+ CompletableFuture<String> composedFuture = new CompletableFuture<>();
+ Future<String> retryFuture = RetryFuture.composeWithRetry(() -> {
+ if (visitCount.getAndIncrement() == 0) {
+ return CompletableFuture.failedFuture(new RetryException(0, TimeUnit.SECONDS));
+ }
+ latch.countDown();
+ return composedFuture;
+ }, scheduler);
+
+ try {
+ if (!latch.await(100, TimeUnit.MILLISECONDS)) {
+ fail("Timeout while waiting for latch");
+ }
+ Thread.sleep(1);
+ retryFuture.cancel(false);
+
+ assertTrue(composedFuture.isCancelled());
+ } catch (InterruptedException e) {
+ fail(e);
+ }
+ assertEquals(2, visitCount.get());
+ assertTrue(composedFuture.isDone());
+ }
+
+ private static class DummyException extends Exception {
+
+ }
+}