2 * Copyright (c) 2010-2023 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.bluetooth.bluegiga.internal;
15 import java.util.LinkedList;
16 import java.util.List;
17 import java.util.Optional;
18 import java.util.Queue;
20 import java.util.concurrent.Callable;
21 import java.util.concurrent.CopyOnWriteArrayList;
22 import java.util.concurrent.CopyOnWriteArraySet;
23 import java.util.concurrent.ExecutionException;
24 import java.util.concurrent.Future;
25 import java.util.concurrent.ScheduledExecutorService;
26 import java.util.concurrent.TimeUnit;
27 import java.util.concurrent.TimeoutException;
28 import java.util.concurrent.atomic.AtomicInteger;
30 import org.eclipse.jdt.annotation.NonNullByDefault;
31 import org.eclipse.jdt.annotation.Nullable;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
36 * This class provides transaction management and queuing of {@link BlueGigaCommand} frames.
38 * @author Pauli Anttila - Initial contribution
42 public class BlueGigaTransactionManager implements BlueGigaSerialEventListener {
44 private static final int TRANSACTION_TIMEOUT_PERIOD_MS = 100;
46 private final Logger logger = LoggerFactory.getLogger(BlueGigaTransactionManager.class);
49 * Unique transaction id for request and response correlation
51 private AtomicInteger transactionId = new AtomicInteger();
54 * Ongoing transaction id. If not present, no ongoing transaction.
56 private volatile Optional<Integer> ongoingTransactionId = Optional.empty();
59 * Transaction listeners are used internally to correlate the commands and responses
61 private final List<BluetoothListener<? extends BlueGigaResponse>> transactionListeners = new CopyOnWriteArrayList<>();
64 * The event listeners will be notified of any asynchronous events
66 private final Set<BlueGigaEventListener> eventListeners = new CopyOnWriteArraySet<>();
68 private final Queue<BlueGigaUniqueCommand> sendQueue = new LinkedList<>();
69 private final ScheduledExecutorService executor;
70 private final BlueGigaSerialHandler serialHandler;
72 private @Nullable Future<?> transactionTimeoutTimer;
75 * Internal interface for transaction listeners.
77 interface BluetoothListener<T extends BlueGigaResponse> {
78 boolean transactionEvent(BlueGigaResponse response, int transactionId);
80 boolean transactionTimeout(int transactionId);
83 public BlueGigaTransactionManager(BlueGigaSerialHandler serialHandler, ScheduledExecutorService executor) {
84 this.serialHandler = serialHandler;
85 this.executor = executor;
86 serialHandler.addEventListener(this);
90 * Close transaction manager.
93 serialHandler.removeEventListener(this);
94 cancelTransactionTimer();
96 transactionListeners.clear();
97 eventListeners.clear();
98 logger.debug("Closed");
101 private void startTransactionTimer() {
102 transactionTimeoutTimer = executor.schedule(() -> {
103 notifyTransactionTimeout(ongoingTransactionId);
104 }, TRANSACTION_TIMEOUT_PERIOD_MS, TimeUnit.MILLISECONDS);
107 private void cancelTransactionTimer() {
108 if (transactionTimeoutTimer != null) {
109 transactionTimeoutTimer.cancel(true);
110 transactionTimeoutTimer = null;
114 private void sendNextFrame() {
115 getNextFrame().ifPresent(frame -> {
116 cancelTransactionTimer();
117 logger.debug("Send frame #{}: {}", frame.getTransactionId(), frame.getMessage());
118 ongoingTransactionId = Optional.of(frame.getTransactionId());
119 serialHandler.sendFrame(frame.getMessage());
120 startTransactionTimer();
124 @SuppressWarnings({ "null", "unused" })
125 private Optional<BlueGigaUniqueCommand> getNextFrame() {
126 while (!sendQueue.isEmpty()) {
127 BlueGigaUniqueCommand frame = sendQueue.poll();
129 if (frame.getMessage() != null) {
130 return Optional.of(frame);
132 logger.debug("Null message found from queue, skip it");
136 logger.debug("Null frame found from queue, skip it");
140 return Optional.empty();
144 * Add a {@link BlueGigaUniqueCommand} frame to the send queue. The sendQueue is a
145 * FIFO queue. This method queues a {@link BlueGigaCommand} frame without
146 * waiting for a response.
149 * {@link BlueGigaUniqueCommand}
151 public void queueFrame(BlueGigaUniqueCommand request) {
152 logger.trace("Queue TX BLE frame: {}", request);
153 sendQueue.add(request);
154 logger.trace("TX BLE queue size: {}", sendQueue.size());
157 private void sendNextTransactionIfNoOngoing() {
158 synchronized (this) {
159 logger.trace("Send next transaction if no ongoing");
160 if (!ongoingTransactionId.isPresent()) {
166 private void clearOngoingTransactionAndSendNext() {
167 synchronized (this) {
168 logger.trace("Clear ongoing transaction and send next frame from queue");
169 ongoingTransactionId = Optional.empty();
174 private void addTransactionListener(BluetoothListener<? extends BlueGigaResponse> listener) {
175 if (transactionListeners.contains(listener)) {
179 transactionListeners.add(listener);
182 private void removeTransactionListener(BluetoothListener<?> listener) {
183 transactionListeners.remove(listener);
187 * Sends a BlueGiga request without waiting for the response.
189 * @param bleCommand {@link BlueGigaCommand}
190 * @return response {@link Future} {@link BlueGigaResponse}
192 private <T extends BlueGigaResponse> Future<T> sendBleRequestAsync(final BlueGigaCommand bleCommand,
193 final Class<T> expected) {
194 class TransactionWaiter implements Callable<T>, BluetoothListener<T> {
195 private volatile boolean complete;
196 private Optional<BlueGigaResponse> response = Optional.empty();
197 private BlueGigaUniqueCommand query = new BlueGigaUniqueCommand(bleCommand,
198 transactionId.getAndIncrement());
200 @SuppressWarnings("unchecked")
202 public T call() throws TimeoutException {
203 // Register a listener
204 addTransactionListener(this);
206 // Send the transaction
208 sendNextTransactionIfNoOngoing();
210 // Wait transaction completed or timeout
211 synchronized (this) {
215 } catch (InterruptedException e) {
221 cancelTransactionTimer();
223 // Remove the listener
224 removeTransactionListener(this);
226 // Send next transaction if any
227 executor.submit(BlueGigaTransactionManager.this::clearOngoingTransactionAndSendNext);
229 if (response.isPresent()) {
230 return (T) response.get();
232 throw new TimeoutException("No response from BlueGiga controller");
237 public boolean transactionEvent(BlueGigaResponse bleResponse, int transactionId) {
238 logger.trace("Expected transactionId: {}, received transactionId: {}", query.getTransactionId(),
241 if (transactionId != query.getTransactionId()) {
242 logger.trace("Ignore frame as received transaction Id {} doesn't match expected transaction Id {}.",
243 transactionId, query.getTransactionId());
247 logger.trace("Expected frame: {}, received frame: {}", expected.getSimpleName(), bleResponse);
249 if (bleCommand instanceof BlueGigaDeviceCommand && bleResponse instanceof BlueGigaDeviceResponse) {
250 BlueGigaDeviceCommand devCommand = (BlueGigaDeviceCommand) bleCommand;
251 BlueGigaDeviceResponse devResponse = (BlueGigaDeviceResponse) bleResponse;
253 logger.trace("Expected connection id: {}, received connection id: {}", devCommand.getConnection(),
254 devResponse.getConnection());
256 if (devCommand.getConnection() != devResponse.getConnection()) {
257 logger.trace("Ignore response as received connection id {} doesn't match expected id {}.",
258 devResponse.getConnection(), devCommand.getConnection());
263 if (!expected.isInstance(bleResponse)) {
264 logger.trace("Ignoring {} frame which has not been requested.",
265 bleResponse.getClass().getSimpleName());
269 // Response received, notify waiter
270 response = Optional.of(bleResponse);
272 logger.debug("Received frame #{}: {}", transactionId, bleResponse);
273 synchronized (this) {
280 public boolean transactionTimeout(int transactionId) {
281 if (transactionId != query.getTransactionId()) {
284 logger.debug("Timeout, no response received for transaction {}", query.getTransactionId());
286 synchronized (this) {
293 Callable<T> worker = new TransactionWaiter();
294 return executor.submit(worker);
298 * Sends a {@link BlueGigaCommand} request to the NCP and waits for the response for specified period of time.
299 * The response is correlated with the request and the returned {@link BlueGigaResponse}
300 * contains the request and response data.
302 * @param bleCommand {@link BlueGigaCommand}
303 * @param timeout milliseconds to wait until {@link TimeoutException} is thrown
304 * @return response {@link BlueGigaResponse}
305 * @throws BlueGigaException when any error occurred
307 public <T extends BlueGigaResponse> T sendTransaction(BlueGigaCommand bleCommand, Class<T> expected, long timeout)
308 throws BlueGigaException {
309 Future<T> futureResponse = sendBleRequestAsync(bleCommand, expected);
311 return futureResponse.get(timeout, TimeUnit.MILLISECONDS);
312 } catch (TimeoutException | InterruptedException | ExecutionException e) {
313 futureResponse.cancel(true);
314 throw new BlueGigaException(String.format("Error sending BLE transaction: %s", e.getMessage()), e);
318 public void addEventListener(BlueGigaEventListener listener) {
319 eventListeners.add(listener);
322 public void removeEventListener(BlueGigaEventListener listener) {
323 eventListeners.remove(listener);
327 public void bluegigaFrameReceived(BlueGigaResponse event) {
328 if (event.isEvent()) {
329 notifyEventListeners(event);
331 notifyTransactionComplete(event);
336 * Notify any event listeners when we receive a response.
337 * This uses a separate thread to separate the processing of the event.
339 * @param response the response data received
340 * @return true if the response was processed
342 private void notifyEventListeners(final BlueGigaResponse response) {
343 // Notify the listeners
344 for (final BlueGigaEventListener listener : eventListeners) {
345 executor.submit(() -> listener.bluegigaEventReceived(response));
350 * Notify any internal transaction listeners when we receive a response.
353 * the response data received
355 private void notifyTransactionComplete(final BlueGigaResponse response) {
356 ongoingTransactionId.ifPresent(id -> {
357 boolean processed = false;
358 for (BluetoothListener<? extends BlueGigaResponse> listener : transactionListeners) {
359 if (listener.transactionEvent(response, id)) {
364 logger.debug("No listener found for received response: {}", response);
369 private void notifyTransactionTimeout(final Optional<Integer> transactionId) {
370 transactionId.ifPresent(id -> {
371 boolean processed = false;
372 for (BluetoothListener<? extends BlueGigaResponse> listener : transactionListeners) {
373 if (listener.transactionTimeout(id)) {
378 logger.debug("No listener found for transaction timeout event, transaction id {}", id);