]> git.basschouten.com Git - openhab-addons.git/blob
d65b8d57f76d515c27641dca3e28d1551b676cbf
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2024 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.bluetooth.gattserial;
14
15 import java.util.Deque;
16 import java.util.Iterator;
17 import java.util.List;
18 import java.util.concurrent.CompletableFuture;
19 import java.util.concurrent.CompletionException;
20 import java.util.concurrent.ConcurrentLinkedDeque;
21 import java.util.concurrent.Future;
22 import java.util.concurrent.ScheduledExecutorService;
23 import java.util.concurrent.TimeUnit;
24 import java.util.concurrent.TimeoutException;
25 import java.util.function.Consumer;
26
27 import org.eclipse.jdt.annotation.NonNullByDefault;
28 import org.eclipse.jdt.annotation.Nullable;
29
30 /**
31  * @author Connor Petty - Initial Contribution
32  *
33  */
34 @NonNullByDefault
35 public abstract class GattSocket<T extends GattMessage, R extends GattMessage> {
36
37     private static final Future<?> COMPLETED_FUTURE = CompletableFuture.completedFuture(null);
38
39     private final Deque<MessageProcessor> messageProcessors = new ConcurrentLinkedDeque<>();
40
41     public void registerMessageHandler(MessageHandler<T, R> messageHandler) {
42         // we need to use a dummy future since ConcurrentHashMap doesn't allow null values
43         messageProcessors.addFirst(new MessageProcessor(messageHandler, COMPLETED_FUTURE));
44     }
45
46     protected abstract ScheduledExecutorService getScheduler();
47
48     public void sendMessage(MessageServicer<T, R> messageServicer) {
49         T message = messageServicer.createMessage();
50
51         CompletableFuture<@Nullable Void> messageFuture = sendMessage(message);
52
53         Future<?> timeoutFuture = getScheduler().schedule(() -> {
54             messageFuture.completeExceptionally(new TimeoutException("Timeout while waiting for response"));
55         }, messageServicer.getTimeout(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
56
57         MessageProcessor processor = new MessageProcessor(messageServicer, timeoutFuture);
58         messageProcessors.addLast(processor);
59
60         messageFuture.whenComplete((v, ex) -> {
61             if (ex instanceof CompletionException) {
62                 ex = ex.getCause();
63             }
64             if (ex != null) {
65                 if (messageServicer.handleFailedMessage(message, ex)) {
66                     timeoutFuture.cancel(false);
67                     messageProcessors.remove(processor);
68                 }
69             }
70         });
71     }
72
73     public CompletableFuture<@Nullable Void> sendMessage(T message) {
74         List<byte[]> packets = createPackets(message);
75         var futures = packets.stream()//
76                 .map(this::sendPacket)//
77                 .toArray(CompletableFuture[]::new);
78
79         return CompletableFuture.allOf(futures);
80     }
81
82     protected List<byte[]> createPackets(T message) {
83         return List.of(message.getPayload());
84     }
85
86     protected abstract void parsePacket(byte[] packet, Consumer<R> messageHandler);
87
88     protected abstract CompletableFuture<@Nullable Void> sendPacket(byte[] value);
89
90     public void receivePacket(byte[] packet) {
91         parsePacket(packet, this::handleMessage);
92     }
93
94     private void handleMessage(R message) {
95         for (Iterator<MessageProcessor> it = messageProcessors.iterator(); it.hasNext();) {
96             MessageProcessor processor = it.next();
97             if (processor.messageHandler.handleReceivedMessage(message)) {
98                 processor.timeoutFuture.cancel(false);
99                 it.remove();
100                 // we want to return after the first message servicer handles the message
101                 if (processor.timeoutFuture != COMPLETED_FUTURE) {
102                     return;
103                 }
104             }
105         }
106     }
107
108     private class MessageProcessor {
109         private MessageHandler<T, R> messageHandler;
110         private Future<?> timeoutFuture;
111
112         public MessageProcessor(MessageHandler<T, R> messageHandler, Future<?> timeoutFuture) {
113             this.messageHandler = messageHandler;
114             this.timeoutFuture = timeoutFuture;
115         }
116     }
117 }