2 * Copyright (c) 2010-2023 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.bluetooth.bluegiga.internal;
15 import java.util.LinkedList;
16 import java.util.List;
17 import java.util.Optional;
18 import java.util.Queue;
20 import java.util.concurrent.Callable;
21 import java.util.concurrent.CopyOnWriteArrayList;
22 import java.util.concurrent.CopyOnWriteArraySet;
23 import java.util.concurrent.ExecutionException;
24 import java.util.concurrent.Future;
25 import java.util.concurrent.ScheduledExecutorService;
26 import java.util.concurrent.TimeUnit;
27 import java.util.concurrent.TimeoutException;
28 import java.util.concurrent.atomic.AtomicInteger;
30 import org.eclipse.jdt.annotation.NonNullByDefault;
31 import org.eclipse.jdt.annotation.Nullable;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
36 * This class provides transaction management and queuing of {@link BlueGigaCommand} frames.
38 * @author Pauli Anttila - Initial contribution
42 public class BlueGigaTransactionManager implements BlueGigaSerialEventListener {
44 private static final int TRANSACTION_TIMEOUT_PERIOD_MS = 100;
46 private final Logger logger = LoggerFactory.getLogger(BlueGigaTransactionManager.class);
49 * Unique transaction id for request and response correlation
51 private AtomicInteger transactionId = new AtomicInteger();
54 * Ongoing transaction id. If not present, no ongoing transaction.
56 private volatile Optional<Integer> ongoingTransactionId = Optional.empty();
59 * Transaction listeners are used internally to correlate the commands and responses
61 private final List<BluetoothListener<? extends BlueGigaResponse>> transactionListeners = new CopyOnWriteArrayList<>();
64 * The event listeners will be notified of any asynchronous events
66 private final Set<BlueGigaEventListener> eventListeners = new CopyOnWriteArraySet<>();
68 private final Queue<BlueGigaUniqueCommand> sendQueue = new LinkedList<>();
69 private final ScheduledExecutorService executor;
70 private final BlueGigaSerialHandler serialHandler;
72 private @Nullable Future<?> transactionTimeoutTimer;
75 * Internal interface for transaction listeners.
77 interface BluetoothListener<T extends BlueGigaResponse> {
78 boolean transactionEvent(BlueGigaResponse response, int transactionId);
80 boolean transactionTimeout(int transactionId);
83 public BlueGigaTransactionManager(BlueGigaSerialHandler serialHandler, ScheduledExecutorService executor) {
84 this.serialHandler = serialHandler;
85 this.executor = executor;
86 serialHandler.addEventListener(this);
90 * Close transaction manager.
93 serialHandler.removeEventListener(this);
94 cancelTransactionTimer();
96 transactionListeners.clear();
97 eventListeners.clear();
98 logger.debug("Closed");
101 private void startTransactionTimer() {
102 transactionTimeoutTimer = executor.schedule(() -> {
103 notifyTransactionTimeout(ongoingTransactionId);
104 }, TRANSACTION_TIMEOUT_PERIOD_MS, TimeUnit.MILLISECONDS);
107 private void cancelTransactionTimer() {
109 Future<?> transTimer = transactionTimeoutTimer;
110 if (transTimer != null) {
111 transTimer.cancel(true);
116 private void sendNextFrame() {
117 getNextFrame().ifPresent(frame -> {
118 cancelTransactionTimer();
119 logger.debug("Send frame #{}: {}", frame.getTransactionId(), frame.getMessage());
120 ongoingTransactionId = Optional.of(frame.getTransactionId());
121 serialHandler.sendFrame(frame.getMessage());
122 startTransactionTimer();
126 private Optional<BlueGigaUniqueCommand> getNextFrame() {
127 while (!sendQueue.isEmpty()) {
129 BlueGigaUniqueCommand frame = sendQueue.poll();
131 return Optional.of(frame);
133 logger.debug("Null frame found from queue, skip it");
137 return Optional.empty();
141 * Add a {@link BlueGigaUniqueCommand} frame to the send queue. The sendQueue is a
142 * FIFO queue. This method queues a {@link BlueGigaCommand} frame without
143 * waiting for a response.
146 * {@link BlueGigaUniqueCommand}
148 public void queueFrame(BlueGigaUniqueCommand request) {
149 logger.trace("Queue TX BLE frame: {}", request);
150 sendQueue.add(request);
151 logger.trace("TX BLE queue size: {}", sendQueue.size());
154 private void sendNextTransactionIfNoOngoing() {
155 synchronized (this) {
156 logger.trace("Send next transaction if no ongoing");
157 if (ongoingTransactionId.isEmpty()) {
163 private void clearOngoingTransactionAndSendNext() {
164 synchronized (this) {
165 logger.trace("Clear ongoing transaction and send next frame from queue");
166 ongoingTransactionId = Optional.empty();
171 private void addTransactionListener(BluetoothListener<? extends BlueGigaResponse> listener) {
172 if (transactionListeners.contains(listener)) {
176 transactionListeners.add(listener);
179 private void removeTransactionListener(BluetoothListener<?> listener) {
180 transactionListeners.remove(listener);
184 * Sends a BlueGiga request without waiting for the response.
186 * @param bleCommand {@link BlueGigaCommand}
187 * @return response {@link Future} {@link BlueGigaResponse}
189 private <T extends BlueGigaResponse> Future<T> sendBleRequestAsync(final BlueGigaCommand bleCommand,
190 final Class<T> expected) {
191 class TransactionWaiter implements Callable<T>, BluetoothListener<T> {
192 private volatile boolean complete;
193 private Optional<BlueGigaResponse> response = Optional.empty();
194 private BlueGigaUniqueCommand query = new BlueGigaUniqueCommand(bleCommand,
195 transactionId.getAndIncrement());
197 @SuppressWarnings("unchecked")
199 public T call() throws TimeoutException {
200 // Register a listener
201 addTransactionListener(this);
203 // Send the transaction
205 sendNextTransactionIfNoOngoing();
207 // Wait transaction completed or timeout
208 synchronized (this) {
212 } catch (InterruptedException e) {
218 cancelTransactionTimer();
220 // Remove the listener
221 removeTransactionListener(this);
223 // Send next transaction if any
224 executor.submit(BlueGigaTransactionManager.this::clearOngoingTransactionAndSendNext);
226 if (response.isPresent()) {
227 return (T) response.get();
229 throw new TimeoutException("No response from BlueGiga controller");
234 public boolean transactionEvent(BlueGigaResponse bleResponse, int transactionId) {
235 logger.trace("Expected transactionId: {}, received transactionId: {}", query.getTransactionId(),
238 if (transactionId != query.getTransactionId()) {
239 logger.trace("Ignore frame as received transaction Id {} doesn't match expected transaction Id {}.",
240 transactionId, query.getTransactionId());
244 logger.trace("Expected frame: {}, received frame: {}", expected.getSimpleName(), bleResponse);
246 if (bleCommand instanceof BlueGigaDeviceCommand devCommand
247 && bleResponse instanceof BlueGigaDeviceResponse devResponse) {
248 logger.trace("Expected connection id: {}, received connection id: {}", devCommand.getConnection(),
249 devResponse.getConnection());
251 if (devCommand.getConnection() != devResponse.getConnection()) {
252 logger.trace("Ignore response as received connection id {} doesn't match expected id {}.",
253 devResponse.getConnection(), devCommand.getConnection());
258 if (!expected.isInstance(bleResponse)) {
259 logger.trace("Ignoring {} frame which has not been requested.",
260 bleResponse.getClass().getSimpleName());
264 // Response received, notify waiter
265 response = Optional.of(bleResponse);
267 logger.debug("Received frame #{}: {}", transactionId, bleResponse);
268 synchronized (this) {
275 public boolean transactionTimeout(int transactionId) {
276 if (transactionId != query.getTransactionId()) {
279 logger.debug("Timeout, no response received for transaction {}", query.getTransactionId());
281 synchronized (this) {
288 Callable<T> worker = new TransactionWaiter();
289 return executor.submit(worker);
293 * Sends a {@link BlueGigaCommand} request to the NCP and waits for the response for specified period of time.
294 * The response is correlated with the request and the returned {@link BlueGigaResponse}
295 * contains the request and response data.
297 * @param bleCommand {@link BlueGigaCommand}
298 * @param timeout milliseconds to wait until {@link TimeoutException} is thrown
299 * @return response {@link BlueGigaResponse}
300 * @throws BlueGigaException when any error occurred
302 public <T extends BlueGigaResponse> T sendTransaction(BlueGigaCommand bleCommand, Class<T> expected, long timeout)
303 throws BlueGigaException {
304 Future<T> futureResponse = sendBleRequestAsync(bleCommand, expected);
306 return futureResponse.get(timeout, TimeUnit.MILLISECONDS);
307 } catch (TimeoutException | InterruptedException | ExecutionException e) {
308 futureResponse.cancel(true);
309 throw new BlueGigaException(String.format("Error sending BLE transaction: %s", e.getMessage()), e);
313 public void addEventListener(BlueGigaEventListener listener) {
314 eventListeners.add(listener);
317 public void removeEventListener(BlueGigaEventListener listener) {
318 eventListeners.remove(listener);
322 public void bluegigaFrameReceived(BlueGigaResponse event) {
323 if (event.isEvent()) {
324 notifyEventListeners(event);
326 notifyTransactionComplete(event);
331 * Notify any event listeners when we receive a response.
332 * This uses a separate thread to separate the processing of the event.
334 * @param response the response data received
335 * @return true if the response was processed
337 private void notifyEventListeners(final BlueGigaResponse response) {
338 // Notify the listeners
339 for (final BlueGigaEventListener listener : eventListeners) {
340 executor.submit(() -> listener.bluegigaEventReceived(response));
345 * Notify any internal transaction listeners when we receive a response.
348 * the response data received
350 private void notifyTransactionComplete(final BlueGigaResponse response) {
351 ongoingTransactionId.ifPresent(id -> {
352 boolean processed = false;
353 for (BluetoothListener<? extends BlueGigaResponse> listener : transactionListeners) {
354 if (listener.transactionEvent(response, id)) {
359 logger.debug("No listener found for received response: {}", response);
364 private void notifyTransactionTimeout(final Optional<Integer> transactionId) {
365 transactionId.ifPresent(id -> {
366 boolean processed = false;
367 for (BluetoothListener<? extends BlueGigaResponse> listener : transactionListeners) {
368 if (listener.transactionTimeout(id)) {
373 logger.debug("No listener found for transaction timeout event, transaction id {}", id);