2 * Copyright (c) 2010-2023 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.bluetooth.gattserial;
15 import java.util.Deque;
16 import java.util.Iterator;
17 import java.util.List;
18 import java.util.concurrent.CompletableFuture;
19 import java.util.concurrent.CompletionException;
20 import java.util.concurrent.ConcurrentLinkedDeque;
21 import java.util.concurrent.Future;
22 import java.util.concurrent.ScheduledExecutorService;
23 import java.util.concurrent.TimeUnit;
24 import java.util.concurrent.TimeoutException;
25 import java.util.function.Consumer;
27 import org.eclipse.jdt.annotation.NonNullByDefault;
28 import org.eclipse.jdt.annotation.Nullable;
31 * @author Connor Petty - Initial Contribution
35 public abstract class GattSocket<T extends GattMessage, R extends GattMessage> {
37 private static final Future<?> COMPLETED_FUTURE = CompletableFuture.completedFuture(null);
39 private final Deque<MessageProcessor> messageProcessors = new ConcurrentLinkedDeque<>();
41 public void registerMessageHandler(MessageHandler<T, R> messageHandler) {
42 // we need to use a dummy future since ConcurrentHashMap doesn't allow null values
43 messageProcessors.addFirst(new MessageProcessor(messageHandler, COMPLETED_FUTURE));
46 protected abstract ScheduledExecutorService getScheduler();
48 public void sendMessage(MessageServicer<T, R> messageServicer) {
49 T message = messageServicer.createMessage();
51 CompletableFuture<@Nullable Void> messageFuture = sendMessage(message);
53 Future<?> timeoutFuture = getScheduler().schedule(() -> {
54 messageFuture.completeExceptionally(new TimeoutException("Timeout while waiting for response"));
55 }, messageServicer.getTimeout(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
57 MessageProcessor processor = new MessageProcessor(messageServicer, timeoutFuture);
58 messageProcessors.addLast(processor);
60 messageFuture.whenComplete((v, ex) -> {
61 if (ex instanceof CompletionException) {
65 if (messageServicer.handleFailedMessage(message, ex)) {
66 timeoutFuture.cancel(false);
67 messageProcessors.remove(processor);
73 public CompletableFuture<@Nullable Void> sendMessage(T message) {
74 List<byte[]> packets = createPackets(message);
75 var futures = packets.stream()//
76 .map(this::sendPacket)//
77 .toArray(CompletableFuture[]::new);
79 return CompletableFuture.allOf(futures);
82 protected List<byte[]> createPackets(T message) {
83 return List.of(message.getPayload());
86 protected abstract void parsePacket(byte[] packet, Consumer<R> messageHandler);
88 protected abstract CompletableFuture<@Nullable Void> sendPacket(byte[] value);
90 public void receivePacket(byte[] packet) {
91 parsePacket(packet, this::handleMessage);
94 private void handleMessage(R message) {
95 for (Iterator<MessageProcessor> it = messageProcessors.iterator(); it.hasNext();) {
96 MessageProcessor processor = it.next();
97 if (processor.messageHandler.handleReceivedMessage(message)) {
98 processor.timeoutFuture.cancel(false);
100 // we want to return after the first message servicer handles the message
101 if (processor.timeoutFuture != COMPLETED_FUTURE) {
108 private class MessageProcessor {
109 private MessageHandler<T, R> messageHandler;
110 private Future<?> timeoutFuture;
112 public MessageProcessor(MessageHandler<T, R> messageHandler, Future<?> timeoutFuture) {
113 this.messageHandler = messageHandler;
114 this.timeoutFuture = timeoutFuture;