]> git.basschouten.com Git - openhab-addons.git/blob
3f5ef550ddb20aba225a1618a3961a7086f6c293
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.bluetooth.bluegiga.internal;
14
15 import java.util.LinkedList;
16 import java.util.List;
17 import java.util.Optional;
18 import java.util.Queue;
19 import java.util.Set;
20 import java.util.concurrent.Callable;
21 import java.util.concurrent.CopyOnWriteArrayList;
22 import java.util.concurrent.CopyOnWriteArraySet;
23 import java.util.concurrent.ExecutionException;
24 import java.util.concurrent.Future;
25 import java.util.concurrent.ScheduledExecutorService;
26 import java.util.concurrent.TimeUnit;
27 import java.util.concurrent.TimeoutException;
28 import java.util.concurrent.atomic.AtomicInteger;
29
30 import org.eclipse.jdt.annotation.NonNullByDefault;
31 import org.eclipse.jdt.annotation.Nullable;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34
35 /**
36  * This class provides transaction management and queuing of {@link BlueGigaCommand} frames.
37  *
38  * @author Pauli Anttila - Initial contribution
39  *
40  */
41 @NonNullByDefault
42 public class BlueGigaTransactionManager implements BlueGigaSerialEventListener {
43
44     private static final int TRANSACTION_TIMEOUT_PERIOD_MS = 100;
45
46     private final Logger logger = LoggerFactory.getLogger(BlueGigaTransactionManager.class);
47
48     /**
49      * Unique transaction id for request and response correlation
50      */
51     private AtomicInteger transactionId = new AtomicInteger();
52
53     /**
54      * Ongoing transaction id. If not present, no ongoing transaction.
55      */
56     private volatile Optional<Integer> ongoingTransactionId = Optional.empty();
57
58     /**
59      * Transaction listeners are used internally to correlate the commands and responses
60      */
61     private final List<BluetoothListener<? extends BlueGigaResponse>> transactionListeners = new CopyOnWriteArrayList<>();
62
63     /**
64      * The event listeners will be notified of any asynchronous events
65      */
66     private final Set<BlueGigaEventListener> eventListeners = new CopyOnWriteArraySet<>();
67
68     private final Queue<BlueGigaUniqueCommand> sendQueue = new LinkedList<>();
69     private final ScheduledExecutorService executor;
70     private final BlueGigaSerialHandler serialHandler;
71
72     private @Nullable Future<?> transactionTimeoutTimer;
73
74     /**
75      * Internal interface for transaction listeners.
76      */
77     interface BluetoothListener<T extends BlueGigaResponse> {
78         boolean transactionEvent(BlueGigaResponse response, int transactionId);
79
80         boolean transactionTimeout(int transactionId);
81     }
82
83     public BlueGigaTransactionManager(BlueGigaSerialHandler serialHandler, ScheduledExecutorService executor) {
84         this.serialHandler = serialHandler;
85         this.executor = executor;
86         serialHandler.addEventListener(this);
87     }
88
89     /**
90      * Close transaction manager.
91      */
92     public void close() {
93         serialHandler.removeEventListener(this);
94         cancelTransactionTimer();
95         sendQueue.clear();
96         transactionListeners.clear();
97         eventListeners.clear();
98         logger.debug("Closed");
99     }
100
101     private void startTransactionTimer() {
102         transactionTimeoutTimer = executor.schedule(() -> {
103             notifyTransactionTimeout(ongoingTransactionId);
104         }, TRANSACTION_TIMEOUT_PERIOD_MS, TimeUnit.MILLISECONDS);
105     }
106
107     private void cancelTransactionTimer() {
108         @Nullable
109         Future<?> transTimer = transactionTimeoutTimer;
110         if (transTimer != null) {
111             transTimer.cancel(true);
112             transTimer = null;
113         }
114     }
115
116     private void sendNextFrame() {
117         getNextFrame().ifPresent(frame -> {
118             cancelTransactionTimer();
119             logger.debug("Send frame #{}: {}", frame.getTransactionId(), frame.getMessage());
120             ongoingTransactionId = Optional.of(frame.getTransactionId());
121             serialHandler.sendFrame(frame.getMessage());
122             startTransactionTimer();
123         });
124     }
125
126     private Optional<BlueGigaUniqueCommand> getNextFrame() {
127         while (!sendQueue.isEmpty()) {
128             @Nullable
129             BlueGigaUniqueCommand frame = sendQueue.poll();
130             if (frame != null) {
131                 return Optional.of(frame);
132             } else {
133                 logger.debug("Null frame found from queue, skip it");
134                 continue;
135             }
136         }
137         return Optional.empty();
138     }
139
140     /**
141      * Add a {@link BlueGigaUniqueCommand} frame to the send queue. The sendQueue is a
142      * FIFO queue. This method queues a {@link BlueGigaCommand} frame without
143      * waiting for a response.
144      *
145      * @param request
146      *            {@link BlueGigaUniqueCommand}
147      */
148     public void queueFrame(BlueGigaUniqueCommand request) {
149         logger.trace("Queue TX BLE frame: {}", request);
150         sendQueue.add(request);
151         logger.trace("TX BLE queue size: {}", sendQueue.size());
152     }
153
154     private void sendNextTransactionIfNoOngoing() {
155         synchronized (this) {
156             logger.trace("Send next transaction if no ongoing");
157             if (ongoingTransactionId.isEmpty()) {
158                 sendNextFrame();
159             }
160         }
161     }
162
163     private void clearOngoingTransactionAndSendNext() {
164         synchronized (this) {
165             logger.trace("Clear ongoing transaction and send next frame from queue");
166             ongoingTransactionId = Optional.empty();
167             sendNextFrame();
168         }
169     }
170
171     private void addTransactionListener(BluetoothListener<? extends BlueGigaResponse> listener) {
172         if (transactionListeners.contains(listener)) {
173             return;
174         }
175
176         transactionListeners.add(listener);
177     }
178
179     private void removeTransactionListener(BluetoothListener<?> listener) {
180         transactionListeners.remove(listener);
181     }
182
183     /**
184      * Sends a BlueGiga request without waiting for the response.
185      *
186      * @param bleCommand {@link BlueGigaCommand}
187      * @return response {@link Future} {@link BlueGigaResponse}
188      */
189     private <T extends BlueGigaResponse> Future<T> sendBleRequestAsync(final BlueGigaCommand bleCommand,
190             final Class<T> expected) {
191         class TransactionWaiter implements Callable<T>, BluetoothListener<T> {
192             private volatile boolean complete;
193             private Optional<BlueGigaResponse> response = Optional.empty();
194             private BlueGigaUniqueCommand query = new BlueGigaUniqueCommand(bleCommand,
195                     transactionId.getAndIncrement());
196
197             @SuppressWarnings("unchecked")
198             @Override
199             public T call() throws TimeoutException {
200                 // Register a listener
201                 addTransactionListener(this);
202
203                 // Send the transaction
204                 queueFrame(query);
205                 sendNextTransactionIfNoOngoing();
206
207                 // Wait transaction completed or timeout
208                 synchronized (this) {
209                     while (!complete) {
210                         try {
211                             wait();
212                         } catch (InterruptedException e) {
213                             complete = true;
214                         }
215                     }
216                 }
217
218                 cancelTransactionTimer();
219
220                 // Remove the listener
221                 removeTransactionListener(this);
222
223                 // Send next transaction if any
224                 executor.submit(BlueGigaTransactionManager.this::clearOngoingTransactionAndSendNext);
225
226                 if (response.isPresent()) {
227                     return (T) response.get();
228                 } else {
229                     throw new TimeoutException("No response from BlueGiga controller");
230                 }
231             }
232
233             @Override
234             public boolean transactionEvent(BlueGigaResponse bleResponse, int transactionId) {
235                 logger.trace("Expected transactionId: {}, received transactionId: {}", query.getTransactionId(),
236                         transactionId);
237
238                 if (transactionId != query.getTransactionId()) {
239                     logger.trace("Ignore frame as received transaction Id {} doesn't match expected transaction Id {}.",
240                             transactionId, query.getTransactionId());
241                     return false;
242                 }
243
244                 logger.trace("Expected frame: {}, received frame: {}", expected.getSimpleName(), bleResponse);
245
246                 if (bleCommand instanceof BlueGigaDeviceCommand devCommand
247                         && bleResponse instanceof BlueGigaDeviceResponse devResponse) {
248                     logger.trace("Expected connection id: {}, received connection id: {}", devCommand.getConnection(),
249                             devResponse.getConnection());
250
251                     if (devCommand.getConnection() != devResponse.getConnection()) {
252                         logger.trace("Ignore response as received connection id {} doesn't match expected id {}.",
253                                 devResponse.getConnection(), devCommand.getConnection());
254                         return false;
255                     }
256                 }
257
258                 if (!expected.isInstance(bleResponse)) {
259                     logger.trace("Ignoring {} frame which has not been requested.",
260                             bleResponse.getClass().getSimpleName());
261                     return false;
262                 }
263
264                 // Response received, notify waiter
265                 response = Optional.of(bleResponse);
266                 complete = true;
267                 logger.debug("Received frame #{}: {}", transactionId, bleResponse);
268                 synchronized (this) {
269                     notify();
270                 }
271                 return true;
272             }
273
274             @Override
275             public boolean transactionTimeout(int transactionId) {
276                 if (transactionId != query.getTransactionId()) {
277                     return false;
278                 }
279                 logger.debug("Timeout, no response received for transaction {}", query.getTransactionId());
280                 complete = true;
281                 synchronized (this) {
282                     notify();
283                 }
284                 return true;
285             }
286         }
287
288         Callable<T> worker = new TransactionWaiter();
289         return executor.submit(worker);
290     }
291
292     /**
293      * Sends a {@link BlueGigaCommand} request to the NCP and waits for the response for specified period of time.
294      * The response is correlated with the request and the returned {@link BlueGigaResponse}
295      * contains the request and response data.
296      *
297      * @param bleCommand {@link BlueGigaCommand}
298      * @param timeout milliseconds to wait until {@link TimeoutException} is thrown
299      * @return response {@link BlueGigaResponse}
300      * @throws BlueGigaException when any error occurred
301      */
302     public <T extends BlueGigaResponse> T sendTransaction(BlueGigaCommand bleCommand, Class<T> expected, long timeout)
303             throws BlueGigaException {
304         Future<T> futureResponse = sendBleRequestAsync(bleCommand, expected);
305         try {
306             return futureResponse.get(timeout, TimeUnit.MILLISECONDS);
307         } catch (TimeoutException | InterruptedException | ExecutionException e) {
308             futureResponse.cancel(true);
309             throw new BlueGigaException(String.format("Error sending BLE transaction: %s", e.getMessage()), e);
310         }
311     }
312
313     public void addEventListener(BlueGigaEventListener listener) {
314         eventListeners.add(listener);
315     }
316
317     public void removeEventListener(BlueGigaEventListener listener) {
318         eventListeners.remove(listener);
319     }
320
321     @Override
322     public void bluegigaFrameReceived(BlueGigaResponse event) {
323         if (event.isEvent()) {
324             notifyEventListeners(event);
325         } else {
326             notifyTransactionComplete(event);
327         }
328     }
329
330     /**
331      * Notify any event listeners when we receive a response.
332      * This uses a separate thread to separate the processing of the event.
333      *
334      * @param response the response data received
335      * @return true if the response was processed
336      */
337     private void notifyEventListeners(final BlueGigaResponse response) {
338         // Notify the listeners
339         for (final BlueGigaEventListener listener : eventListeners) {
340             executor.submit(() -> listener.bluegigaEventReceived(response));
341         }
342     }
343
344     /**
345      * Notify any internal transaction listeners when we receive a response.
346      *
347      * @param response
348      *            the response data received
349      */
350     private void notifyTransactionComplete(final BlueGigaResponse response) {
351         ongoingTransactionId.ifPresent(id -> {
352             boolean processed = false;
353             for (BluetoothListener<? extends BlueGigaResponse> listener : transactionListeners) {
354                 if (listener.transactionEvent(response, id)) {
355                     processed = true;
356                 }
357             }
358             if (!processed) {
359                 logger.debug("No listener found for received response: {}", response);
360             }
361         });
362     }
363
364     private void notifyTransactionTimeout(final Optional<Integer> transactionId) {
365         transactionId.ifPresent(id -> {
366             boolean processed = false;
367             for (BluetoothListener<? extends BlueGigaResponse> listener : transactionListeners) {
368                 if (listener.transactionTimeout(id)) {
369                     processed = true;
370                 }
371             }
372             if (!processed) {
373                 logger.debug("No listener found for transaction timeout event, transaction id {}", id);
374             }
375         });
376     }
377 }