]> git.basschouten.com Git - openhab-addons.git/blob
4d75f9d0802230793542b880fde5ba5135679b2d
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2020 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.io.transport.modbus.internal;
14
15 import java.io.IOException;
16 import java.util.LinkedList;
17 import java.util.Map;
18 import java.util.Objects;
19 import java.util.Optional;
20 import java.util.Set;
21 import java.util.concurrent.ConcurrentHashMap;
22 import java.util.concurrent.Future;
23 import java.util.concurrent.ScheduledExecutorService;
24 import java.util.concurrent.ScheduledFuture;
25 import java.util.concurrent.ThreadPoolExecutor;
26 import java.util.concurrent.TimeUnit;
27 import java.util.concurrent.atomic.AtomicReference;
28
29 import javax.imageio.IIOException;
30
31 import org.apache.commons.pool2.KeyedObjectPool;
32 import org.apache.commons.pool2.SwallowedExceptionListener;
33 import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
34 import org.eclipse.jdt.annotation.NonNull;
35 import org.eclipse.jdt.annotation.NonNullByDefault;
36 import org.eclipse.jdt.annotation.Nullable;
37 import org.eclipse.jetty.util.ConcurrentHashSet;
38 import org.openhab.core.common.ThreadPoolManager;
39 import org.openhab.io.transport.modbus.AsyncModbusFailure;
40 import org.openhab.io.transport.modbus.AsyncModbusWriteResult;
41 import org.openhab.io.transport.modbus.ModbusCommunicationInterface;
42 import org.openhab.io.transport.modbus.ModbusFailureCallback;
43 import org.openhab.io.transport.modbus.ModbusManager;
44 import org.openhab.io.transport.modbus.ModbusReadCallback;
45 import org.openhab.io.transport.modbus.ModbusReadRequestBlueprint;
46 import org.openhab.io.transport.modbus.ModbusResultCallback;
47 import org.openhab.io.transport.modbus.ModbusWriteCallback;
48 import org.openhab.io.transport.modbus.ModbusWriteRequestBlueprint;
49 import org.openhab.io.transport.modbus.PollTask;
50 import org.openhab.io.transport.modbus.TaskWithEndpoint;
51 import org.openhab.io.transport.modbus.WriteTask;
52 import org.openhab.io.transport.modbus.endpoint.EndpointPoolConfiguration;
53 import org.openhab.io.transport.modbus.endpoint.ModbusSerialSlaveEndpoint;
54 import org.openhab.io.transport.modbus.endpoint.ModbusSlaveEndpoint;
55 import org.openhab.io.transport.modbus.endpoint.ModbusSlaveEndpointVisitor;
56 import org.openhab.io.transport.modbus.endpoint.ModbusTCPSlaveEndpoint;
57 import org.openhab.io.transport.modbus.endpoint.ModbusUDPSlaveEndpoint;
58 import org.openhab.io.transport.modbus.exception.ModbusConnectionException;
59 import org.openhab.io.transport.modbus.exception.ModbusUnexpectedResponseFunctionCodeException;
60 import org.openhab.io.transport.modbus.exception.ModbusUnexpectedResponseSizeException;
61 import org.openhab.io.transport.modbus.exception.ModbusUnexpectedTransactionIdException;
62 import org.openhab.io.transport.modbus.internal.pooling.ModbusSlaveConnectionFactoryImpl;
63 import org.osgi.service.component.annotations.Activate;
64 import org.osgi.service.component.annotations.Component;
65 import org.osgi.service.component.annotations.Deactivate;
66 import org.slf4j.Logger;
67 import org.slf4j.LoggerFactory;
68
69 import net.wimpi.modbus.Modbus;
70 import net.wimpi.modbus.ModbusException;
71 import net.wimpi.modbus.ModbusIOException;
72 import net.wimpi.modbus.ModbusSlaveException;
73 import net.wimpi.modbus.io.ModbusTransaction;
74 import net.wimpi.modbus.msg.ModbusRequest;
75 import net.wimpi.modbus.msg.ModbusResponse;
76 import net.wimpi.modbus.net.ModbusSlaveConnection;
77
78 /**
79  * Main implementation of ModbusManager
80  *
81  * We use connection pool to ensure that only single transaction is ongoing per each endpoint. This is especially
82  * important with serial slaves but practice has shown that even many tcp slaves have limited
83  * capability to handle many connections at the same time
84  *
85  * @author Sami Salonen - Initial contribution
86  */
87 @Component(service = ModbusManager.class, configurationPid = "transport.modbus")
88 @NonNullByDefault
89 public class ModbusManagerImpl implements ModbusManager {
90
91     static class PollTaskUnregistered extends Exception {
92         public PollTaskUnregistered(String msg) {
93             super(msg);
94         }
95
96         private static final long serialVersionUID = 6939730579178506885L;
97     }
98
99     @FunctionalInterface
100     private interface ModbusOperation<T> {
101
102         /**
103          * Execute the operation.
104          *
105          * All errors should be raised. There should not be any retry mechanism implemented at this level
106          *
107          * @param timer aggregate stop watch for performance profiling
108          * @param task task to execute
109          * @param connection connection to use
110          *
111          * @throws IIOException on generic IO errors
112          * @throws ModbusException on Modbus protocol errors (e.g. ModbusIOException on I/O, ModbusSlaveException on
113          *             slave exception responses)
114          * @throws ModbusUnexpectedTransactionIdException when transaction IDs of the request and
115          *             response do not match
116          * @throws ModbusUnexpectedResponseFunctionCodeException when response function code does not match the request
117          *             (ill-behaving slave)
118          * @throws ModbusUnexpectedResponseSizeException when data length of the response and request do not match
119          */
120         public void accept(AggregateStopWatch timer, T task, ModbusSlaveConnection connection)
121                 throws ModbusException, IIOException, ModbusUnexpectedTransactionIdException,
122                 ModbusUnexpectedResponseFunctionCodeException, ModbusUnexpectedResponseSizeException;
123     }
124
125     /**
126      * Check that transaction id of the response and request match
127      *
128      * @param response response from the slave corresponding to request
129      * @param libRequest modbus request
130      * @param operationId operation id for logging
131      * @throws ModbusUnexpectedTransactionIdException when transaction IDs of the request and
132      *             response do not match
133      */
134     private <R> void checkTransactionId(ModbusResponse response, ModbusRequest libRequest, String operationId)
135             throws ModbusUnexpectedTransactionIdException {
136         // Compare request and response transaction ID. NOTE: ModbusTransaction.getTransactionID() is static and
137         // not safe to use
138         if ((response.getTransactionID() != libRequest.getTransactionID()) && !response.isHeadless()) {
139             throw new ModbusUnexpectedTransactionIdException(libRequest.getTransactionID(),
140                     response.getTransactionID());
141         }
142     }
143
144     /**
145      * Check that function code of the response and request match
146      *
147      * @param response response from the slave corresponding to request
148      * @param libRequest modbus request
149      * @param operationId operation id for logging
150      * @throws ModbusUnexpectedResponseFunctionCodeException when response function code does not match the request
151      *             (ill-behaving slave)
152      */
153     private <R> void checkFunctionCode(ModbusResponse response, ModbusRequest libRequest, String operationId)
154             throws ModbusUnexpectedResponseFunctionCodeException {
155         if ((response.getFunctionCode() != libRequest.getFunctionCode())) {
156             throw new ModbusUnexpectedResponseFunctionCodeException(libRequest.getTransactionID(),
157                     response.getTransactionID());
158         }
159     }
160
161     /**
162      * Check that number of bits/registers/discrete inputs is not less than what was requested.
163      *
164      * According to modbus protocol, we should get always get always equal amount of registers data back as response.
165      * With coils and discrete inputs, we can get more since responses are in 8 bit chunks.
166      *
167      * However, in no case we expect less items in response.
168      *
169      * This is to identify clearly invalid responses which might cause problems downstream when using the data.
170      *
171      * @param response response response from the slave corresponding to request
172      * @param request modbus request
173      * @param operationId operation id for logging
174      * @throws ModbusUnexpectedResponseSizeException when data length of the response and request do not match
175      */
176     private <R> void checkResponseSize(ModbusResponse response, ModbusReadRequestBlueprint request, String operationId)
177             throws ModbusUnexpectedResponseSizeException {
178         final int responseCount = ModbusLibraryWrapper.getNumberOfItemsInResponse(response, request);
179         if (responseCount < request.getDataLength()) {
180             throw new ModbusUnexpectedResponseSizeException(request.getDataLength(), responseCount);
181         }
182     }
183
184     /**
185      * Implementation for the PollTask operation
186      *
187      * @author Sami Salonen - Initial contribution
188      *
189      */
190     private class PollOperation implements ModbusOperation<PollTask> {
191         @Override
192         public void accept(AggregateStopWatch timer, PollTask task, ModbusSlaveConnection connection)
193                 throws ModbusException, ModbusUnexpectedTransactionIdException,
194                 ModbusUnexpectedResponseFunctionCodeException, ModbusUnexpectedResponseSizeException {
195             ModbusSlaveEndpoint endpoint = task.getEndpoint();
196             ModbusReadRequestBlueprint request = task.getRequest();
197             ModbusReadCallback callback = task.getResultCallback();
198             String operationId = timer.operationId;
199
200             ModbusTransaction transaction = ModbusLibraryWrapper.createTransactionForEndpoint(endpoint, connection);
201             ModbusRequest libRequest = ModbusLibraryWrapper.createRequest(request);
202             transaction.setRequest(libRequest);
203
204             logger.trace("Going execute transaction with request request (FC={}): {} [operation ID {}]",
205                     request.getFunctionCode(), libRequest.getHexMessage(), operationId);
206             // Might throw ModbusIOException (I/O error) or ModbusSlaveException (explicit exception response from
207             // slave)
208             timer.transaction.timeRunnableWithModbusException(() -> transaction.execute());
209             ModbusResponse response = transaction.getResponse();
210             logger.trace("Response for read request (FC={}, transaction ID={}): {} [operation ID {}]",
211                     response.getFunctionCode(), response.getTransactionID(), response.getHexMessage(), operationId);
212             checkTransactionId(response, libRequest, operationId);
213             checkFunctionCode(response, libRequest, operationId);
214             checkResponseSize(response, request, operationId);
215             timer.callback
216                     .timeRunnable(() -> ModbusLibraryWrapper.invokeCallbackWithResponse(request, callback, response));
217         }
218     }
219
220     /**
221      * Implementation for WriteTask operation
222      *
223      * @author Sami Salonen - Initial contribution
224      *
225      */
226     private class WriteOperation implements ModbusOperation<WriteTask> {
227         @Override
228         public void accept(AggregateStopWatch timer, WriteTask task, ModbusSlaveConnection connection)
229                 throws ModbusException, ModbusUnexpectedTransactionIdException,
230                 ModbusUnexpectedResponseFunctionCodeException {
231             ModbusSlaveEndpoint endpoint = task.getEndpoint();
232             ModbusWriteRequestBlueprint request = task.getRequest();
233             @Nullable
234             ModbusWriteCallback callback = task.getResultCallback();
235             String operationId = timer.operationId;
236
237             ModbusTransaction transaction = ModbusLibraryWrapper.createTransactionForEndpoint(endpoint, connection);
238             ModbusRequest libRequest = ModbusLibraryWrapper.createRequest(request);
239             transaction.setRequest(libRequest);
240
241             logger.trace("Going execute transaction with read request (FC={}): {} [operation ID {}]",
242                     request.getFunctionCode(), libRequest.getHexMessage(), operationId);
243
244             // Might throw ModbusIOException (I/O error) or ModbusSlaveException (explicit exception response from
245             // slave)
246             timer.transaction.timeRunnableWithModbusException(() -> transaction.execute());
247             ModbusResponse response = transaction.getResponse();
248             logger.trace("Response for write request (FC={}, transaction ID={}): {} [operation ID {}]",
249                     response.getFunctionCode(), response.getTransactionID(), response.getHexMessage(), operationId);
250             checkTransactionId(response, libRequest, operationId);
251             checkFunctionCode(response, libRequest, operationId);
252             timer.callback.timeRunnable(
253                     () -> invokeCallbackWithResponse(request, callback, new ModbusResponseImpl(response)));
254         }
255     }
256
257     private final Logger logger = LoggerFactory.getLogger(ModbusManagerImpl.class);
258     private final Logger pollMonitorLogger = LoggerFactory
259             .getLogger(ModbusManagerImpl.class.getName() + ".PollMonitor");
260
261     /**
262      * Time to wait between connection passive+borrow, i.e. time to wait between
263      * transactions
264      * Default 60ms for TCP slaves, Siemens S7 1212 PLC couldn't handle faster
265      * requests with default settings.
266      */
267     public static final long DEFAULT_TCP_INTER_TRANSACTION_DELAY_MILLIS = 60;
268
269     /**
270      * Time to wait between connection passive+borrow, i.e. time to wait between
271      * transactions
272      * Default 35ms for Serial slaves, motivation discussed
273      * here https://community.openhab.org/t/connection-pooling-in-modbus-binding/5246/111?u=ssalonen
274      */
275     public static final long DEFAULT_SERIAL_INTER_TRANSACTION_DELAY_MILLIS = 35;
276
277     /**
278      * Thread naming for modbus read & write requests. Also used by the monitor thread
279      */
280     private static final String MODBUS_POLLER_THREAD_POOL_NAME = "modbusManagerPollerThreadPool";
281
282     /**
283      * Log message with WARN level if the task queues exceed this limit.
284      *
285      * If the queues grow too large, it might be an issue with consumer of the ModbusManager.
286      *
287      * You can generate a large queue by spamming ModbusManager with one-off reads or writes (submitOneTimePoll or
288      * submitOneTimeWrite).
289      *
290      * Note that there is no issue registering many regular polls, those do not "queue" the same way.
291      *
292      * Presumably slow callbacks can increase queue size with callbackThreadPool
293      */
294     private static final long WARN_QUEUE_SIZE = 500;
295     private static final long MONITOR_QUEUE_INTERVAL_MILLIS = 10000;
296
297     private final PollOperation pollOperation = new PollOperation();
298     private final WriteOperation writeOperation = new WriteOperation();
299
300     private volatile long lastQueueMonitorLog = -1;
301
302     /**
303      * We use connection pool to ensure that only single transaction is ongoing per each endpoint. This is especially
304      * important with serial slaves but practice has shown that even many tcp slaves have limited
305      * capability to handle many connections at the same time
306      *
307      * Relevant discussion at the time of implementation:
308      * - https://community.openhab.org/t/modbus-connection-problem/6108/
309      * - https://community.openhab.org/t/connection-pooling-in-modbus-binding/5246/
310      */
311
312     private volatile @Nullable KeyedObjectPool<ModbusSlaveEndpoint, ModbusSlaveConnection> connectionPool;
313     private volatile @Nullable ModbusSlaveConnectionFactoryImpl connectionFactory;
314     private volatile Map<PollTask, ScheduledFuture<?>> scheduledPollTasks = new ConcurrentHashMap<>();
315     /**
316      * Executor for requests
317      */
318     private volatile @Nullable ScheduledExecutorService scheduledThreadPoolExecutor;
319     private volatile @Nullable ScheduledFuture<?> monitorFuture;
320     private volatile Set<ModbusCommunicationInterfaceImpl> communicationInterfaces = new ConcurrentHashSet<>();
321
322     private void constructConnectionPool() {
323         ModbusSlaveConnectionFactoryImpl connectionFactory = new ModbusSlaveConnectionFactoryImpl();
324         connectionFactory.setDefaultPoolConfigurationFactory(endpoint -> {
325             return endpoint.accept(new ModbusSlaveEndpointVisitor<EndpointPoolConfiguration>() {
326
327                 @Override
328                 public @NonNull EndpointPoolConfiguration visit(ModbusTCPSlaveEndpoint modbusIPSlavePoolingKey) {
329                     EndpointPoolConfiguration endpointPoolConfig = new EndpointPoolConfiguration();
330                     endpointPoolConfig.setInterTransactionDelayMillis(DEFAULT_TCP_INTER_TRANSACTION_DELAY_MILLIS);
331                     endpointPoolConfig.setConnectMaxTries(Modbus.DEFAULT_RETRIES);
332                     return endpointPoolConfig;
333                 }
334
335                 @Override
336                 public @NonNull EndpointPoolConfiguration visit(ModbusSerialSlaveEndpoint modbusSerialSlavePoolingKey) {
337                     EndpointPoolConfiguration endpointPoolConfig = new EndpointPoolConfiguration();
338                     // never "disconnect" (close/open serial port) serial connection between borrows
339                     endpointPoolConfig.setReconnectAfterMillis(-1);
340                     endpointPoolConfig.setInterTransactionDelayMillis(DEFAULT_SERIAL_INTER_TRANSACTION_DELAY_MILLIS);
341                     endpointPoolConfig.setConnectMaxTries(Modbus.DEFAULT_RETRIES);
342                     return endpointPoolConfig;
343                 }
344
345                 @Override
346                 public @NonNull EndpointPoolConfiguration visit(ModbusUDPSlaveEndpoint modbusUDPSlavePoolingKey) {
347                     EndpointPoolConfiguration endpointPoolConfig = new EndpointPoolConfiguration();
348                     endpointPoolConfig.setInterTransactionDelayMillis(DEFAULT_TCP_INTER_TRANSACTION_DELAY_MILLIS);
349                     endpointPoolConfig.setConnectMaxTries(Modbus.DEFAULT_RETRIES);
350                     return endpointPoolConfig;
351                 }
352             });
353         });
354
355         GenericKeyedObjectPool<ModbusSlaveEndpoint, ModbusSlaveConnection> genericKeyedObjectPool = new ModbusConnectionPool(
356                 connectionFactory);
357         genericKeyedObjectPool.setSwallowedExceptionListener(new SwallowedExceptionListener() {
358
359             @SuppressWarnings("null")
360             @Override
361             public void onSwallowException(@Nullable Exception e) {
362                 LoggerFactory.getLogger(ModbusManagerImpl.class).warn(
363                         "Connection pool swallowed unexpected exception:{} {}",
364                         Optional.ofNullable(e).map(ex -> ex.getClass().getSimpleName()).orElse(""),
365                         Optional.ofNullable(e).map(ex -> ex.getMessage()).orElse("<null>"), e);
366             }
367         });
368         connectionPool = genericKeyedObjectPool;
369         this.connectionFactory = connectionFactory;
370     }
371
372     private Optional<ModbusSlaveConnection> borrowConnection(ModbusSlaveEndpoint endpoint) {
373         Optional<ModbusSlaveConnection> connection = Optional.empty();
374         KeyedObjectPool<ModbusSlaveEndpoint, ModbusSlaveConnection> pool = connectionPool;
375         if (pool == null) {
376             return connection;
377         }
378         long start = System.currentTimeMillis();
379         try {
380             connection = Optional.ofNullable(pool.borrowObject(endpoint));
381         } catch (Exception e) {
382             logger.warn("Error getting a new connection for endpoint {}. Error was: {} {}", endpoint,
383                     e.getClass().getName(), e.getMessage());
384         }
385         if (connection.isPresent()) {
386             ModbusSlaveConnection slaveConnection = connection.get();
387             if (!slaveConnection.isConnected()) {
388                 logger.trace(
389                         "Received connection which is unconnected, preventing use by returning connection to pool.");
390                 returnConnection(endpoint, connection);
391                 connection = Optional.empty();
392             }
393         }
394         logger.trace("borrowing connection (got {}) for endpoint {} took {} ms", connection, endpoint,
395                 System.currentTimeMillis() - start);
396         return connection;
397     }
398
399     private void invalidate(ModbusSlaveEndpoint endpoint, Optional<ModbusSlaveConnection> connection) {
400         KeyedObjectPool<ModbusSlaveEndpoint, ModbusSlaveConnection> pool = connectionPool;
401         if (pool == null) {
402             return;
403         }
404         long start = System.currentTimeMillis();
405         connection.ifPresent(con -> {
406             try {
407                 pool.invalidateObject(endpoint, con);
408             } catch (Exception e) {
409                 logger.warn("Error invalidating connection in pool for endpoint {}. Error was: {} {}", endpoint,
410                         e.getClass().getName(), e.getMessage(), e);
411             }
412         });
413         logger.trace("invalidating connection for endpoint {} took {} ms", endpoint,
414                 System.currentTimeMillis() - start);
415     }
416
417     private void returnConnection(ModbusSlaveEndpoint endpoint, Optional<ModbusSlaveConnection> connection) {
418         KeyedObjectPool<ModbusSlaveEndpoint, ModbusSlaveConnection> pool = connectionPool;
419         if (pool == null) {
420             return;
421         }
422         long start = System.currentTimeMillis();
423         connection.ifPresent(con -> {
424             try {
425                 pool.returnObject(endpoint, con);
426                 logger.trace("returned connection to pool for endpoint {}", endpoint);
427             } catch (Exception e) {
428                 logger.warn("Error returning connection to pool for endpoint {}. Error was: {} {}", endpoint,
429                         e.getClass().getName(), e.getMessage(), e);
430             }
431         });
432         logger.trace("returning connection for endpoint {} took {} ms", endpoint, System.currentTimeMillis() - start);
433     }
434
435     /**
436      * Establishes connection to the endpoint specified by the task
437      *
438      * In case connection cannot be established, callback is called with {@link ModbusConnectionException}
439      *
440      * @param operationId id appened to log messages for identifying the operation
441      * @param oneOffTask whether this is one-off, or execution of previously scheduled poll
442      * @param task task representing the read or write operation
443      * @return {@link ModbusSlaveConnection} to the endpoint as specified by the task, or empty {@link Optional} when
444      *         connection cannot be established
445      * @throws PollTaskUnregistered
446      */
447     private <R, C extends ModbusResultCallback, F extends ModbusFailureCallback<R>, T extends TaskWithEndpoint<R, C, F>> Optional<ModbusSlaveConnection> getConnection(
448             AggregateStopWatch timer, boolean oneOffTask, @NonNull T task) throws PollTaskUnregistered {
449         KeyedObjectPool<ModbusSlaveEndpoint, ModbusSlaveConnection> connectionPool = this.connectionPool;
450         if (connectionPool == null) {
451             return Optional.empty();
452         }
453         String operationId = timer.operationId;
454         logger.trace(
455                 "Executing task {} (oneOff={})! Waiting for connection. Idle connections for this endpoint: {}, and active {} [operation ID {}]",
456                 task, oneOffTask, connectionPool.getNumIdle(task.getEndpoint()),
457                 connectionPool.getNumActive(task.getEndpoint()), operationId);
458         long connectionBorrowStart = System.currentTimeMillis();
459         ModbusFailureCallback<R> failureCallback = task.getFailureCallback();
460         ModbusSlaveEndpoint endpoint = task.getEndpoint();
461
462         R request = task.getRequest();
463         Optional<ModbusSlaveConnection> connection = timer.connection.timeSupplier(() -> borrowConnection(endpoint));
464         logger.trace("Executing task {} (oneOff={})! Connection received in {} ms [operation ID {}]", task, oneOffTask,
465                 System.currentTimeMillis() - connectionBorrowStart, operationId);
466         if (scheduledThreadPoolExecutor == null) {
467             // manager deactivated
468             timer.connection.timeRunnable(() -> invalidate(endpoint, connection));
469             return Optional.empty();
470         }
471         if (!connection.isPresent()) {
472             logger.warn("Could not connect to endpoint {} -- aborting request {} [operation ID {}]", endpoint, request,
473                     operationId);
474             timer.callback.timeRunnable(
475                     () -> invokeCallbackWithError(request, failureCallback, new ModbusConnectionException(endpoint)));
476         }
477         return connection;
478     }
479
480     private <R> void invokeCallbackWithError(R request, ModbusFailureCallback<R> callback, Exception error) {
481         try {
482             logger.trace("Calling error response callback {} for request {}. Error was {} {}", callback, request,
483                     error.getClass().getName(), error.getMessage());
484             callback.handle(new AsyncModbusFailure<R>(request, error));
485         } finally {
486             logger.trace("Called write response callback {} for request {}. Error was {} {}", callback, request,
487                     error.getClass().getName(), error.getMessage());
488         }
489     }
490
491     private void invokeCallbackWithResponse(ModbusWriteRequestBlueprint request, ModbusWriteCallback callback,
492             org.openhab.io.transport.modbus.ModbusResponse response) {
493         try {
494             logger.trace("Calling write response callback {} for request {}. Response was {}", callback, request,
495                     response);
496             callback.handle(new AsyncModbusWriteResult(request, response));
497         } finally {
498             logger.trace("Called write response callback {} for request {}. Response was {}", callback, request,
499                     response);
500         }
501     }
502
503     private void verifyTaskIsRegistered(PollTask task) throws PollTaskUnregistered {
504         if (!this.scheduledPollTasks.containsKey(task)) {
505             String msg = String.format("Poll task %s is unregistered", task);
506             logger.debug(msg);
507             throw new PollTaskUnregistered(msg);
508         }
509     }
510
511     /**
512      * Execute operation using a retry mechanism.
513      *
514      * This is a helper function for executing read and write operations and handling the exceptions in a common way.
515      *
516      * With some connection types, the connection is reseted (disconnected), and new connection is received from the
517      * pool. This means that potentially other operations queuing for the connection can be executed in-between.
518      *
519      * With some other connection types, the operation is retried without reseting the connection type.
520      *
521      * @param task
522      * @param oneOffTask
523      * @param operation
524      */
525     private <R, C extends ModbusResultCallback, F extends ModbusFailureCallback<R>, T extends TaskWithEndpoint<R, C, F>> void executeOperation(
526             T task, boolean oneOffTask, ModbusOperation<T> operation) {
527         AggregateStopWatch timer = new AggregateStopWatch();
528         timer.total.resume();
529         String operationId = timer.operationId;
530
531         ModbusSlaveConnectionFactoryImpl connectionFactory = this.connectionFactory;
532         if (connectionFactory == null) {
533             // deactivated manager
534             logger.trace("Deactivated manager - aborting operation.");
535             return;
536         }
537
538         logTaskQueueInfo();
539         R request = task.getRequest();
540         ModbusSlaveEndpoint endpoint = task.getEndpoint();
541         F failureCallback = task.getFailureCallback();
542         int maxTries = task.getMaxTries();
543         AtomicReference<@Nullable Exception> lastError = new AtomicReference<>();
544         @SuppressWarnings("null") // since cfg in lambda cannot be really null
545         long retryDelay = Optional.ofNullable(connectionFactory.getEndpointPoolConfiguration(endpoint))
546                 .map(cfg -> cfg.getInterTransactionDelayMillis()).orElse(0L);
547
548         if (maxTries <= 0) {
549             throw new IllegalArgumentException("maxTries should be positive");
550         }
551
552         Optional<ModbusSlaveConnection> connection = Optional.empty();
553         try {
554             logger.trace("Starting new operation with task {}. Trying to get connection [operation ID {}]", task,
555                     operationId);
556             connection = getConnection(timer, oneOffTask, task);
557             logger.trace("Operation with task {}. Got a connection {} [operation ID {}]", task,
558                     connection.isPresent() ? "successfully" : "which was unconnected (connection issue)", operationId);
559             if (!connection.isPresent()) {
560                 // Could not acquire connection, time to abort
561                 // Error logged already, error callback called as well
562                 logger.trace("Initial connection was not successful, aborting. [operation ID {}]", operationId);
563                 return;
564             }
565
566             if (scheduledThreadPoolExecutor == null) {
567                 logger.debug("Manager has been shut down, aborting proecssing request {} [operation ID {}]", request,
568                         operationId);
569                 return;
570             }
571
572             int tryIndex = 0;
573             /**
574              * last execution is tracked such that the endpoint is not spammed on retry. First retry can be executed
575              * right away since getConnection ensures enough time has passed since last transaction. More precisely,
576              * ModbusSlaveConnectionFactoryImpl sleeps on activate() (i.e. before returning connection).
577              */
578             @Nullable
579             Long lastTryMillis = null;
580             while (tryIndex < maxTries) {
581                 logger.trace("Try {} out of {} [operation ID {}]", tryIndex + 1, maxTries, operationId);
582                 if (!connection.isPresent()) {
583                     // Connection was likely reseted with previous try, and connection was not successfully
584                     // re-established. Error has been logged, time to abort.
585                     logger.trace("Try {} out of {}. Connection was not successful, aborting. [operation ID {}]",
586                             tryIndex + 1, maxTries, operationId);
587                     return;
588                 }
589                 if (Thread.interrupted()) {
590                     logger.warn("Thread interrupted. Aborting operation [operation ID {}]", operationId);
591                     return;
592                 }
593                 // Check poll task is still registered (this is all asynchronous)
594                 if (!oneOffTask && task instanceof PollTask) {
595                     verifyTaskIsRegistered((PollTask) task);
596                 }
597                 // Let's ensure that enough time is between the retries
598                 logger.trace(
599                         "Ensuring that enough time passes before retrying again. Sleeping if necessary [operation ID {}]",
600                         operationId);
601                 long slept = ModbusSlaveConnectionFactoryImpl.waitAtleast(lastTryMillis, retryDelay);
602                 logger.trace("Sleep ended, slept {} [operation ID {}]", slept, operationId);
603
604                 boolean willRetry = false;
605                 try {
606                     tryIndex++;
607                     willRetry = tryIndex < maxTries;
608                     operation.accept(timer, task, connection.get());
609                     lastError.set(null);
610                     break;
611                 } catch (IOException e) {
612                     lastError.set(new ModbusSlaveIOExceptionImpl(e));
613                     // IO exception occurred, we re-establish new connection hoping it would fix the issue (e.g.
614                     // broken pipe on write)
615                     if (willRetry) {
616                         logger.warn(
617                                 "Try {} out of {} failed when executing request ({}). Will try again soon. Error was I/O error, so reseting the connection. Error details: {} {} [operation ID {}]",
618                                 tryIndex, maxTries, request, e.getClass().getName(), e.getMessage(), operationId);
619                     } else {
620                         logger.error(
621                                 "Last try {} failed when executing request ({}). Aborting. Error was I/O error, so reseting the connection. Error details: {} {} [operation ID {}]",
622                                 tryIndex, request, e.getClass().getName(), e.getMessage(), operationId);
623                     }
624                     // Invalidate connection, and empty (so that new connection is acquired before new retry)
625                     timer.connection.timeConsumer(c -> invalidate(endpoint, c), connection);
626                     connection = Optional.empty();
627                     continue;
628                 } catch (ModbusIOException e) {
629                     lastError.set(new ModbusSlaveIOExceptionImpl(e));
630                     // IO exception occurred, we re-establish new connection hoping it would fix the issue (e.g.
631                     // broken pipe on write)
632                     if (willRetry) {
633                         logger.warn(
634                                 "Try {} out of {} failed when executing request ({}). Will try again soon. Error was I/O error, so reseting the connection. Error details: {} {} [operation ID {}]",
635                                 tryIndex, maxTries, request, e.getClass().getName(), e.getMessage(), operationId);
636                     } else {
637                         logger.error(
638                                 "Last try {} failed when executing request ({}). Aborting. Error was I/O error, so reseting the connection. Error details: {} {} [operation ID {}]",
639                                 tryIndex, request, e.getClass().getName(), e.getMessage(), operationId);
640                     }
641                     // Invalidate connection, and empty (so that new connection is acquired before new retry)
642                     timer.connection.timeConsumer(c -> invalidate(endpoint, c), connection);
643                     connection = Optional.empty();
644                     continue;
645                 } catch (ModbusSlaveException e) {
646                     lastError.set(new ModbusSlaveErrorResponseExceptionImpl(e));
647                     // Slave returned explicit error response, no reason to re-establish new connection
648                     if (willRetry) {
649                         logger.warn(
650                                 "Try {} out of {} failed when executing request ({}). Will try again soon. Error was: {} {} [operation ID {}]",
651                                 tryIndex, maxTries, request, e.getClass().getName(), e.getMessage(), operationId);
652                     } else {
653                         logger.error(
654                                 "Last try {} failed when executing request ({}). Aborting. Error was: {} {} [operation ID {}]",
655                                 tryIndex, request, e.getClass().getName(), e.getMessage(), operationId);
656                     }
657                     continue;
658                 } catch (ModbusUnexpectedTransactionIdException | ModbusUnexpectedResponseFunctionCodeException
659                         | ModbusUnexpectedResponseSizeException e) {
660                     lastError.set(e);
661                     // transaction error details already logged
662                     if (willRetry) {
663                         logger.warn(
664                                 "Try {} out of {} failed when executing request ({}). Will try again soon. The response did not match the request. Reseting the connection. Error details: {} {} [operation ID {}]",
665                                 tryIndex, maxTries, request, e.getClass().getName(), e.getMessage(), operationId);
666                     } else {
667                         logger.error(
668                                 "Last try {} failed when executing request ({}). Aborting. The response did not match the request. Reseting the connection. Error details: {} {} [operation ID {}]",
669                                 tryIndex, request, e.getClass().getName(), e.getMessage(), operationId);
670                     }
671                     // Invalidate connection, and empty (so that new connection is acquired before new retry)
672                     timer.connection.timeConsumer(c -> invalidate(endpoint, c), connection);
673                     connection = Optional.empty();
674                     continue;
675                 } catch (ModbusException e) {
676                     lastError.set(e);
677                     // Some other (unexpected) exception occurred
678                     if (willRetry) {
679                         logger.warn(
680                                 "Try {} out of {} failed when executing request ({}). Will try again soon. Error was unexpected error, so reseting the connection. Error details: {} {} [operation ID {}]",
681                                 tryIndex, maxTries, request, e.getClass().getName(), e.getMessage(), operationId, e);
682                     } else {
683                         logger.error(
684                                 "Last try {} failed when executing request ({}). Aborting. Error was unexpected error, so reseting the connection. Error details: {} {} [operation ID {}]",
685                                 tryIndex, request, e.getClass().getName(), e.getMessage(), operationId, e);
686                     }
687                     // Invalidate connection, and empty (so that new connection is acquired before new retry)
688                     timer.connection.timeConsumer(c -> invalidate(endpoint, c), connection);
689                     connection = Optional.empty();
690                     continue;
691                 } finally {
692                     lastTryMillis = System.currentTimeMillis();
693                     // Connection was reseted in error handling and needs to be reconnected.
694                     // Try to re-establish connection.
695                     if (willRetry && !connection.isPresent()) {
696                         connection = getConnection(timer, oneOffTask, task);
697                     }
698                 }
699             }
700             Exception exception = lastError.get();
701             if (exception != null) {
702                 // All retries failed with some error
703                 timer.callback.timeRunnable(() -> {
704                     invokeCallbackWithError(request, failureCallback, exception);
705                 });
706             }
707         } catch (PollTaskUnregistered e) {
708             logger.warn("Poll task was unregistered -- not executing/proceeding with the poll: {} [operation ID {}]",
709                     e.getMessage(), operationId);
710             return;
711         } catch (InterruptedException e) {
712             logger.warn("Poll task was canceled -- not executing/proceeding with the poll: {} [operation ID {}]",
713                     e.getMessage(), operationId);
714             // Invalidate connection, and empty (so that new connection is acquired before new retry)
715             timer.connection.timeConsumer(c -> invalidate(endpoint, c), connection);
716             connection = Optional.empty();
717         } finally {
718             timer.connection.timeConsumer(c -> returnConnection(endpoint, c), connection);
719             logger.trace("Connection was returned to the pool, ending operation [operation ID {}]", operationId);
720             timer.suspendAllRunning();
721             logger.debug("Modbus operation ended, timing info: {} [operation ID {}]", timer, operationId);
722         }
723     }
724
725     private class ModbusCommunicationInterfaceImpl implements ModbusCommunicationInterface {
726
727         private volatile ModbusSlaveEndpoint endpoint;
728         private volatile Set<PollTask> pollTasksRegisteredByThisCommInterface = new ConcurrentHashSet<>();
729         private volatile boolean closed;
730         private @Nullable EndpointPoolConfiguration configuration;
731
732         @SuppressWarnings("null")
733         public ModbusCommunicationInterfaceImpl(ModbusSlaveEndpoint endpoint,
734                 @Nullable EndpointPoolConfiguration configuration) {
735             this.endpoint = endpoint;
736             this.configuration = configuration;
737             connectionFactory.setEndpointPoolConfiguration(endpoint, configuration);
738         }
739
740         @Override
741         public Future<?> submitOneTimePoll(ModbusReadRequestBlueprint request, ModbusReadCallback resultCallback,
742                 ModbusFailureCallback<ModbusReadRequestBlueprint> failureCallback) {
743             if (closed) {
744                 throw new IllegalStateException("Communication interface is closed already!");
745             }
746             ScheduledExecutorService executor = scheduledThreadPoolExecutor;
747             Objects.requireNonNull(executor, "Not activated!");
748             long scheduleTime = System.currentTimeMillis();
749             BasicPollTask task = new BasicPollTask(endpoint, request, resultCallback, failureCallback);
750             logger.debug("Scheduling one-off poll task {}", task);
751             Future<?> future = executor.submit(() -> {
752                 long millisInThreadPoolWaiting = System.currentTimeMillis() - scheduleTime;
753                 logger.debug("Will now execute one-off poll task {}, waited in thread pool for {}", task,
754                         millisInThreadPoolWaiting);
755                 executeOperation(task, true, pollOperation);
756             });
757             return future;
758         }
759
760         @Override
761         public PollTask registerRegularPoll(ModbusReadRequestBlueprint request, long pollPeriodMillis,
762                 long initialDelayMillis, ModbusReadCallback resultCallback,
763                 ModbusFailureCallback<ModbusReadRequestBlueprint> failureCallback) {
764             synchronized (ModbusManagerImpl.this) {
765                 if (closed) {
766                     throw new IllegalStateException("Communication interface is closed already!");
767                 }
768                 ScheduledExecutorService executor = scheduledThreadPoolExecutor;
769                 Objects.requireNonNull(executor, "Not activated!");
770                 BasicPollTask task = new BasicPollTask(endpoint, request, resultCallback, failureCallback);
771                 logger.trace("Registering poll task {} with period {} using initial delay {}", task, pollPeriodMillis,
772                         initialDelayMillis);
773                 if (scheduledPollTasks.containsKey(task)) {
774                     logger.trace("Unregistering previous poll task (possibly with different period)");
775                     unregisterRegularPoll(task);
776                 }
777                 ScheduledFuture<?> future = executor.scheduleWithFixedDelay(() -> {
778                     long started = System.currentTimeMillis();
779                     logger.debug("Executing scheduled ({}ms) poll task {}. Current millis: {}", pollPeriodMillis, task,
780                             started);
781                     try {
782                         executeOperation(task, false, pollOperation);
783                     } catch (RuntimeException e) {
784                         // We want to catch all unexpected exceptions since all unhandled exceptions make
785                         // ScheduledExecutorService halt the polling. It is better to print out the exception, and try
786                         // again
787                         // (on next poll cycle)
788                         logger.warn(
789                                 "Execution of scheduled ({}ms) poll task {} failed unexpectedly. Ignoring exception, polling again according to poll interval.",
790                                 pollPeriodMillis, task, e);
791                     }
792                     long finished = System.currentTimeMillis();
793                     logger.debug(
794                             "Execution of scheduled ({}ms) poll task {} finished at {}. Was started at millis: {} (=duration of {} millis)",
795                             pollPeriodMillis, task, finished, started, finished - started);
796                 }, initialDelayMillis, pollPeriodMillis, TimeUnit.MILLISECONDS);
797
798                 scheduledPollTasks.put(task, future);
799                 pollTasksRegisteredByThisCommInterface.add(task);
800                 logger.trace("Registered poll task {} with period {} using initial delay {}", task, pollPeriodMillis,
801                         initialDelayMillis);
802                 return task;
803             }
804         }
805
806         @SuppressWarnings({ "null", "unused" })
807         @Override
808         public boolean unregisterRegularPoll(PollTask task) {
809             synchronized (ModbusManagerImpl.this) {
810                 if (closed) {
811                     // Closed already, nothing to unregister
812                     return false;
813                 }
814                 pollTasksRegisteredByThisCommInterface.remove(task);
815                 ModbusSlaveConnectionFactoryImpl localConnectionFactory = connectionFactory;
816                 Objects.requireNonNull(localConnectionFactory, "Not activated!");
817
818                 // cancel poller
819                 @Nullable
820                 ScheduledFuture<?> future = scheduledPollTasks.remove(task);
821                 if (future == null) {
822                     // No such poll task
823                     logger.warn("Caller tried to unregister nonexisting poll task {}", task);
824                     return false;
825                 }
826                 logger.debug("Unregistering regular poll task {} (interrupting if necessary)", task);
827                 future.cancel(true);
828                 logger.debug("Poll task {} canceled", task);
829                 return true;
830             }
831         }
832
833         @Override
834         public Future<?> submitOneTimeWrite(ModbusWriteRequestBlueprint request, ModbusWriteCallback resultCallback,
835                 ModbusFailureCallback<ModbusWriteRequestBlueprint> failureCallback) {
836             if (closed) {
837                 throw new IllegalStateException("Communication interface is closed already!");
838             }
839             ScheduledExecutorService localScheduledThreadPoolExecutor = scheduledThreadPoolExecutor;
840             Objects.requireNonNull(localScheduledThreadPoolExecutor, "Not activated!");
841             WriteTask task = new BasicWriteTask(endpoint, request, resultCallback, failureCallback);
842             long scheduleTime = System.currentTimeMillis();
843             logger.debug("Scheduling one-off write task {}", task);
844             Future<?> future = localScheduledThreadPoolExecutor.submit(() -> {
845                 long millisInThreadPoolWaiting = System.currentTimeMillis() - scheduleTime;
846                 logger.debug("Will now execute one-off write task {}, waited in thread pool for {}", task,
847                         millisInThreadPoolWaiting);
848                 executeOperation(task, true, writeOperation);
849             });
850             return future;
851         }
852
853         @Override
854         public void close() throws Exception {
855             synchronized (ModbusManagerImpl.this) {
856                 if (closed) {
857                     // Closed already, nothing to unregister
858                     return;
859                 }
860                 // Iterate over all tasks registered by this communication interface, and unregister those
861                 // We copy pollTasksRegisteredByThisCommInterface temporarily so that unregisterRegularPoll can
862                 // remove entries from pollTasksRegisteredByThisCommInterface
863                 Iterable<PollTask> tasksToUnregister = new LinkedList<>(pollTasksRegisteredByThisCommInterface);
864                 for (PollTask task : tasksToUnregister) {
865                     unregisterRegularPoll(task);
866                 }
867                 unregisterCommunicationInterface(this);
868                 closed = true;
869             }
870         }
871
872         @Override
873         public ModbusSlaveEndpoint getEndpoint() {
874             return endpoint;
875         }
876     }
877
878     @Override
879     public ModbusCommunicationInterface newModbusCommunicationInterface(ModbusSlaveEndpoint endpoint,
880             @Nullable EndpointPoolConfiguration configuration) throws IllegalArgumentException {
881         boolean openCommFoundWithSameEndpointDifferentConfig = communicationInterfaces.stream()
882                 .filter(comm -> comm.endpoint.equals(endpoint))
883                 .anyMatch(comm -> comm.configuration != null && !comm.configuration.equals(configuration));
884         if (openCommFoundWithSameEndpointDifferentConfig) {
885             throw new IllegalArgumentException(
886                     "Communication interface is already open with different configuration to this same endpoint");
887         }
888
889         ModbusCommunicationInterfaceImpl comm = new ModbusCommunicationInterfaceImpl(endpoint, configuration);
890         communicationInterfaces.add(comm);
891         return comm;
892     }
893
894     @Override
895     public @Nullable EndpointPoolConfiguration getEndpointPoolConfiguration(ModbusSlaveEndpoint endpoint) {
896         Objects.requireNonNull(connectionFactory, "Not activated!");
897         return connectionFactory.getEndpointPoolConfiguration(endpoint);
898     }
899
900     private void unregisterCommunicationInterface(ModbusCommunicationInterface commInterface) {
901         communicationInterfaces.remove(commInterface);
902         maybeCloseConnections(commInterface.getEndpoint());
903     }
904
905     private void maybeCloseConnections(ModbusSlaveEndpoint endpoint) {
906         boolean lastCommWithThisEndpointWasRemoved = communicationInterfaces.stream()
907                 .filter(comm -> comm.endpoint.equals(endpoint)).count() == 0L;
908         if (lastCommWithThisEndpointWasRemoved) {
909             // Since last communication interface pointing to this endpoint was closed, we can clean up resources
910             // and disconnect connections.
911
912             // Make sure connections to this endpoint are closed when they are returned to pool (which
913             // is usually pretty soon as transactions should be relatively short-lived)
914             ModbusSlaveConnectionFactoryImpl localConnectionFactory = connectionFactory;
915             if (localConnectionFactory != null) {
916                 localConnectionFactory.disconnectOnReturn(endpoint, System.currentTimeMillis());
917                 try {
918                     // Close all idle connections as well (they will be reconnected if necessary on borrow)
919                     if (connectionPool != null) {
920                         connectionPool.clear(endpoint);
921                     }
922                 } catch (Exception e) {
923                     logger.warn("Could not clear endpoint {}. Stack trace follows", endpoint, e);
924                 }
925             }
926         }
927     }
928
929     @Activate
930     protected void activate(Map<String, Object> configProperties) {
931         synchronized (this) {
932             logger.info("Modbus manager activated");
933             if (connectionPool == null) {
934                 constructConnectionPool();
935             }
936             ScheduledExecutorService scheduledThreadPoolExecutor = this.scheduledThreadPoolExecutor;
937             if (scheduledThreadPoolExecutor == null) {
938                 this.scheduledThreadPoolExecutor = scheduledThreadPoolExecutor = ThreadPoolManager
939                         .getScheduledPool(MODBUS_POLLER_THREAD_POOL_NAME);
940             }
941             if (scheduledThreadPoolExecutor.isShutdown()) {
942                 logger.warn("Thread pool is shut down! Aborting activation of ModbusMangerImpl");
943                 throw new IllegalStateException("Thread pool(s) shut down! Aborting activation of ModbusMangerImpl");
944             }
945             monitorFuture = scheduledThreadPoolExecutor.scheduleWithFixedDelay(this::logTaskQueueInfo, 0,
946                     MONITOR_QUEUE_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
947         }
948     }
949
950     @Deactivate
951     protected void deactivate() {
952         synchronized (this) {
953             KeyedObjectPool<ModbusSlaveEndpoint, ModbusSlaveConnection> connectionPool = this.connectionPool;
954             if (connectionPool != null) {
955
956                 for (ModbusCommunicationInterface commInterface : this.communicationInterfaces) {
957                     try {
958                         commInterface.close();
959                     } catch (Exception e) {
960                         logger.warn("Error when closing communication interface", e);
961                     }
962                 }
963
964                 connectionPool.close();
965                 this.connectionPool = connectionPool = null;
966             }
967
968             if (monitorFuture != null) {
969                 monitorFuture.cancel(true);
970                 monitorFuture = null;
971             }
972             // Note that it is not allowed to shutdown the executor, since they will be reused when
973             // when pool is received from ThreadPoolManager is called
974             scheduledThreadPoolExecutor = null;
975             connectionFactory = null;
976             logger.debug("Modbus manager deactivated");
977         }
978     }
979
980     private void logTaskQueueInfo() {
981         synchronized (pollMonitorLogger) {
982             ScheduledExecutorService scheduledThreadPoolExecutor = this.scheduledThreadPoolExecutor;
983             if (scheduledThreadPoolExecutor == null) {
984                 return;
985             }
986             // Avoid excessive spamming with queue monitor when many tasks are executed
987             if (System.currentTimeMillis() - lastQueueMonitorLog < MONITOR_QUEUE_INTERVAL_MILLIS) {
988                 return;
989             }
990             lastQueueMonitorLog = System.currentTimeMillis();
991             pollMonitorLogger.trace("<POLL MONITOR>");
992             this.scheduledPollTasks.forEach((task, future) -> {
993                 pollMonitorLogger.trace(
994                         "POLL MONITOR: scheduled poll task. FC: {}, start {}, length {}, done: {}, canceled: {}, delay: {}. Full task {}",
995                         task.getRequest().getFunctionCode(), task.getRequest().getReference(),
996                         task.getRequest().getDataLength(), future.isDone(), future.isCancelled(),
997                         future.getDelay(TimeUnit.MILLISECONDS), task);
998             });
999             if (scheduledThreadPoolExecutor instanceof ThreadPoolExecutor) {
1000                 ThreadPoolExecutor executor = ((ThreadPoolExecutor) scheduledThreadPoolExecutor);
1001                 pollMonitorLogger.trace(
1002                         "POLL MONITOR: scheduledThreadPoolExecutor queue size: {}, remaining space {}. Active threads {}",
1003                         executor.getQueue().size(), executor.getQueue().remainingCapacity(), executor.getActiveCount());
1004                 if (executor.getQueue().size() >= WARN_QUEUE_SIZE) {
1005                     pollMonitorLogger.warn(
1006                             "Many ({}) tasks queued in scheduledThreadPoolExecutor! This might be sign of bad design or bug in the binding code.",
1007                             executor.getQueue().size());
1008                 }
1009             }
1010
1011             pollMonitorLogger.trace("</POLL MONITOR>");
1012         }
1013     }
1014 }