]> git.basschouten.com Git - openhab-addons.git/blob
b62e68da9db3b11e2840d69ccff4facd2c93c54a
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.bluetooth.bluegiga.internal;
14
15 import java.util.LinkedList;
16 import java.util.List;
17 import java.util.Optional;
18 import java.util.Queue;
19 import java.util.Set;
20 import java.util.concurrent.Callable;
21 import java.util.concurrent.CopyOnWriteArrayList;
22 import java.util.concurrent.CopyOnWriteArraySet;
23 import java.util.concurrent.ExecutionException;
24 import java.util.concurrent.Future;
25 import java.util.concurrent.ScheduledExecutorService;
26 import java.util.concurrent.TimeUnit;
27 import java.util.concurrent.TimeoutException;
28 import java.util.concurrent.atomic.AtomicInteger;
29
30 import org.eclipse.jdt.annotation.NonNullByDefault;
31 import org.eclipse.jdt.annotation.Nullable;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34
35 /**
36  * This class provides transaction management and queuing of {@link BlueGigaCommand} frames.
37  *
38  * @author Pauli Anttila - Initial contribution
39  *
40  */
41 @NonNullByDefault
42 public class BlueGigaTransactionManager implements BlueGigaSerialEventListener {
43
44     private static final int TRANSACTION_TIMEOUT_PERIOD_MS = 100;
45
46     private final Logger logger = LoggerFactory.getLogger(BlueGigaTransactionManager.class);
47
48     /**
49      * Unique transaction id for request and response correlation
50      */
51     private AtomicInteger transactionId = new AtomicInteger();
52
53     /**
54      * Ongoing transaction id. If not present, no ongoing transaction.
55      */
56     private volatile Optional<Integer> ongoingTransactionId = Optional.empty();
57
58     /**
59      * Transaction listeners are used internally to correlate the commands and responses
60      */
61     private final List<BluetoothListener<? extends BlueGigaResponse>> transactionListeners = new CopyOnWriteArrayList<>();
62
63     /**
64      * The event listeners will be notified of any asynchronous events
65      */
66     private final Set<BlueGigaEventListener> eventListeners = new CopyOnWriteArraySet<>();
67
68     private final Queue<BlueGigaUniqueCommand> sendQueue = new LinkedList<>();
69     private final ScheduledExecutorService executor;
70     private final BlueGigaSerialHandler serialHandler;
71
72     private @Nullable Future<?> transactionTimeoutTimer;
73
74     /**
75      * Internal interface for transaction listeners.
76      */
77     interface BluetoothListener<T extends BlueGigaResponse> {
78         boolean transactionEvent(BlueGigaResponse response, int transactionId);
79
80         boolean transactionTimeout(int transactionId);
81     }
82
83     public BlueGigaTransactionManager(BlueGigaSerialHandler serialHandler, ScheduledExecutorService executor) {
84         this.serialHandler = serialHandler;
85         this.executor = executor;
86         serialHandler.addEventListener(this);
87     }
88
89     /**
90      * Close transaction manager.
91      */
92     public void close() {
93         serialHandler.removeEventListener(this);
94         cancelTransactionTimer();
95         sendQueue.clear();
96         transactionListeners.clear();
97         eventListeners.clear();
98         logger.debug("Closed");
99     }
100
101     private void startTransactionTimer() {
102         transactionTimeoutTimer = executor.schedule(() -> {
103             notifyTransactionTimeout(ongoingTransactionId);
104         }, TRANSACTION_TIMEOUT_PERIOD_MS, TimeUnit.MILLISECONDS);
105     }
106
107     private void cancelTransactionTimer() {
108         @Nullable
109         Future<?> transTimer = transactionTimeoutTimer;
110         if (transTimer != null) {
111             transTimer.cancel(true);
112             transTimer = null;
113         }
114     }
115
116     private void sendNextFrame() {
117         getNextFrame().ifPresent(frame -> {
118             cancelTransactionTimer();
119             logger.debug("Send frame #{}: {}", frame.getTransactionId(), frame.getMessage());
120             ongoingTransactionId = Optional.of(frame.getTransactionId());
121             serialHandler.sendFrame(frame.getMessage());
122             startTransactionTimer();
123         });
124     }
125
126     private Optional<BlueGigaUniqueCommand> getNextFrame() {
127         while (!sendQueue.isEmpty()) {
128             @Nullable
129             BlueGigaUniqueCommand frame = sendQueue.poll();
130             if (frame != null) {
131                 return Optional.of(frame);
132             } else {
133                 logger.debug("Null frame found from queue, skip it");
134                 continue;
135             }
136         }
137         return Optional.empty();
138     }
139
140     /**
141      * Add a {@link BlueGigaUniqueCommand} frame to the send queue. The sendQueue is a
142      * FIFO queue. This method queues a {@link BlueGigaCommand} frame without
143      * waiting for a response.
144      *
145      * @param transaction
146      *            {@link BlueGigaUniqueCommand}
147      */
148     public void queueFrame(BlueGigaUniqueCommand request) {
149         logger.trace("Queue TX BLE frame: {}", request);
150         sendQueue.add(request);
151         logger.trace("TX BLE queue size: {}", sendQueue.size());
152     }
153
154     private void sendNextTransactionIfNoOngoing() {
155         synchronized (this) {
156             logger.trace("Send next transaction if no ongoing");
157             if (ongoingTransactionId.isEmpty()) {
158                 sendNextFrame();
159             }
160         }
161     }
162
163     private void clearOngoingTransactionAndSendNext() {
164         synchronized (this) {
165             logger.trace("Clear ongoing transaction and send next frame from queue");
166             ongoingTransactionId = Optional.empty();
167             sendNextFrame();
168         }
169     }
170
171     private void addTransactionListener(BluetoothListener<? extends BlueGigaResponse> listener) {
172         if (transactionListeners.contains(listener)) {
173             return;
174         }
175
176         transactionListeners.add(listener);
177     }
178
179     private void removeTransactionListener(BluetoothListener<?> listener) {
180         transactionListeners.remove(listener);
181     }
182
183     /**
184      * Sends a BlueGiga request without waiting for the response.
185      *
186      * @param bleCommand {@link BlueGigaCommand}
187      * @return response {@link Future} {@link BlueGigaResponse}
188      */
189     private <T extends BlueGigaResponse> Future<T> sendBleRequestAsync(final BlueGigaCommand bleCommand,
190             final Class<T> expected) {
191         class TransactionWaiter implements Callable<T>, BluetoothListener<T> {
192             private volatile boolean complete;
193             private Optional<BlueGigaResponse> response = Optional.empty();
194             private BlueGigaUniqueCommand query = new BlueGigaUniqueCommand(bleCommand,
195                     transactionId.getAndIncrement());
196
197             @SuppressWarnings("unchecked")
198             @Override
199             public T call() throws TimeoutException {
200                 // Register a listener
201                 addTransactionListener(this);
202
203                 // Send the transaction
204                 queueFrame(query);
205                 sendNextTransactionIfNoOngoing();
206
207                 // Wait transaction completed or timeout
208                 synchronized (this) {
209                     while (!complete) {
210                         try {
211                             wait();
212                         } catch (InterruptedException e) {
213                             complete = true;
214                         }
215                     }
216                 }
217
218                 cancelTransactionTimer();
219
220                 // Remove the listener
221                 removeTransactionListener(this);
222
223                 // Send next transaction if any
224                 executor.submit(BlueGigaTransactionManager.this::clearOngoingTransactionAndSendNext);
225
226                 if (response.isPresent()) {
227                     return (T) response.get();
228                 } else {
229                     throw new TimeoutException("No response from BlueGiga controller");
230                 }
231             }
232
233             @Override
234             public boolean transactionEvent(BlueGigaResponse bleResponse, int transactionId) {
235                 logger.trace("Expected transactionId: {}, received transactionId: {}", query.getTransactionId(),
236                         transactionId);
237
238                 if (transactionId != query.getTransactionId()) {
239                     logger.trace("Ignore frame as received transaction Id {} doesn't match expected transaction Id {}.",
240                             transactionId, query.getTransactionId());
241                     return false;
242                 }
243
244                 logger.trace("Expected frame: {}, received frame: {}", expected.getSimpleName(), bleResponse);
245
246                 if (bleCommand instanceof BlueGigaDeviceCommand devCommand
247                         && bleResponse instanceof BlueGigaDeviceResponse devResponse) {
248
249                     logger.trace("Expected connection id: {}, received connection id: {}", devCommand.getConnection(),
250                             devResponse.getConnection());
251
252                     if (devCommand.getConnection() != devResponse.getConnection()) {
253                         logger.trace("Ignore response as received connection id {} doesn't match expected id {}.",
254                                 devResponse.getConnection(), devCommand.getConnection());
255                         return false;
256                     }
257                 }
258
259                 if (!expected.isInstance(bleResponse)) {
260                     logger.trace("Ignoring {} frame which has not been requested.",
261                             bleResponse.getClass().getSimpleName());
262                     return false;
263                 }
264
265                 // Response received, notify waiter
266                 response = Optional.of(bleResponse);
267                 complete = true;
268                 logger.debug("Received frame #{}: {}", transactionId, bleResponse);
269                 synchronized (this) {
270                     notify();
271                 }
272                 return true;
273             }
274
275             @Override
276             public boolean transactionTimeout(int transactionId) {
277                 if (transactionId != query.getTransactionId()) {
278                     return false;
279                 }
280                 logger.debug("Timeout, no response received for transaction {}", query.getTransactionId());
281                 complete = true;
282                 synchronized (this) {
283                     notify();
284                 }
285                 return true;
286             }
287         }
288
289         Callable<T> worker = new TransactionWaiter();
290         return executor.submit(worker);
291     }
292
293     /**
294      * Sends a {@link BlueGigaCommand} request to the NCP and waits for the response for specified period of time.
295      * The response is correlated with the request and the returned {@link BlueGigaResponse}
296      * contains the request and response data.
297      *
298      * @param bleCommand {@link BlueGigaCommand}
299      * @param timeout milliseconds to wait until {@link TimeoutException} is thrown
300      * @return response {@link BlueGigaResponse}
301      * @throws BlueGigaException when any error occurred
302      */
303     public <T extends BlueGigaResponse> T sendTransaction(BlueGigaCommand bleCommand, Class<T> expected, long timeout)
304             throws BlueGigaException {
305         Future<T> futureResponse = sendBleRequestAsync(bleCommand, expected);
306         try {
307             return futureResponse.get(timeout, TimeUnit.MILLISECONDS);
308         } catch (TimeoutException | InterruptedException | ExecutionException e) {
309             futureResponse.cancel(true);
310             throw new BlueGigaException(String.format("Error sending BLE transaction: %s", e.getMessage()), e);
311         }
312     }
313
314     public void addEventListener(BlueGigaEventListener listener) {
315         eventListeners.add(listener);
316     }
317
318     public void removeEventListener(BlueGigaEventListener listener) {
319         eventListeners.remove(listener);
320     }
321
322     @Override
323     public void bluegigaFrameReceived(BlueGigaResponse event) {
324         if (event.isEvent()) {
325             notifyEventListeners(event);
326         } else {
327             notifyTransactionComplete(event);
328         }
329     }
330
331     /**
332      * Notify any event listeners when we receive a response.
333      * This uses a separate thread to separate the processing of the event.
334      *
335      * @param response the response data received
336      * @return true if the response was processed
337      */
338     private void notifyEventListeners(final BlueGigaResponse response) {
339         // Notify the listeners
340         for (final BlueGigaEventListener listener : eventListeners) {
341             executor.submit(() -> listener.bluegigaEventReceived(response));
342         }
343     }
344
345     /**
346      * Notify any internal transaction listeners when we receive a response.
347      *
348      * @param response
349      *            the response data received
350      */
351     private void notifyTransactionComplete(final BlueGigaResponse response) {
352         ongoingTransactionId.ifPresent(id -> {
353             boolean processed = false;
354             for (BluetoothListener<? extends BlueGigaResponse> listener : transactionListeners) {
355                 if (listener.transactionEvent(response, id)) {
356                     processed = true;
357                 }
358             }
359             if (!processed) {
360                 logger.debug("No listener found for received response: {}", response);
361             }
362         });
363     }
364
365     private void notifyTransactionTimeout(final Optional<Integer> transactionId) {
366         transactionId.ifPresent(id -> {
367             boolean processed = false;
368             for (BluetoothListener<? extends BlueGigaResponse> listener : transactionListeners) {
369                 if (listener.transactionTimeout(id)) {
370                     processed = true;
371                 }
372             }
373             if (!processed) {
374                 logger.debug("No listener found for transaction timeout event, transaction id {}", id);
375             }
376         });
377     }
378 }