]> git.basschouten.com Git - openhab-addons.git/commitdiff
[bluetooth] Add some utility classes (#9064)
authorConnor Petty <mistercpp2000@gmail.com>
Mon, 23 Nov 2020 10:34:39 +0000 (02:34 -0800)
committerGitHub <noreply@github.com>
Mon, 23 Nov 2020 10:34:39 +0000 (11:34 +0100)
* Add some utility classes that will be used by other bluetooth bindings.
* Add handle field to BluetoothDescriptor

Signed-off-by: Connor Petty <mistercpp2000+gitsignoff@gmail.com>
bundles/org.openhab.binding.bluetooth.bluez/src/main/java/org/openhab/binding/bluetooth/bluez/internal/BlueZBluetoothDevice.java
bundles/org.openhab.binding.bluetooth.bluez/src/main/java/org/openhab/binding/bluetooth/bluez/internal/DeviceManagerFactory.java
bundles/org.openhab.binding.bluetooth/src/main/java/org/openhab/binding/bluetooth/BluetoothBindingConstants.java
bundles/org.openhab.binding.bluetooth/src/main/java/org/openhab/binding/bluetooth/BluetoothCharacteristic.java
bundles/org.openhab.binding.bluetooth/src/main/java/org/openhab/binding/bluetooth/BluetoothDescriptor.java
bundles/org.openhab.binding.bluetooth/src/main/java/org/openhab/binding/bluetooth/BluetoothService.java
bundles/org.openhab.binding.bluetooth/src/main/java/org/openhab/binding/bluetooth/util/HeritableFuture.java [new file with mode: 0644]
bundles/org.openhab.binding.bluetooth/src/main/java/org/openhab/binding/bluetooth/util/RetryException.java [new file with mode: 0644]
bundles/org.openhab.binding.bluetooth/src/main/java/org/openhab/binding/bluetooth/util/RetryFuture.java [new file with mode: 0644]
bundles/org.openhab.binding.bluetooth/src/test/java/org/openhab/binding/bluetooth/util/RetryFutureTest.java [new file with mode: 0644]

index d11b1bb647270a45b76993cbe48f61abed065dc7..af80f79d2f12a77fa67eaf27e706a371fb7c7622 100644 (file)
@@ -396,7 +396,7 @@ public class BlueZBluetoothDevice extends BaseBluetoothDevice implements BlueZEv
 
                     for (BluetoothGattDescriptor dBusBlueZDescriptor : dBusBlueZCharacteristic.getGattDescriptors()) {
                         BluetoothDescriptor descriptor = new BluetoothDescriptor(characteristic,
-                                UUID.fromString(dBusBlueZDescriptor.getUuid()));
+                                UUID.fromString(dBusBlueZDescriptor.getUuid()), 0);
                         characteristic.addDescriptor(descriptor);
                     }
                     service.addCharacteristic(characteristic);
index 78ec6838bbcc3733b39e3f4cc43c67ed9d28c966..d32bf9f842efd8d66e5380a53e428bda96770b99 100644 (file)
@@ -12,9 +12,7 @@
  */
 package org.openhab.binding.bluetooth.bluez.internal;
 
-import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -22,6 +20,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.eclipse.jdt.annotation.NonNullByDefault;
 import org.eclipse.jdt.annotation.Nullable;
 import org.freedesktop.dbus.exceptions.DBusException;
+import org.openhab.binding.bluetooth.util.RetryException;
+import org.openhab.binding.bluetooth.util.RetryFuture;
 import org.openhab.core.common.ThreadPoolManager;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
@@ -71,7 +71,7 @@ public class DeviceManagerFactory {
     public void initialize() {
         logger.debug("initializing DeviceManagerFactory");
 
-        var stage1 = this.deviceManagerFuture = callAsync(() -> {
+        var stage1 = this.deviceManagerFuture = RetryFuture.callWithRetry(() -> {
             try {
                 // if this is the first call to the library, this call
                 // should throw an exception (that we are catching)
@@ -83,12 +83,10 @@ public class DeviceManagerFactory {
             }
         }, scheduler);
 
-        stage1.thenCompose(devManager -> {
+        this.deviceManagerWrapperFuture = stage1.thenCompose(devManager -> {
             // lambdas can't modify outside variables due to scoping, so instead we use an AtomicInteger.
             AtomicInteger tryCount = new AtomicInteger();
-            // We need to set deviceManagerWrapperFuture here since we want to be able to cancel the underlying
-            // AsyncCompletableFuture instance
-            return this.deviceManagerWrapperFuture = callAsync(() -> {
+            return RetryFuture.callWithRetry(() -> {
                 int count = tryCount.incrementAndGet();
                 try {
                     logger.debug("Registering property handler attempt: {}", count);
@@ -127,60 +125,4 @@ public class DeviceManagerFactory {
         }
         this.deviceManagerWrapperFuture = null;
     }
-
-    private static <T> CompletableFuture<T> callAsync(Callable<T> callable, ScheduledExecutorService scheduler) {
-        return new AsyncCompletableFuture<>(callable, scheduler);
-    }
-
-    // this is a utility class that allows use of Callable with CompletableFutures in a way such that the
-    // async future is cancellable thru this CompletableFuture instance.
-    private static class AsyncCompletableFuture<T> extends CompletableFuture<T> implements Runnable {
-
-        private final Callable<T> callable;
-        private final ScheduledExecutorService scheduler;
-        private final Object futureLock = new Object();
-        private Future<?> future;
-
-        public AsyncCompletableFuture(Callable<T> callable, ScheduledExecutorService scheduler) {
-            this.callable = callable;
-            this.scheduler = scheduler;
-            future = scheduler.submit(this);
-        }
-
-        @Override
-        public boolean cancel(boolean mayInterruptIfRunning) {
-            synchronized (futureLock) {
-                future.cancel(mayInterruptIfRunning);
-            }
-            return super.cancel(mayInterruptIfRunning);
-        }
-
-        @Override
-        public void run() {
-            try {
-                complete(callable.call());
-            } catch (RetryException e) {
-                synchronized (futureLock) {
-                    if (!future.isCancelled()) {
-                        future = scheduler.schedule(this, e.delay, e.unit);
-                    }
-                }
-            } catch (Exception e) {
-                completeExceptionally(e);
-            }
-        }
-    }
-
-    // this is a special exception to indicate to a AsyncCompletableFuture that the task needs to be retried.
-    private static class RetryException extends Exception {
-
-        private static final long serialVersionUID = 8512275408512109328L;
-        private long delay;
-        private TimeUnit unit;
-
-        public RetryException(long delay, TimeUnit unit) {
-            this.delay = delay;
-            this.unit = unit;
-        }
-    }
 }
index a258601dffc1ab75eea4157dea6cba35cc11a2c3..5c7290d23b8b4bdd4ff986b834d5a6c7301cf861 100644 (file)
@@ -46,15 +46,21 @@ public class BluetoothBindingConstants {
 
     public static final long BLUETOOTH_BASE_UUID = 0x800000805f9b34fbL;
 
+    public static UUID createBluetoothUUID(long uuid16) {
+        return new UUID((uuid16 << 32) | 0x1000, BluetoothBindingConstants.BLUETOOTH_BASE_UUID);
+    }
+
     // Bluetooth profile UUID definitions
-    public static final UUID PROFILE_GATT = UUID.fromString("00001801-0000-1000-8000-00805f9b34fb");
-    public static final UUID PROFILE_A2DP_SOURCE = UUID.fromString("0000110a-0000-1000-8000-00805f9b34fb");
-    public static final UUID PROFILE_A2DP_SINK = UUID.fromString("0000110b-0000-1000-8000-00805f9b34fb");
-    public static final UUID PROFILE_A2DP = UUID.fromString("0000110d-0000-1000-8000-00805f9b34fb");
-    public static final UUID PROFILE_AVRCP_REMOTE = UUID.fromString("0000110c-0000-1000-8000-00805f9b34fb");
-    public static final UUID PROFILE_CORDLESS_TELEPHONE = UUID.fromString("00001109-0000-1000-8000-00805f9b34fb");
-    public static final UUID PROFILE_DID_PNPINFO = UUID.fromString("00001200-0000-1000-8000-00805f9b34fb");
-    public static final UUID PROFILE_HEADSET = UUID.fromString("00001108-0000-1000-8000-00805f9b34fb");
-    public static final UUID PROFILE_HFP = UUID.fromString("0000111e-0000-1000-8000-00805f9b34fb");
-    public static final UUID PROFILE_HFP_AUDIOGATEWAY = UUID.fromString("0000111f-0000-1000-8000-00805f9b34fb");
+    public static final UUID PROFILE_GATT = createBluetoothUUID(0x1801);
+    public static final UUID PROFILE_A2DP_SOURCE = createBluetoothUUID(0x110a);
+    public static final UUID PROFILE_A2DP_SINK = createBluetoothUUID(0x110b);
+    public static final UUID PROFILE_A2DP = createBluetoothUUID(0x110d);
+    public static final UUID PROFILE_AVRCP_REMOTE = createBluetoothUUID(0x110c);
+    public static final UUID PROFILE_CORDLESS_TELEPHONE = createBluetoothUUID(0x1109);
+    public static final UUID PROFILE_DID_PNPINFO = createBluetoothUUID(0x1200);
+    public static final UUID PROFILE_HEADSET = createBluetoothUUID(0x1108);
+    public static final UUID PROFILE_HFP = createBluetoothUUID(0x111e);
+    public static final UUID PROFILE_HFP_AUDIOGATEWAY = createBluetoothUUID(0x111f);
+
+    public static final UUID ATTR_CHARACTERISTIC_DECLARATION = createBluetoothUUID(0x2803);
 }
index 36d61def615dbe1765ffb074520cacae01807808..6c2551a141cfeecee1a4f3f8cf1c7e016b6b4675 100644 (file)
@@ -672,7 +672,7 @@ public class BluetoothCharacteristic {
         private UUID uuid;
 
         private GattCharacteristic(long key) {
-            this.uuid = new UUID((key << 32) | 0x1000, BluetoothBindingConstants.BLUETOOTH_BASE_UUID);
+            this.uuid = BluetoothBindingConstants.createBluetoothUUID(key);
         }
 
         private static void initMapping() {
index 8ac763c1d180afd6ea110e56e7cba743e4e8d934..33e25528a93e05bed3ec11fa6a278cb84c6216a5 100644 (file)
@@ -30,6 +30,7 @@ public class BluetoothDescriptor {
 
     protected final BluetoothCharacteristic characteristic;
     protected final UUID uuid;
+    protected final int handle;
     protected byte[] value;
 
     /**
@@ -38,9 +39,10 @@ public class BluetoothDescriptor {
      * @param characteristic the characteristic that this class describes
      * @param uuid the uuid of the descriptor
      */
-    public BluetoothDescriptor(BluetoothCharacteristic characteristic, UUID uuid) {
+    public BluetoothDescriptor(BluetoothCharacteristic characteristic, UUID uuid, int handle) {
         this.characteristic = characteristic;
         this.uuid = uuid;
+        this.handle = handle;
     }
 
     /**
@@ -70,6 +72,15 @@ public class BluetoothDescriptor {
         return uuid;
     }
 
+    /**
+     * Returns the handle for this descriptor
+     *
+     * @return the handle for the descriptor
+     */
+    public int getHandle() {
+        return handle;
+    }
+
     /**
      * Returns the stored value for this descriptor. It doesn't read remote data.
      *
@@ -111,7 +122,7 @@ public class BluetoothDescriptor {
         private final UUID uuid;
 
         private GattDescriptor(long key) {
-            this.uuid = new UUID((key << 32) | 0x1000, BluetoothBindingConstants.BLUETOOTH_BASE_UUID);
+            this.uuid = BluetoothBindingConstants.createBluetoothUUID(key);
         }
 
         private static void initMapping() {
index 82abaf15fe48dafda664b7331684ad6095c98628..4bc026e5a61e858f0dedee71cc56cea84641a7fb 100644 (file)
@@ -246,7 +246,7 @@ public class BluetoothService {
         private UUID uuid;
 
         private GattService(long key) {
-            this.uuid = new UUID((key << 32) | 0x1000, BluetoothBindingConstants.BLUETOOTH_BASE_UUID);
+            this.uuid = BluetoothBindingConstants.createBluetoothUUID(key);
         }
 
         private static void initMapping() {
diff --git a/bundles/org.openhab.binding.bluetooth/src/main/java/org/openhab/binding/bluetooth/util/HeritableFuture.java b/bundles/org.openhab.binding.bluetooth/src/main/java/org/openhab/binding/bluetooth/util/HeritableFuture.java
new file mode 100644 (file)
index 0000000..efc99e7
--- /dev/null
@@ -0,0 +1,209 @@
+/**
+ * Copyright (c) 2010-2020 Contributors to the openHAB project
+ *
+ * See the NOTICE file(s) distributed with this work for additional
+ * information.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0
+ *
+ * SPDX-License-Identifier: EPL-2.0
+ */
+package org.openhab.binding.bluetooth.util;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+import org.eclipse.jdt.annotation.NonNullByDefault;
+import org.eclipse.jdt.annotation.Nullable;
+
+/**
+ * The {@code HeritableFuture} class extends {@link CompletableFuture} and adds the ability
+ * to cancel upstream CompletableFuture tasks. Normally when a CompletableFuture
+ * is cancelled only dependent futures cancel. This class will also cancel the parent
+ * HeritableFuture instances as well. All of the {@code CompletionStage} methods will
+ * return HeritableFuture children and thus by only maintaining a reference to the final future
+ * in the task chain it would be possible to cancel the entire chain by calling {@code cancel}.
+ * <p>
+ * Due to child futures now having a link to their parent futures, it is no longer possible
+ * for HeritableFuture to be garbage collected as upstream futures complete. It is highly
+ * advisable to only use a HeritableFuture for defining finite (preferably small) task trees. Do not use
+ * HeritableFuture in situations where you would endlessly append new tasks otherwise you will eventually
+ * cause an OutOfMemoryError.
+ *
+ * @author Connor Petty - Initial contribution
+ *
+ */
+@NonNullByDefault
+public class HeritableFuture<T> extends CompletableFuture<T> {
+
+    protected final Object futureLock = new Object();
+    protected @Nullable Future<?> parentFuture;
+
+    public HeritableFuture() {
+    }
+
+    public HeritableFuture(Future<?> parent) {
+        this.parentFuture = parent;
+    }
+
+    /**
+     *
+     * {@inheritDoc}
+     *
+     * @implSpec
+     *           This implementation returns a new HeritableFuture instance that uses
+     *           the current instance as a parent. Cancellation of the child will result in
+     *           cancellation of the parent.
+     */
+    @Override
+    public <U> CompletableFuture<U> newIncompleteFuture() {
+        return new HeritableFuture<>(this);
+    }
+
+    protected void setParentFuture(Supplier<@Nullable Future<?>> futureSupplier) {
+        synchronized (futureLock) {
+            var future = futureSupplier.get();
+            if (future != this) {
+                if (isCancelled() && future != null) {
+                    future.cancel(true);
+                } else {
+                    parentFuture = future;
+                }
+            }
+        }
+    }
+
+    /**
+     *
+     * {@inheritDoc}
+     *
+     * @implSpec
+     *           This implementation cancels this future first, then cancels the parent future.
+     */
+    @Override
+    public boolean cancel(boolean mayInterruptIfRunning) {
+        if (completeExceptionally(new CancellationException())) {
+            synchronized (futureLock) {
+                var future = parentFuture;
+                parentFuture = null;
+                if (future != null) {
+                    future.cancel(mayInterruptIfRunning);
+                }
+            }
+            return true;
+        }
+        return isCancelled();
+    }
+
+    /**
+     *
+     * {@inheritDoc}
+     *
+     * @implSpec
+     *           This implementation will treat the future returned by the function as a parent future.
+     */
+    @Override
+    @NonNullByDefault({}) // the generics here don't play well with the null checker
+    public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn) {
+        return new ComposeFunctionWrapper<>(fn, false, null).returnedFuture;
+    }
+
+    /**
+     *
+     * {@inheritDoc}
+     *
+     * @implSpec
+     *           This implementation will treat the future returned by the function as a parent future.
+     */
+    @Override
+    @NonNullByDefault({}) // the generics here don't play well with the null checker
+    public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) {
+        return new ComposeFunctionWrapper<>(fn, true, null).returnedFuture;
+    }
+
+    /**
+     *
+     * {@inheritDoc}
+     *
+     * @implSpec
+     *           This implementation will treat the future returned by the function as a parent future.
+     */
+    @Override
+    @NonNullByDefault({}) // the generics here don't play well with the null checker
+    public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn,
+            Executor executor) {
+        return new ComposeFunctionWrapper<>(fn, true, executor).returnedFuture;
+    }
+
+    /**
+     * This class is responsible for wrapping the supplied compose function.
+     * The instant the function returns the next CompletionStage, the parentFuture of the downstream HeritableFuture
+     * will be reassigned to the completion stage. This way cancellations of
+     * downstream futures will be able to reach the future returned by the supplied function.
+     *
+     * Most of the complexity going on in this class is due to the fact that the apply function might be
+     * called while calling `super.thenCompose`. This would happen if the current future is already complete
+     * since the next stage would be started immediately either on the current thread or asynchronously.
+     *
+     * @param <U> the type to be returned by the composed future
+     */
+    private class ComposeFunctionWrapper<U> implements Function<T, CompletionStage<U>> {
+
+        private final Object fieldsLock = new Object();
+        private final Function<? super T, ? extends CompletionStage<U>> fn;
+        private @Nullable HeritableFuture<U> composedFuture;
+        private @Nullable CompletionStage<U> innerStage;
+        // The final composed future to be used by users of this wrapper class
+        final HeritableFuture<U> returnedFuture;
+
+        public ComposeFunctionWrapper(Function<? super T, ? extends CompletionStage<U>> fn, boolean async,
+                @Nullable Executor executor) {
+            this.fn = fn;
+
+            var f = (HeritableFuture<U>) thenCompose(async, executor);
+            synchronized (fieldsLock) {
+                this.composedFuture = f;
+                var stage = innerStage;
+                if (stage != null) {
+                    // getting here means that the `apply` function was run before `composedFuture` was initialized.
+                    f.setParentFuture(stage::toCompletableFuture);
+                }
+            }
+            this.returnedFuture = f;
+        }
+
+        private CompletableFuture<U> thenCompose(boolean async, @Nullable Executor executor) {
+            if (!async) {
+                return HeritableFuture.super.thenCompose(this);
+            }
+            if (executor == null) {
+                return HeritableFuture.super.thenComposeAsync(this);
+            }
+            return HeritableFuture.super.thenComposeAsync(this, executor);
+        }
+
+        @Override
+        public CompletionStage<U> apply(T t) {
+            CompletionStage<U> stage = fn.apply(t);
+            synchronized (fieldsLock) {
+                var f = composedFuture;
+                if (f == null) {
+                    // We got here before the wrapper finished initializing, so that
+                    // means that the enclosing future was already complete at the time `super.thenCompose` was called.
+                    // In which case the best we can do is save this stage so that the constructor can finish the job.
+                    innerStage = stage;
+                } else {
+                    f.setParentFuture(stage::toCompletableFuture);
+                }
+            }
+            return stage;
+        }
+    }
+}
diff --git a/bundles/org.openhab.binding.bluetooth/src/main/java/org/openhab/binding/bluetooth/util/RetryException.java b/bundles/org.openhab.binding.bluetooth/src/main/java/org/openhab/binding/bluetooth/util/RetryException.java
new file mode 100644 (file)
index 0000000..639165f
--- /dev/null
@@ -0,0 +1,37 @@
+/**
+ * Copyright (c) 2010-2020 Contributors to the openHAB project
+ *
+ * See the NOTICE file(s) distributed with this work for additional
+ * information.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0
+ *
+ * SPDX-License-Identifier: EPL-2.0
+ */
+package org.openhab.binding.bluetooth.util;
+
+import java.util.concurrent.TimeUnit;
+
+import org.eclipse.jdt.annotation.NonNullByDefault;
+
+/**
+ * This is a special exception that can be thrown by Callable instances
+ * used by a RetryFuture.
+ *
+ * @author Connor Petty - Initial contribution
+ *
+ */
+@NonNullByDefault
+public class RetryException extends Exception {
+
+    private static final long serialVersionUID = 8512275408512109328L;
+    final long delay;
+    final TimeUnit unit;
+
+    public RetryException(long delay, TimeUnit unit) {
+        this.delay = delay;
+        this.unit = unit;
+    }
+}
diff --git a/bundles/org.openhab.binding.bluetooth/src/main/java/org/openhab/binding/bluetooth/util/RetryFuture.java b/bundles/org.openhab.binding.bluetooth/src/main/java/org/openhab/binding/bluetooth/util/RetryFuture.java
new file mode 100644 (file)
index 0000000..3b887f7
--- /dev/null
@@ -0,0 +1,161 @@
+/**
+ * Copyright (c) 2010-2020 Contributors to the openHAB project
+ *
+ * See the NOTICE file(s) distributed with this work for additional
+ * information.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0
+ *
+ * SPDX-License-Identifier: EPL-2.0
+ */
+package org.openhab.binding.bluetooth.util;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import org.eclipse.jdt.annotation.NonNullByDefault;
+
+/**
+ * This is a utility class that allows adding {@link CompletableFuture} capabilities to a {@link Callable}.
+ * The provided callable will be executed asynchronously and the result will be used
+ * to complete the {@code RetryFuture} instance. As per its namesake, the RetryFuture allows
+ * the callable to reschedule itself by throwing a {@link RetryException}. Any other exception
+ * will simply complete the RetryFuture exceptionally as per {@link CompletableFuture#completeExceptionally(Throwable)}.
+ *
+ * @author Connor Petty - Initial contribution
+ *
+ */
+@NonNullByDefault
+public class RetryFuture<T> extends HeritableFuture<T> {
+
+    private final ScheduledExecutorService scheduler;
+
+    public RetryFuture(Callable<T> callable, ScheduledExecutorService scheduler, long delay, TimeUnit unit) {
+        this.scheduler = scheduler;
+        setParentFuture(() -> scheduler.schedule(new CallableTask(callable), delay, unit));
+    }
+
+    public RetryFuture(Supplier<CompletableFuture<T>> supplier, ScheduledExecutorService scheduler, long delay,
+            TimeUnit unit) {
+        this.scheduler = scheduler;
+        setParentFuture(() -> scheduler.schedule(new ComposeTask(supplier), delay, unit));
+    }
+
+    @Override
+    public Executor defaultExecutor() {
+        return scheduler;
+    }
+
+    private class CallableTask implements Runnable {
+
+        private final Callable<T> callable;
+
+        public CallableTask(Callable<T> callable) {
+            this.callable = callable;
+        }
+
+        @Override
+        public void run() {
+            try {
+                complete(callable.call());
+            } catch (RetryException e) {
+                setParentFuture(() -> {
+                    if (!isDone()) {
+                        return scheduler.schedule(this, e.delay, e.unit);
+                    }
+                    return null;
+                });
+            } catch (Exception e) {
+                completeExceptionally(e);
+            }
+        }
+    }
+
+    private class ComposeTask implements Runnable {
+
+        private final Supplier<CompletableFuture<T>> supplier;
+
+        public ComposeTask(Supplier<CompletableFuture<T>> supplier) {
+            this.supplier = supplier;
+        }
+
+        @Override
+        public void run() {
+            CompletableFuture<T> future = supplier.get();
+            setParentFuture(() -> future);
+            future.whenComplete((result, th) -> {
+                if (th instanceof CompletionException) {
+                    th = th.getCause();
+                }
+                if (th instanceof RetryException) {
+                    RetryException e = (RetryException) th;
+                    setParentFuture(() -> {
+                        if (!isDone()) {
+                            return scheduler.schedule(this, e.delay, e.unit);
+                        }
+                        return null;
+                    });
+                } else if (th != null) {
+                    completeExceptionally(th);
+                } else {
+                    complete(result);
+                }
+            });
+        }
+    }
+
+    /**
+     * This is a convinience method for calling {@code new RetryFuture<>(callable, scheduler)}
+     *
+     * @param <T> the result type of the callable task.
+     * @param callable the task to execute
+     * @param scheduler the scheduler to use
+     * @return a CompletableFuture that will return the result of the callable.
+     */
+    public static <T> CompletableFuture<T> callWithRetry(Callable<T> callable, ScheduledExecutorService scheduler) {
+        return new RetryFuture<>(callable, scheduler, 0, TimeUnit.NANOSECONDS);
+    }
+
+    public static <T> CompletableFuture<T> scheduleWithRetry(Callable<T> callable, ScheduledExecutorService scheduler,
+            long delay, TimeUnit unit) {
+        return new RetryFuture<>(callable, scheduler, delay, unit);
+    }
+
+    @SafeVarargs
+    public static <T> CompletableFuture<T> scheduleWithRetryForExceptions(Callable<T> callable,
+            ScheduledExecutorService scheduler, long initDelay, long retryDelay, TimeUnit unit,
+            Class<? extends Exception>... exceptions) {
+        Callable<T> task = () -> {
+            try {
+                return callable.call();
+            } catch (RetryException ex) {
+                throw ex;
+            } catch (Exception ex) {
+                for (Class<? extends Exception> exClass : exceptions) {
+                    if (exClass.isInstance(ex)) {
+                        throw new RetryException(retryDelay, unit);
+                    }
+                }
+                throw ex;
+            }
+        };
+        return new RetryFuture<>(task, scheduler, initDelay, unit);
+    }
+
+    public static <T> CompletableFuture<T> composeWithRetry(Supplier<CompletableFuture<T>> supplier,
+            ScheduledExecutorService scheduler) {
+        return new RetryFuture<>(supplier, scheduler, 0, TimeUnit.NANOSECONDS);
+    }
+
+    public static <T> CompletableFuture<T> composeWithRetry(Supplier<CompletableFuture<T>> supplier,
+            ScheduledExecutorService scheduler, long initDelay, TimeUnit unit) {
+        return new RetryFuture<>(supplier, scheduler, initDelay, unit);
+    }
+}
diff --git a/bundles/org.openhab.binding.bluetooth/src/test/java/org/openhab/binding/bluetooth/util/RetryFutureTest.java b/bundles/org.openhab.binding.bluetooth/src/test/java/org/openhab/binding/bluetooth/util/RetryFutureTest.java
new file mode 100644 (file)
index 0000000..b16621a
--- /dev/null
@@ -0,0 +1,168 @@
+/**
+ * Copyright (c) 2010-2020 Contributors to the openHAB project
+ *
+ * See the NOTICE file(s) distributed with this work for additional
+ * information.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0
+ *
+ * SPDX-License-Identifier: EPL-2.0
+ */
+package org.openhab.binding.bluetooth.util;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.openhab.core.common.NamedThreadFactory;
+
+/**
+ * @author Connor Petty - Initial contribution
+ *
+ */
+class RetryFutureTest {
+
+    private ScheduledExecutorService scheduler;
+
+    @BeforeEach
+    public void init() {
+        ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1,
+                new NamedThreadFactory("RetryFutureTest", true));
+        scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+        scheduler.setRemoveOnCancelPolicy(true);
+        this.scheduler = scheduler;
+    }
+
+    @AfterEach
+    public void cleanup() {
+        scheduler.shutdownNow();
+    }
+
+    @Test
+    void callWithRetryNormal() throws InterruptedException {
+        Future<String> retryFuture = RetryFuture.callWithRetry(() -> "test", scheduler);
+        try {
+            assertEquals("test", retryFuture.get(100, TimeUnit.MILLISECONDS));
+        } catch (InterruptedException | ExecutionException | TimeoutException e) {
+            fail(e);
+        }
+    }
+
+    @Test
+    void callWithRetry1() throws InterruptedException {
+        AtomicInteger visitCount = new AtomicInteger();
+        Future<String> retryFuture = RetryFuture.callWithRetry(() -> {
+            if (visitCount.getAndIncrement() == 0) {
+                throw new RetryException(0, TimeUnit.SECONDS);
+            }
+            return "test";
+        }, scheduler);
+        try {
+            assertEquals("test", retryFuture.get(100, TimeUnit.MILLISECONDS));
+        } catch (InterruptedException | ExecutionException | TimeoutException e) {
+            fail(e);
+        }
+    }
+
+    @Test
+    void composeWithRetryNormal() throws InterruptedException {
+        CompletableFuture<?> composedFuture = new CompletableFuture<>();
+
+        Future<?> retryFuture = RetryFuture.composeWithRetry(() -> {
+            composedFuture.complete(null);
+            return composedFuture;
+        }, scheduler);
+
+        try {
+            retryFuture.get(100, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException | ExecutionException | TimeoutException e) {
+            fail(e);
+        }
+        assertTrue(composedFuture.isDone());
+    }
+
+    @Test
+    void composeWithRetryThrow() throws InterruptedException {
+        CompletableFuture<?> composedFuture = new CompletableFuture<>();
+
+        Future<?> retryFuture = RetryFuture.composeWithRetry(() -> {
+            composedFuture.completeExceptionally(new DummyException());
+            return composedFuture;
+        }, scheduler);
+
+        try {
+            retryFuture.get(100, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException | TimeoutException e) {
+            fail(e);
+        } catch (ExecutionException ex) {
+            assertTrue(ex.getCause() instanceof DummyException);
+        }
+        assertTrue(composedFuture.isDone());
+    }
+
+    @Test
+    void composeWithRetry1() throws InterruptedException {
+        AtomicInteger visitCount = new AtomicInteger();
+        CompletableFuture<String> composedFuture = new CompletableFuture<>();
+        Future<String> retryFuture = RetryFuture.composeWithRetry(() -> {
+            if (visitCount.getAndIncrement() == 0) {
+                return CompletableFuture.failedFuture(new RetryException(0, TimeUnit.SECONDS));
+            }
+            composedFuture.complete("test");
+            return composedFuture;
+        }, scheduler);
+
+        try {
+            assertEquals("test", retryFuture.get(100, TimeUnit.MILLISECONDS));
+        } catch (InterruptedException | ExecutionException | TimeoutException e) {
+            fail(e);
+        }
+        assertEquals(2, visitCount.get());
+        assertTrue(composedFuture.isDone());
+    }
+
+    @Test
+    void composeWithRetry1Cancel() throws InterruptedException {
+        CountDownLatch latch = new CountDownLatch(1);
+        AtomicInteger visitCount = new AtomicInteger();
+        CompletableFuture<String> composedFuture = new CompletableFuture<>();
+        Future<String> retryFuture = RetryFuture.composeWithRetry(() -> {
+            if (visitCount.getAndIncrement() == 0) {
+                return CompletableFuture.failedFuture(new RetryException(0, TimeUnit.SECONDS));
+            }
+            latch.countDown();
+            return composedFuture;
+        }, scheduler);
+
+        try {
+            if (!latch.await(100, TimeUnit.MILLISECONDS)) {
+                fail("Timeout while waiting for latch");
+            }
+            Thread.sleep(1);
+            retryFuture.cancel(false);
+
+            assertTrue(composedFuture.isCancelled());
+        } catch (InterruptedException e) {
+            fail(e);
+        }
+        assertEquals(2, visitCount.get());
+        assertTrue(composedFuture.isDone());
+    }
+
+    private static class DummyException extends Exception {
+
+    }
+}