]> git.basschouten.com Git - openhab-addons.git/blob
60d4404731c3730a6e04cf5db99f605c059e3782
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.bluetooth.bluegiga.internal;
14
15 import java.util.LinkedList;
16 import java.util.List;
17 import java.util.Optional;
18 import java.util.Queue;
19 import java.util.Set;
20 import java.util.concurrent.Callable;
21 import java.util.concurrent.CopyOnWriteArrayList;
22 import java.util.concurrent.CopyOnWriteArraySet;
23 import java.util.concurrent.ExecutionException;
24 import java.util.concurrent.Future;
25 import java.util.concurrent.ScheduledExecutorService;
26 import java.util.concurrent.TimeUnit;
27 import java.util.concurrent.TimeoutException;
28 import java.util.concurrent.atomic.AtomicInteger;
29
30 import org.eclipse.jdt.annotation.NonNullByDefault;
31 import org.eclipse.jdt.annotation.Nullable;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34
35 /**
36  * This class provides transaction management and queuing of {@link BlueGigaCommand} frames.
37  *
38  * @author Pauli Anttila - Initial contribution
39  *
40  */
41 @NonNullByDefault
42 public class BlueGigaTransactionManager implements BlueGigaSerialEventListener {
43
44     private static final int TRANSACTION_TIMEOUT_PERIOD_MS = 100;
45
46     private final Logger logger = LoggerFactory.getLogger(BlueGigaTransactionManager.class);
47
48     /**
49      * Unique transaction id for request and response correlation
50      */
51     private AtomicInteger transactionId = new AtomicInteger();
52
53     /**
54      * Ongoing transaction id. If not present, no ongoing transaction.
55      */
56     private volatile Optional<Integer> ongoingTransactionId = Optional.empty();
57
58     /**
59      * Transaction listeners are used internally to correlate the commands and responses
60      */
61     private final List<BluetoothListener<? extends BlueGigaResponse>> transactionListeners = new CopyOnWriteArrayList<>();
62
63     /**
64      * The event listeners will be notified of any asynchronous events
65      */
66     private final Set<BlueGigaEventListener> eventListeners = new CopyOnWriteArraySet<>();
67
68     private final Queue<BlueGigaUniqueCommand> sendQueue = new LinkedList<>();
69     private final ScheduledExecutorService executor;
70     private final BlueGigaSerialHandler serialHandler;
71
72     private @Nullable Future<?> transactionTimeoutTimer;
73
74     /**
75      * Internal interface for transaction listeners.
76      */
77     interface BluetoothListener<T extends BlueGigaResponse> {
78         boolean transactionEvent(BlueGigaResponse response, int transactionId);
79
80         boolean transactionTimeout(int transactionId);
81     }
82
83     public BlueGigaTransactionManager(BlueGigaSerialHandler serialHandler, ScheduledExecutorService executor) {
84         this.serialHandler = serialHandler;
85         this.executor = executor;
86         serialHandler.addEventListener(this);
87     }
88
89     /**
90      * Close transaction manager.
91      */
92     public void close() {
93         serialHandler.removeEventListener(this);
94         cancelTransactionTimer();
95         sendQueue.clear();
96         transactionListeners.clear();
97         eventListeners.clear();
98         logger.debug("Closed");
99     }
100
101     private void startTransactionTimer() {
102         transactionTimeoutTimer = executor.schedule(() -> {
103             notifyTransactionTimeout(ongoingTransactionId);
104         }, TRANSACTION_TIMEOUT_PERIOD_MS, TimeUnit.MILLISECONDS);
105     }
106
107     private void cancelTransactionTimer() {
108         if (transactionTimeoutTimer != null) {
109             transactionTimeoutTimer.cancel(true);
110             transactionTimeoutTimer = null;
111         }
112     }
113
114     private void sendNextFrame() {
115         getNextFrame().ifPresent(frame -> {
116             cancelTransactionTimer();
117             logger.debug("Send frame #{}: {}", frame.getTransactionId(), frame.getMessage());
118             ongoingTransactionId = Optional.of(frame.getTransactionId());
119             serialHandler.sendFrame(frame.getMessage());
120             startTransactionTimer();
121         });
122     }
123
124     @SuppressWarnings({ "null", "unused" })
125     private Optional<BlueGigaUniqueCommand> getNextFrame() {
126         while (!sendQueue.isEmpty()) {
127             BlueGigaUniqueCommand frame = sendQueue.poll();
128             if (frame != null) {
129                 if (frame.getMessage() != null) {
130                     return Optional.of(frame);
131                 } else {
132                     logger.debug("Null message found from queue, skip it");
133                     continue;
134                 }
135             } else {
136                 logger.debug("Null frame found from queue, skip it");
137                 continue;
138             }
139         }
140         return Optional.empty();
141     }
142
143     /**
144      * Add a {@link BlueGigaUniqueCommand} frame to the send queue. The sendQueue is a
145      * FIFO queue. This method queues a {@link BlueGigaCommand} frame without
146      * waiting for a response.
147      *
148      * @param transaction
149      *            {@link BlueGigaUniqueCommand}
150      */
151     public void queueFrame(BlueGigaUniqueCommand request) {
152         logger.trace("Queue TX BLE frame: {}", request);
153         sendQueue.add(request);
154         logger.trace("TX BLE queue size: {}", sendQueue.size());
155     }
156
157     private void sendNextTransactionIfNoOngoing() {
158         synchronized (this) {
159             logger.trace("Send next transaction if no ongoing");
160             if (!ongoingTransactionId.isPresent()) {
161                 sendNextFrame();
162             }
163         }
164     }
165
166     private void clearOngoingTransactionAndSendNext() {
167         synchronized (this) {
168             logger.trace("Clear ongoing transaction and send next frame from queue");
169             ongoingTransactionId = Optional.empty();
170             sendNextFrame();
171         }
172     }
173
174     private void addTransactionListener(BluetoothListener<? extends BlueGigaResponse> listener) {
175         if (transactionListeners.contains(listener)) {
176             return;
177         }
178
179         transactionListeners.add(listener);
180     }
181
182     private void removeTransactionListener(BluetoothListener<?> listener) {
183         transactionListeners.remove(listener);
184     }
185
186     /**
187      * Sends a BlueGiga request without waiting for the response.
188      *
189      * @param bleCommand {@link BlueGigaCommand}
190      * @return response {@link Future} {@link BlueGigaResponse}
191      */
192     private <T extends BlueGigaResponse> Future<T> sendBleRequestAsync(final BlueGigaCommand bleCommand,
193             final Class<T> expected) {
194         class TransactionWaiter implements Callable<T>, BluetoothListener<T> {
195             private volatile boolean complete;
196             private Optional<BlueGigaResponse> response = Optional.empty();
197             private BlueGigaUniqueCommand query = new BlueGigaUniqueCommand(bleCommand,
198                     transactionId.getAndIncrement());
199
200             @SuppressWarnings("unchecked")
201             @Override
202             public T call() throws TimeoutException {
203                 // Register a listener
204                 addTransactionListener(this);
205
206                 // Send the transaction
207                 queueFrame(query);
208                 sendNextTransactionIfNoOngoing();
209
210                 // Wait transaction completed or timeout
211                 synchronized (this) {
212                     while (!complete) {
213                         try {
214                             wait();
215                         } catch (InterruptedException e) {
216                             complete = true;
217                         }
218                     }
219                 }
220
221                 cancelTransactionTimer();
222
223                 // Remove the listener
224                 removeTransactionListener(this);
225
226                 // Send next transaction if any
227                 executor.submit(BlueGigaTransactionManager.this::clearOngoingTransactionAndSendNext);
228
229                 if (response.isPresent()) {
230                     return (T) response.get();
231                 } else {
232                     throw new TimeoutException("No response from BlueGiga controller");
233                 }
234             }
235
236             @Override
237             public boolean transactionEvent(BlueGigaResponse bleResponse, int transactionId) {
238                 logger.trace("Expected transactionId: {}, received transactionId: {}", query.getTransactionId(),
239                         transactionId);
240
241                 if (transactionId != query.getTransactionId()) {
242                     logger.trace("Ignore frame as received transaction Id {} doesn't match expected transaction Id {}.",
243                             transactionId, query.getTransactionId());
244                     return false;
245                 }
246
247                 logger.trace("Expected frame: {}, received frame: {}", expected.getSimpleName(), bleResponse);
248
249                 if (bleCommand instanceof BlueGigaDeviceCommand && bleResponse instanceof BlueGigaDeviceResponse) {
250                     BlueGigaDeviceCommand devCommand = (BlueGigaDeviceCommand) bleCommand;
251                     BlueGigaDeviceResponse devResponse = (BlueGigaDeviceResponse) bleResponse;
252
253                     logger.trace("Expected connection id: {}, received connection id: {}", devCommand.getConnection(),
254                             devResponse.getConnection());
255
256                     if (devCommand.getConnection() != devResponse.getConnection()) {
257                         logger.trace("Ignore response as received connection id {} doesn't match expected id {}.",
258                                 devResponse.getConnection(), devCommand.getConnection());
259                         return false;
260                     }
261                 }
262
263                 if (!expected.isInstance(bleResponse)) {
264                     logger.trace("Ignoring {} frame which has not been requested.",
265                             bleResponse.getClass().getSimpleName());
266                     return false;
267                 }
268
269                 // Response received, notify waiter
270                 response = Optional.of(bleResponse);
271                 complete = true;
272                 logger.debug("Received frame #{}: {}", transactionId, bleResponse);
273                 synchronized (this) {
274                     notify();
275                 }
276                 return true;
277             }
278
279             @Override
280             public boolean transactionTimeout(int transactionId) {
281                 if (transactionId != query.getTransactionId()) {
282                     return false;
283                 }
284                 logger.debug("Timeout, no response received for transaction {}", query.getTransactionId());
285                 complete = true;
286                 synchronized (this) {
287                     notify();
288                 }
289                 return true;
290             }
291         }
292
293         Callable<T> worker = new TransactionWaiter();
294         return executor.submit(worker);
295     }
296
297     /**
298      * Sends a {@link BlueGigaCommand} request to the NCP and waits for the response for specified period of time.
299      * The response is correlated with the request and the returned {@link BlueGigaResponse}
300      * contains the request and response data.
301      *
302      * @param bleCommand {@link BlueGigaCommand}
303      * @param timeout milliseconds to wait until {@link TimeoutException} is thrown
304      * @return response {@link BlueGigaResponse}
305      * @throws BlueGigaException when any error occurred
306      */
307     public <T extends BlueGigaResponse> T sendTransaction(BlueGigaCommand bleCommand, Class<T> expected, long timeout)
308             throws BlueGigaException {
309         Future<T> futureResponse = sendBleRequestAsync(bleCommand, expected);
310         try {
311             return futureResponse.get(timeout, TimeUnit.MILLISECONDS);
312         } catch (TimeoutException | InterruptedException | ExecutionException e) {
313             futureResponse.cancel(true);
314             throw new BlueGigaException(String.format("Error sending BLE transaction: %s", e.getMessage()), e);
315         }
316     }
317
318     public void addEventListener(BlueGigaEventListener listener) {
319         eventListeners.add(listener);
320     }
321
322     public void removeEventListener(BlueGigaEventListener listener) {
323         eventListeners.remove(listener);
324     }
325
326     @Override
327     public void bluegigaFrameReceived(BlueGigaResponse event) {
328         if (event.isEvent()) {
329             notifyEventListeners(event);
330         } else {
331             notifyTransactionComplete(event);
332         }
333     }
334
335     /**
336      * Notify any event listeners when we receive a response.
337      * This uses a separate thread to separate the processing of the event.
338      *
339      * @param response the response data received
340      * @return true if the response was processed
341      */
342     private void notifyEventListeners(final BlueGigaResponse response) {
343         // Notify the listeners
344         for (final BlueGigaEventListener listener : eventListeners) {
345             executor.submit(() -> listener.bluegigaEventReceived(response));
346         }
347     }
348
349     /**
350      * Notify any internal transaction listeners when we receive a response.
351      *
352      * @param response
353      *            the response data received
354      */
355     private void notifyTransactionComplete(final BlueGigaResponse response) {
356         ongoingTransactionId.ifPresent(id -> {
357             boolean processed = false;
358             for (BluetoothListener<? extends BlueGigaResponse> listener : transactionListeners) {
359                 if (listener.transactionEvent(response, id)) {
360                     processed = true;
361                 }
362             }
363             if (!processed) {
364                 logger.debug("No listener found for received response: {}", response);
365             }
366         });
367     }
368
369     private void notifyTransactionTimeout(final Optional<Integer> transactionId) {
370         transactionId.ifPresent(id -> {
371             boolean processed = false;
372             for (BluetoothListener<? extends BlueGigaResponse> listener : transactionListeners) {
373                 if (listener.transactionTimeout(id)) {
374                     processed = true;
375                 }
376             }
377             if (!processed) {
378                 logger.debug("No listener found for transaction timeout event, transaction id {}", id);
379             }
380         });
381     }
382 }