2 * Copyright (c) 2010-2020 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.io.transport.modbus.internal;
15 import java.io.IOException;
16 import java.util.LinkedList;
18 import java.util.Objects;
19 import java.util.Optional;
21 import java.util.concurrent.ConcurrentHashMap;
22 import java.util.concurrent.Future;
23 import java.util.concurrent.ScheduledExecutorService;
24 import java.util.concurrent.ScheduledFuture;
25 import java.util.concurrent.ThreadPoolExecutor;
26 import java.util.concurrent.TimeUnit;
27 import java.util.concurrent.atomic.AtomicReference;
29 import javax.imageio.IIOException;
31 import org.apache.commons.pool2.KeyedObjectPool;
32 import org.apache.commons.pool2.SwallowedExceptionListener;
33 import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
34 import org.eclipse.jdt.annotation.NonNull;
35 import org.eclipse.jdt.annotation.NonNullByDefault;
36 import org.eclipse.jdt.annotation.Nullable;
37 import org.eclipse.jetty.util.ConcurrentHashSet;
38 import org.openhab.core.common.ThreadPoolManager;
39 import org.openhab.io.transport.modbus.AsyncModbusFailure;
40 import org.openhab.io.transport.modbus.AsyncModbusWriteResult;
41 import org.openhab.io.transport.modbus.ModbusCommunicationInterface;
42 import org.openhab.io.transport.modbus.ModbusFailureCallback;
43 import org.openhab.io.transport.modbus.ModbusManager;
44 import org.openhab.io.transport.modbus.ModbusReadCallback;
45 import org.openhab.io.transport.modbus.ModbusReadRequestBlueprint;
46 import org.openhab.io.transport.modbus.ModbusResultCallback;
47 import org.openhab.io.transport.modbus.ModbusWriteCallback;
48 import org.openhab.io.transport.modbus.ModbusWriteRequestBlueprint;
49 import org.openhab.io.transport.modbus.PollTask;
50 import org.openhab.io.transport.modbus.TaskWithEndpoint;
51 import org.openhab.io.transport.modbus.WriteTask;
52 import org.openhab.io.transport.modbus.endpoint.EndpointPoolConfiguration;
53 import org.openhab.io.transport.modbus.endpoint.ModbusSerialSlaveEndpoint;
54 import org.openhab.io.transport.modbus.endpoint.ModbusSlaveEndpoint;
55 import org.openhab.io.transport.modbus.endpoint.ModbusSlaveEndpointVisitor;
56 import org.openhab.io.transport.modbus.endpoint.ModbusTCPSlaveEndpoint;
57 import org.openhab.io.transport.modbus.endpoint.ModbusUDPSlaveEndpoint;
58 import org.openhab.io.transport.modbus.exception.ModbusConnectionException;
59 import org.openhab.io.transport.modbus.exception.ModbusUnexpectedResponseFunctionCodeException;
60 import org.openhab.io.transport.modbus.exception.ModbusUnexpectedResponseSizeException;
61 import org.openhab.io.transport.modbus.exception.ModbusUnexpectedTransactionIdException;
62 import org.openhab.io.transport.modbus.internal.pooling.ModbusSlaveConnectionFactoryImpl;
63 import org.osgi.service.component.annotations.Activate;
64 import org.osgi.service.component.annotations.Component;
65 import org.osgi.service.component.annotations.Deactivate;
66 import org.slf4j.Logger;
67 import org.slf4j.LoggerFactory;
69 import net.wimpi.modbus.Modbus;
70 import net.wimpi.modbus.ModbusException;
71 import net.wimpi.modbus.ModbusIOException;
72 import net.wimpi.modbus.ModbusSlaveException;
73 import net.wimpi.modbus.io.ModbusTransaction;
74 import net.wimpi.modbus.msg.ModbusRequest;
75 import net.wimpi.modbus.msg.ModbusResponse;
76 import net.wimpi.modbus.net.ModbusSlaveConnection;
79 * Main implementation of ModbusManager
81 * We use connection pool to ensure that only single transaction is ongoing per each endpoint. This is especially
82 * important with serial slaves but practice has shown that even many tcp slaves have limited
83 * capability to handle many connections at the same time
85 * @author Sami Salonen - Initial contribution
87 @Component(service = ModbusManager.class, immediate = true, configurationPid = "transport.modbus")
89 public class ModbusManagerImpl implements ModbusManager {
91 static class PollTaskUnregistered extends Exception {
92 public PollTaskUnregistered(String msg) {
96 private static final long serialVersionUID = 6939730579178506885L;
100 private interface ModbusOperation<T> {
103 * Execute the operation.
105 * All errors should be raised. There should not be any retry mechanism implemented at this level
107 * @param timer aggregate stop watch for performance profiling
108 * @param task task to execute
109 * @param connection connection to use
111 * @throws IIOException on generic IO errors
112 * @throws ModbusException on Modbus protocol errors (e.g. ModbusIOException on I/O, ModbusSlaveException on
113 * slave exception responses)
114 * @throws ModbusUnexpectedTransactionIdException when transaction IDs of the request and
115 * response do not match
116 * @throws ModbusUnexpectedResponseFunctionCodeException when response function code does not match the request
117 * (ill-behaving slave)
118 * @throws ModbusUnexpectedResponseSizeException when data length of the response and request do not match
120 public void accept(AggregateStopWatch timer, T task, ModbusSlaveConnection connection)
121 throws ModbusException, IIOException, ModbusUnexpectedTransactionIdException,
122 ModbusUnexpectedResponseFunctionCodeException, ModbusUnexpectedResponseSizeException;
126 * Check that transaction id of the response and request match
128 * @param response response from the slave corresponding to request
129 * @param libRequest modbus request
130 * @param operationId operation id for logging
131 * @throws ModbusUnexpectedTransactionIdException when transaction IDs of the request and
132 * response do not match
134 private <R> void checkTransactionId(ModbusResponse response, ModbusRequest libRequest, String operationId)
135 throws ModbusUnexpectedTransactionIdException {
136 // Compare request and response transaction ID. NOTE: ModbusTransaction.getTransactionID() is static and
138 if ((response.getTransactionID() != libRequest.getTransactionID()) && !response.isHeadless()) {
139 throw new ModbusUnexpectedTransactionIdException(libRequest.getTransactionID(),
140 response.getTransactionID());
145 * Check that function code of the response and request match
147 * @param response response from the slave corresponding to request
148 * @param libRequest modbus request
149 * @param operationId operation id for logging
150 * @throws ModbusUnexpectedResponseFunctionCodeException when response function code does not match the request
151 * (ill-behaving slave)
153 private <R> void checkFunctionCode(ModbusResponse response, ModbusRequest libRequest, String operationId)
154 throws ModbusUnexpectedResponseFunctionCodeException {
155 if ((response.getFunctionCode() != libRequest.getFunctionCode())) {
156 throw new ModbusUnexpectedResponseFunctionCodeException(libRequest.getTransactionID(),
157 response.getTransactionID());
162 * Check that number of bits/registers/discrete inputs is not less than what was requested.
164 * According to modbus protocol, we should get always get always equal amount of registers data back as response.
165 * With coils and discrete inputs, we can get more since responses are in 8 bit chunks.
167 * However, in no case we expect less items in response.
169 * This is to identify clearly invalid responses which might cause problems downstream when using the data.
171 * @param response response response from the slave corresponding to request
172 * @param request modbus request
173 * @param operationId operation id for logging
174 * @throws ModbusUnexpectedResponseSizeException when data length of the response and request do not match
176 private <R> void checkResponseSize(ModbusResponse response, ModbusReadRequestBlueprint request, String operationId)
177 throws ModbusUnexpectedResponseSizeException {
178 final int responseCount = ModbusLibraryWrapper.getNumberOfItemsInResponse(response, request);
179 if (responseCount < request.getDataLength()) {
180 throw new ModbusUnexpectedResponseSizeException(request.getDataLength(), responseCount);
185 * Implementation for the PollTask operation
187 * @author Sami Salonen - Initial contribution
190 private class PollOperation implements ModbusOperation<PollTask> {
192 public void accept(AggregateStopWatch timer, PollTask task, ModbusSlaveConnection connection)
193 throws ModbusException, ModbusUnexpectedTransactionIdException,
194 ModbusUnexpectedResponseFunctionCodeException, ModbusUnexpectedResponseSizeException {
195 ModbusSlaveEndpoint endpoint = task.getEndpoint();
196 ModbusReadRequestBlueprint request = task.getRequest();
197 ModbusReadCallback callback = task.getResultCallback();
198 String operationId = timer.operationId;
200 ModbusTransaction transaction = ModbusLibraryWrapper.createTransactionForEndpoint(endpoint, connection);
201 ModbusRequest libRequest = ModbusLibraryWrapper.createRequest(request);
202 transaction.setRequest(libRequest);
204 logger.trace("Going execute transaction with request request (FC={}): {} [operation ID {}]",
205 request.getFunctionCode(), libRequest.getHexMessage(), operationId);
206 // Might throw ModbusIOException (I/O error) or ModbusSlaveException (explicit exception response from
208 timer.transaction.timeRunnableWithModbusException(() -> transaction.execute());
209 ModbusResponse response = transaction.getResponse();
210 logger.trace("Response for read request (FC={}, transaction ID={}): {} [operation ID {}]",
211 response.getFunctionCode(), response.getTransactionID(), response.getHexMessage(), operationId);
212 checkTransactionId(response, libRequest, operationId);
213 checkFunctionCode(response, libRequest, operationId);
214 checkResponseSize(response, request, operationId);
216 .timeRunnable(() -> ModbusLibraryWrapper.invokeCallbackWithResponse(request, callback, response));
221 * Implementation for WriteTask operation
223 * @author Sami Salonen - Initial contribution
226 private class WriteOperation implements ModbusOperation<WriteTask> {
228 public void accept(AggregateStopWatch timer, WriteTask task, ModbusSlaveConnection connection)
229 throws ModbusException, ModbusUnexpectedTransactionIdException,
230 ModbusUnexpectedResponseFunctionCodeException {
231 ModbusSlaveEndpoint endpoint = task.getEndpoint();
232 ModbusWriteRequestBlueprint request = task.getRequest();
234 ModbusWriteCallback callback = task.getResultCallback();
235 String operationId = timer.operationId;
237 ModbusTransaction transaction = ModbusLibraryWrapper.createTransactionForEndpoint(endpoint, connection);
238 ModbusRequest libRequest = ModbusLibraryWrapper.createRequest(request);
239 transaction.setRequest(libRequest);
241 logger.trace("Going execute transaction with read request (FC={}): {} [operation ID {}]",
242 request.getFunctionCode(), libRequest.getHexMessage(), operationId);
244 // Might throw ModbusIOException (I/O error) or ModbusSlaveException (explicit exception response from
246 timer.transaction.timeRunnableWithModbusException(() -> transaction.execute());
247 ModbusResponse response = transaction.getResponse();
248 logger.trace("Response for write request (FC={}, transaction ID={}): {} [operation ID {}]",
249 response.getFunctionCode(), response.getTransactionID(), response.getHexMessage(), operationId);
250 checkTransactionId(response, libRequest, operationId);
251 checkFunctionCode(response, libRequest, operationId);
252 timer.callback.timeRunnable(
253 () -> invokeCallbackWithResponse(request, callback, new ModbusResponseImpl(response)));
257 private final Logger logger = LoggerFactory.getLogger(ModbusManagerImpl.class);
258 private final Logger pollMonitorLogger = LoggerFactory
259 .getLogger(ModbusManagerImpl.class.getName() + ".PollMonitor");
262 * Time to wait between connection passive+borrow, i.e. time to wait between
264 * Default 60ms for TCP slaves, Siemens S7 1212 PLC couldn't handle faster
265 * requests with default settings.
267 public static final long DEFAULT_TCP_INTER_TRANSACTION_DELAY_MILLIS = 60;
270 * Time to wait between connection passive+borrow, i.e. time to wait between
272 * Default 35ms for Serial slaves, motivation discussed
273 * here https://community.openhab.org/t/connection-pooling-in-modbus-binding/5246/111?u=ssalonen
275 public static final long DEFAULT_SERIAL_INTER_TRANSACTION_DELAY_MILLIS = 35;
278 * Thread naming for modbus read & write requests. Also used by the monitor thread
280 private static final String MODBUS_POLLER_THREAD_POOL_NAME = "modbusManagerPollerThreadPool";
283 * Log message with WARN level if the task queues exceed this limit.
285 * If the queues grow too large, it might be an issue with consumer of the ModbusManager.
287 * You can generate large queue by spamming ModbusManager with one-off read or writes (submitOnTimePoll or
288 * submitOneTimeWrite).
290 * Note that there is no issue registering many regular polls, those do not "queue" the same way.
292 * Presumably slow callbacks can increase queue size with callbackThreadPool
294 private static final long WARN_QUEUE_SIZE = 500;
295 private static final long MONITOR_QUEUE_INTERVAL_MILLIS = 10000;
297 private final PollOperation pollOperation = new PollOperation();
298 private final WriteOperation writeOperation = new WriteOperation();
300 private volatile long lastQueueMonitorLog = -1;
303 * We use connection pool to ensure that only single transaction is ongoing per each endpoint. This is especially
304 * important with serial slaves but practice has shown that even many tcp slaves have limited
305 * capability to handle many connections at the same time
307 * Relevant discussion at the time of implementation:
308 * - https://community.openhab.org/t/modbus-connection-problem/6108/
309 * - https://community.openhab.org/t/connection-pooling-in-modbus-binding/5246/
312 private volatile @Nullable KeyedObjectPool<ModbusSlaveEndpoint, ModbusSlaveConnection> connectionPool;
313 private volatile @Nullable ModbusSlaveConnectionFactoryImpl connectionFactory;
314 private volatile Map<PollTask, ScheduledFuture<?>> scheduledPollTasks = new ConcurrentHashMap<>();
316 * Executor for requests
318 private volatile @Nullable ScheduledExecutorService scheduledThreadPoolExecutor;
319 private volatile @Nullable ScheduledFuture<?> monitorFuture;
320 private volatile Set<ModbusCommunicationInterfaceImpl> communicationInterfaces = new ConcurrentHashSet<>();
322 private void constructConnectionPool() {
323 ModbusSlaveConnectionFactoryImpl connectionFactory = new ModbusSlaveConnectionFactoryImpl();
324 connectionFactory.setDefaultPoolConfigurationFactory(endpoint -> {
325 return endpoint.accept(new ModbusSlaveEndpointVisitor<EndpointPoolConfiguration>() {
328 public @NonNull EndpointPoolConfiguration visit(ModbusTCPSlaveEndpoint modbusIPSlavePoolingKey) {
329 EndpointPoolConfiguration endpointPoolConfig = new EndpointPoolConfiguration();
330 endpointPoolConfig.setInterTransactionDelayMillis(DEFAULT_TCP_INTER_TRANSACTION_DELAY_MILLIS);
331 endpointPoolConfig.setConnectMaxTries(Modbus.DEFAULT_RETRIES);
332 return endpointPoolConfig;
336 public @NonNull EndpointPoolConfiguration visit(ModbusSerialSlaveEndpoint modbusSerialSlavePoolingKey) {
337 EndpointPoolConfiguration endpointPoolConfig = new EndpointPoolConfiguration();
338 // never "disconnect" (close/open serial port) serial connection between borrows
339 endpointPoolConfig.setReconnectAfterMillis(-1);
340 endpointPoolConfig.setInterTransactionDelayMillis(DEFAULT_SERIAL_INTER_TRANSACTION_DELAY_MILLIS);
341 endpointPoolConfig.setConnectMaxTries(Modbus.DEFAULT_RETRIES);
342 return endpointPoolConfig;
346 public @NonNull EndpointPoolConfiguration visit(ModbusUDPSlaveEndpoint modbusUDPSlavePoolingKey) {
347 EndpointPoolConfiguration endpointPoolConfig = new EndpointPoolConfiguration();
348 endpointPoolConfig.setInterTransactionDelayMillis(DEFAULT_TCP_INTER_TRANSACTION_DELAY_MILLIS);
349 endpointPoolConfig.setConnectMaxTries(Modbus.DEFAULT_RETRIES);
350 return endpointPoolConfig;
355 GenericKeyedObjectPool<ModbusSlaveEndpoint, ModbusSlaveConnection> genericKeyedObjectPool = new ModbusConnectionPool(
357 genericKeyedObjectPool.setSwallowedExceptionListener(new SwallowedExceptionListener() {
359 @SuppressWarnings("null")
361 public void onSwallowException(@Nullable Exception e) {
362 LoggerFactory.getLogger(ModbusManagerImpl.class).warn(
363 "Connection pool swallowed unexpected exception:{} {}",
364 Optional.ofNullable(e).map(ex -> ex.getClass().getSimpleName()).orElse(""),
365 Optional.ofNullable(e).map(ex -> ex.getMessage()).orElse("<null>"), e);
368 connectionPool = genericKeyedObjectPool;
369 this.connectionFactory = connectionFactory;
// Borrows a connection for the given endpoint from connectionPool and wraps it in an Optional
// (Optional.ofNullable, so a null from the pool becomes empty).
// NOTE(review): short lines (braces, `try {`, log-call openers, early exits, the final return) are not visible
// in this excerpt; the comments below describe only the visible statements.
372 private Optional<ModbusSlaveConnection> borrowConnection(ModbusSlaveEndpoint endpoint) {
373 Optional<ModbusSlaveConnection> connection = Optional.empty();
// Read the volatile field once into a local
374 KeyedObjectPool<ModbusSlaveEndpoint, ModbusSlaveConnection> pool = connectionPool;
// Start time, used only for the "took {} ms" trace at the end
378 long start = System.currentTimeMillis();
380 connection = Optional.ofNullable(pool.borrowObject(endpoint));
// Any exception from the pool is logged with WARN level (exception type and message, no stack trace)
381 } catch (Exception e) {
382 logger.warn("Error getting a new connection for endpoint {}. Error was: {} {}", endpoint,
383 e.getClass().getName(), e.getMessage());
// A borrowed connection that is not connected is handed back to the pool and the result is reset to empty
385 if (connection.isPresent()) {
386 ModbusSlaveConnection slaveConnection = connection.get();
387 if (!slaveConnection.isConnected()) {
389 "Received connection which is unconnected, preventing use by returning connection to pool.");
390 returnConnection(endpoint, connection);
391 connection = Optional.empty();
394 logger.trace("borrowing connection (got {}) for endpoint {} took {} ms", connection, endpoint,
395 System.currentTimeMillis() - start);
399 private void invalidate(ModbusSlaveEndpoint endpoint, Optional<ModbusSlaveConnection> connection) {
400 KeyedObjectPool<ModbusSlaveEndpoint, ModbusSlaveConnection> pool = connectionPool;
404 long start = System.currentTimeMillis();
405 connection.ifPresent(con -> {
407 pool.invalidateObject(endpoint, con);
408 } catch (Exception e) {
409 logger.warn("Error invalidating connection in pool for endpoint {}. Error was: {} {}", endpoint,
410 e.getClass().getName(), e.getMessage(), e);
413 logger.trace("invalidating connection for endpoint {} took {} ms", endpoint,
414 System.currentTimeMillis() - start);
417 private void returnConnection(ModbusSlaveEndpoint endpoint, Optional<ModbusSlaveConnection> connection) {
418 KeyedObjectPool<ModbusSlaveEndpoint, ModbusSlaveConnection> pool = connectionPool;
422 long start = System.currentTimeMillis();
423 connection.ifPresent(con -> {
425 pool.returnObject(endpoint, con);
426 logger.trace("returned connection to pool for endpoint {}", endpoint);
427 } catch (Exception e) {
428 logger.warn("Error returning connection to pool for endpoint {}. Error was: {} {}", endpoint,
429 e.getClass().getName(), e.getMessage(), e);
432 logger.trace("returning connection for endpoint {} took {} ms", endpoint, System.currentTimeMillis() - start);
// NOTE(review): short lines (braces, log-call openers, the final return) are not visible in this excerpt;
// the comments below describe only the visible statements.
// NOTE(review): the doc text below names an `operationId` parameter, but the signature takes `timer`;
// the operation id is read from timer.operationId.
436 * Establishes connection to the endpoint specified by the task
438 * In case connection cannot be established, callback is called with {@link ModbusConnectionException}
440 * @param operationId id appened to log messages for identifying the operation
441 * @param oneOffTask whether this is one-off, or execution of previously scheduled poll
442 * @param task task representing the read or write operation
443 * @return {@link ModbusSlaveConnection} to the endpoint as specified by the task, or empty {@link Optional} when
444 * connection cannot be established
445 * @throws PollTaskUnregistered
447 private <R, C extends ModbusResultCallback, F extends ModbusFailureCallback<R>, T extends TaskWithEndpoint<R, C, F>> Optional<ModbusSlaveConnection> getConnection(
448 AggregateStopWatch timer, boolean oneOffTask, @NonNull T task) throws PollTaskUnregistered {
// Local copy of the volatile field; without a pool no connection can be provided
449 KeyedObjectPool<ModbusSlaveEndpoint, ModbusSlaveConnection> connectionPool = this.connectionPool;
450 if (connectionPool == null) {
451 return Optional.empty();
453 String operationId = timer.operationId;
455 "Executing task {} (oneOff={})! Waiting for connection. Idle connections for this endpoint: {}, and active {} [operation ID {}]",
456 task, oneOffTask, connectionPool.getNumIdle(task.getEndpoint()),
457 connectionPool.getNumActive(task.getEndpoint()), operationId);
458 long connectionBorrowStart = System.currentTimeMillis();
459 ModbusFailureCallback<R> failureCallback = task.getFailureCallback();
460 ModbusSlaveEndpoint endpoint = task.getEndpoint();
462 R request = task.getRequest();
// The borrow is timed under the stop watch's `connection` timer
463 Optional<ModbusSlaveConnection> connection = timer.connection.timeSupplier(() -> borrowConnection(endpoint));
464 logger.trace("Executing task {} (oneOff={})! Connection received in {} ms [operation ID {}]", task, oneOffTask,
465 System.currentTimeMillis() - connectionBorrowStart, operationId);
// If the executor field is null by now, the borrowed connection is invalidated and nothing is returned to the caller
466 if (scheduledThreadPoolExecutor == null) {
467 // manager deactivated
468 timer.connection.timeRunnable(() -> invalidate(endpoint, connection));
469 return Optional.empty();
// No connection: log with WARN level and report a ModbusConnectionException through the task's failure callback
471 if (!connection.isPresent()) {
472 logger.warn("Could not connect to endpoint {} -- aborting request {} [operation ID {}]", endpoint, request,
474 timer.callback.timeRunnable(
475 () -> invokeCallbackWithError(request, failureCallback, new ModbusConnectionException(endpoint)));
480 private <R> void invokeCallbackWithError(R request, ModbusFailureCallback<R> callback, Exception error) {
482 logger.trace("Calling error response callback {} for request {}. Error was {} {}", callback, request,
483 error.getClass().getName(), error.getMessage());
484 callback.handle(new AsyncModbusFailure<R>(request, error));
486 logger.trace("Called write response callback {} for request {}. Error was {} {}", callback, request,
487 error.getClass().getName(), error.getMessage());
491 private void invokeCallbackWithResponse(ModbusWriteRequestBlueprint request, ModbusWriteCallback callback,
492 org.openhab.io.transport.modbus.ModbusResponse response) {
494 logger.trace("Calling write response callback {} for request {}. Response was {}", callback, request,
496 callback.handle(new AsyncModbusWriteResult(request, response));
498 logger.trace("Called write response callback {} for request {}. Response was {}", callback, request,
// Throws PollTaskUnregistered when the given task is not a key of scheduledPollTasks.
// NOTE(review): a short statement between building the message and the throw is not visible in this excerpt.
503 private void verifyTaskIsRegistered(PollTask task) throws PollTaskUnregistered {
504 if (!this.scheduledPollTasks.containsKey(task)) {
// The same text becomes the exception message
505 String msg = String.format("Poll task %s is unregistered", task);
507 throw new PollTaskUnregistered(msg);
// NOTE(review): short lines (braces, `try {`, `finally`, `return;`/`continue;`, bare log-call openers and
// other short statements) are not visible in this excerpt. The comments added below describe only the
// visible statements; confirm control flow against the full file.
512 * Execute operation using a retry mechanism.
514 * This is a helper function for executing read and write operations and handling the exceptions in a common way.
516 * With some connection types, the connection is reseted (disconnected), and new connection is received from the
517 * pool. This means that potentially other operations queuing for the connection can be executed in-between.
519 * With some other connection types, the operation is retried without reseting the connection type.
525 private <R, C extends ModbusResultCallback, F extends ModbusFailureCallback<R>, T extends TaskWithEndpoint<R, C, F>> void executeOperation(
526 T task, boolean oneOffTask, ModbusOperation<T> operation) {
// One stop watch per operation; its operationId tags the log lines of this operation
527 AggregateStopWatch timer = new AggregateStopWatch();
528 timer.total.resume();
529 String operationId = timer.operationId;
// Local copy of the volatile field
531 ModbusSlaveConnectionFactoryImpl connectionFactory = this.connectionFactory;
532 if (connectionFactory == null) {
533 // deactivated manager
534 logger.trace("Deactivated manager - aborting operation.");
539 R request = task.getRequest();
540 ModbusSlaveEndpoint endpoint = task.getEndpoint();
541 F failureCallback = task.getFailureCallback();
542 int maxTries = task.getMaxTries();
// Holds the error of the most recent failed try; it is reported to failureCallback after the retry loop
543 AtomicReference<@Nullable Exception> lastError = new AtomicReference<>();
// Delay between retries: the endpoint's inter-transaction delay, or 0 when no pool configuration is available
544 @SuppressWarnings("null") // since cfg in lambda cannot be really null
545 long retryDelay = Optional.ofNullable(connectionFactory.getEndpointPoolConfiguration(endpoint))
546 .map(cfg -> cfg.getInterTransactionDelayMillis()).orElse(0L);
549 throw new IllegalArgumentException("maxTries should be positive");
552 Optional<ModbusSlaveConnection> connection = Optional.empty();
554 logger.trace("Starting new operation with task {}. Trying to get connection [operation ID {}]", task,
556 connection = getConnection(timer, oneOffTask, task);
557 logger.trace("Operation with task {}. Got a connection {} [operation ID {}]", task,
558 connection.isPresent() ? "successfully" : "which was unconnected (connection issue)", operationId);
559 if (!connection.isPresent()) {
560 // Could not acquire connection, time to abort
561 // Error logged already, error callback called as well
562 logger.trace("Initial connection was not successful, aborting. [operation ID {}]", operationId);
566 if (scheduledThreadPoolExecutor == null) {
567 logger.debug("Manager has been shut down, aborting proecssing request {} [operation ID {}]", request,
574 * last execution is tracked such that the endpoint is not spammed on retry. First retry can be executed
575 * right away since getConnection ensures enough time has passed since last transaction. More precisely,
576 * ModbusSlaveConnectionFactoryImpl sleeps on activate() (i.e. before returning connection).
579 Long lastTryMillis = null;
580 while (tryIndex < maxTries) {
581 logger.trace("Try {} out of {} [operation ID {}]", tryIndex + 1, maxTries, operationId);
582 if (!connection.isPresent()) {
583 // Connection was likely reseted with previous try, and connection was not successfully
584 // re-established. Error has been logged, time to abort.
585 logger.trace("Try {} out of {}. Connection was not successful, aborting. [operation ID {}]",
586 tryIndex + 1, maxTries, operationId);
// NOTE(review): Thread.interrupted() clears the thread's interrupt flag; confirm that callers do not rely on it
589 if (Thread.interrupted()) {
590 logger.warn("Thread interrupted. Aborting operation [operation ID {}]", operationId);
593 // Check poll task is still registered (this is all asynchronous)
594 if (!oneOffTask && task instanceof PollTask) {
595 verifyTaskIsRegistered((PollTask) task);
597 // Let's ensure that enough time is between the retries
599 "Ensuring that enough time passes before retrying again. Sleeping if necessary [operation ID {}]",
601 long slept = ModbusSlaveConnectionFactoryImpl.waitAtleast(lastTryMillis, retryDelay);
602 logger.trace("Sleep ended, slept {} [operation ID {}]", slept, operationId);
604 boolean willRetry = false;
// willRetry selects between the "Will try again soon" and "Last try ... Aborting" messages below, and
// controls whether a new connection is fetched after a failed try
607 willRetry = tryIndex < maxTries;
608 operation.accept(timer, task, connection.get());
611 } catch (IOException e) {
612 lastError.set(new ModbusSlaveIOExceptionImpl(e));
613 // IO exception occurred, we re-establish new connection hoping it would fix the issue (e.g.
614 // broken pipe on write)
617 "Try {} out of {} failed when executing request ({}). Will try again soon. Error was I/O error, so reseting the connection. Error details: {} {} [operation ID {}]",
618 tryIndex, maxTries, request, e.getClass().getName(), e.getMessage(), operationId);
621 "Last try {} failed when executing request ({}). Aborting. Error was I/O error, so reseting the connection. Error details: {} {} [operation ID {}]",
622 tryIndex, request, e.getClass().getName(), e.getMessage(), operationId);
624 // Invalidate connection, and empty (so that new connection is acquired before new retry)
625 timer.connection.timeConsumer(c -> invalidate(endpoint, c), connection);
626 connection = Optional.empty();
// Same handling as the IOException branch above, for the Modbus library's own I/O exception type
628 } catch (ModbusIOException e) {
629 lastError.set(new ModbusSlaveIOExceptionImpl(e));
630 // IO exception occurred, we re-establish new connection hoping it would fix the issue (e.g.
631 // broken pipe on write)
634 "Try {} out of {} failed when executing request ({}). Will try again soon. Error was I/O error, so reseting the connection. Error details: {} {} [operation ID {}]",
635 tryIndex, maxTries, request, e.getClass().getName(), e.getMessage(), operationId);
638 "Last try {} failed when executing request ({}). Aborting. Error was I/O error, so reseting the connection. Error details: {} {} [operation ID {}]",
639 tryIndex, request, e.getClass().getName(), e.getMessage(), operationId);
641 // Invalidate connection, and empty (so that new connection is acquired before new retry)
642 timer.connection.timeConsumer(c -> invalidate(endpoint, c), connection);
643 connection = Optional.empty();
// Unlike the other branches, no visible statement here invalidates the connection
645 } catch (ModbusSlaveException e) {
646 lastError.set(new ModbusSlaveErrorResponseExceptionImpl(e));
647 // Slave returned explicit error response, no reason to re-establish new connection
650 "Try {} out of {} failed when executing request ({}). Will try again soon. Error was: {} {} [operation ID {}]",
651 tryIndex, maxTries, request, e.getClass().getName(), e.getMessage(), operationId);
654 "Last try {} failed when executing request ({}). Aborting. Error was: {} {} [operation ID {}]",
655 tryIndex, request, e.getClass().getName(), e.getMessage(), operationId);
// Response validation failures raised by the check* helpers
658 } catch (ModbusUnexpectedTransactionIdException | ModbusUnexpectedResponseFunctionCodeException
659 | ModbusUnexpectedResponseSizeException e) {
661 // transaction error details already logged
664 "Try {} out of {} failed when executing request ({}). Will try again soon. The response did not match the request. Reseting the connection. Error details: {} {} [operation ID {}]",
665 tryIndex, maxTries, request, e.getClass().getName(), e.getMessage(), operationId);
668 "Last try {} failed when executing request ({}). Aborting. The response did not match the request. Reseting the connection. Error details: {} {} [operation ID {}]",
669 tryIndex, request, e.getClass().getName(), e.getMessage(), operationId);
671 // Invalidate connection, and empty (so that new connection is acquired before new retry)
672 timer.connection.timeConsumer(c -> invalidate(endpoint, c), connection);
673 connection = Optional.empty();
// Catch-all for remaining ModbusException types; the exception itself is passed as last log argument
675 } catch (ModbusException e) {
677 // Some other (unexpected) exception occurred
680 "Try {} out of {} failed when executing request ({}). Will try again soon. Error was unexpected error, so reseting the connection. Error details: {} {} [operation ID {}]",
681 tryIndex, maxTries, request, e.getClass().getName(), e.getMessage(), operationId, e);
684 "Last try {} failed when executing request ({}). Aborting. Error was unexpected error, so reseting the connection. Error details: {} {} [operation ID {}]",
685 tryIndex, request, e.getClass().getName(), e.getMessage(), operationId, e);
687 // Invalidate connection, and empty (so that new connection is acquired before new retry)
688 timer.connection.timeConsumer(c -> invalidate(endpoint, c), connection);
689 connection = Optional.empty();
// Timestamp of this try; passed to waitAtleast(...) before the next try
692 lastTryMillis = System.currentTimeMillis();
693 // Connection was reseted in error handling and needs to be reconnected.
694 // Try to re-establish connection.
695 if (willRetry && !connection.isPresent()) {
696 connection = getConnection(timer, oneOffTask, task);
// After the loop: a non-null lastError is reported to the task's failure callback
700 Exception exception = lastError.get();
701 if (exception != null) {
702 // All retries failed with some error
703 timer.callback.timeRunnable(() -> {
704 invokeCallbackWithError(request, failureCallback, exception);
707 } catch (PollTaskUnregistered e) {
708 logger.warn("Poll task was unregistered -- not executing/proceeding with the poll: {} [operation ID {}]",
709 e.getMessage(), operationId);
// NOTE(review): no visible statement restores the interrupt status (Thread.currentThread().interrupt())
// in this branch; confirm whether that is intended
711 } catch (InterruptedException e) {
712 logger.warn("Poll task was canceled -- not executing/proceeding with the poll: {} [operation ID {}]",
713 e.getMessage(), operationId);
714 // Invalidate connection, and empty (so that new connection is acquired before new retry)
715 timer.connection.timeConsumer(c -> invalidate(endpoint, c), connection);
716 connection = Optional.empty();
// Whatever connection is still held is returned to the pool, then timing info is logged with DEBUG level
718 timer.connection.timeConsumer(c -> returnConnection(endpoint, c), connection);
719 logger.trace("Connection was returned to the pool, ending operation [operation ID {}]", operationId);
720 timer.suspendAllRunning();
721 logger.debug("Modbus operation ended, timing info: {} [operation ID {}]", timer, operationId);
/**
 * {@link ModbusCommunicationInterface} implementation that is bound to a single {@link ModbusSlaveEndpoint}.
 *
 * One-off polls and writes are submitted to the manager's {@code scheduledThreadPoolExecutor}; regular polls are
 * scheduled on the same executor with a fixed delay. Regular polls registered through this instance are tracked in
 * {@code pollTasksRegisteredByThisCommInterface} so that {@code close()} can unregister them again.
 */
725 private class ModbusCommunicationInterfaceImpl implements ModbusCommunicationInterface {
// Endpoint handed to every poll/write task created by this interface
727 private volatile ModbusSlaveEndpoint endpoint;
// Regular poll tasks registered via registerRegularPoll of this instance (removed again on unregister)
728 private volatile Set<PollTask> pollTasksRegisteredByThisCommInterface = new ConcurrentHashSet<>();
// NOTE(review): neither the reads nor the write of this flag are visible in this excerpt; presumably it backs the
// "closed already" checks below and is set by close() -- confirm.
729 private volatile boolean closed;
// Pool configuration given at construction time; compared by newModbusCommunicationInterface
730 private @Nullable EndpointPoolConfiguration configuration;
/**
 * Stores the endpoint and its pool configuration and passes the configuration on to
 * {@code connectionFactory.setEndpointPoolConfiguration}.
 *
 * NOTE(review): {@code connectionFactory} is dereferenced here without the
 * {@code Objects.requireNonNull(..., "Not activated!")} guard used by other methods of this file; presumably that
 * is what the "null" suppression is for -- confirm.
 *
 * @param endpoint endpoint this interface communicates with
 * @param configuration pool configuration for the endpoint, may be {@code null}
 */
732 @SuppressWarnings("null")
733 public ModbusCommunicationInterfaceImpl(ModbusSlaveEndpoint endpoint,
734 @Nullable EndpointPoolConfiguration configuration) {
735 this.endpoint = endpoint;
736 this.configuration = configuration;
737 connectionFactory.setEndpointPoolConfiguration(endpoint, configuration);
/**
 * Submits a single read request for execution on {@code scheduledThreadPoolExecutor}.
 *
 * The request is wrapped into a {@code BasicPollTask} and run through
 * {@code executeOperation(task, true, pollOperation)} on a pool thread.
 *
 * @param request read request to execute
 * @param resultCallback result callback handed to the created task
 * @param failureCallback failure callback handed to the created task
 * @throws IllegalStateException with message "Communication interface is closed already!"
 * @throws NullPointerException with message "Not activated!" when {@code scheduledThreadPoolExecutor} is null
 */
741 public Future<?> submitOneTimePoll(ModbusReadRequestBlueprint request, ModbusReadCallback resultCallback,
742 ModbusFailureCallback<ModbusReadRequestBlueprint> failureCallback) {
744 throw new IllegalStateException("Communication interface is closed already!");
// Work on a local copy of the field so the null check and the use refer to the same executor
746 ScheduledExecutorService executor = scheduledThreadPoolExecutor;
747 Objects.requireNonNull(executor, "Not activated!");
// Captured to report how long the task waited in the executor queue
748 long scheduleTime = System.currentTimeMillis();
749 BasicPollTask task = new BasicPollTask(endpoint, request, resultCallback, failureCallback);
750 logger.debug("Scheduling one-off poll task {}", task);
751 Future<?> future = executor.submit(() -> {
752 long millisInThreadPoolWaiting = System.currentTimeMillis() - scheduleTime;
753 logger.debug("Will now execute one-off poll task {}, waited in thread pool for {}", task,
754 millisInThreadPoolWaiting);
755 executeOperation(task, true, pollOperation);
/**
 * Schedules a read request to be executed repeatedly, with a fixed delay between consecutive executions.
 *
 * Runs while holding the monitor of the enclosing {@code ModbusManagerImpl}. If {@code scheduledPollTasks} already
 * contains a key equal to the newly built task, that registration is unregistered first. The scheduled future is
 * stored in {@code scheduledPollTasks} and the task is recorded in {@code pollTasksRegisteredByThisCommInterface}.
 *
 * @param request read request to poll
 * @param pollPeriodMillis delay between the end of one execution and the start of the next, in milliseconds
 * @param initialDelayMillis delay before the first execution, in milliseconds
 * @param resultCallback result callback handed to the created task
 * @param failureCallback failure callback handed to the created task
 * @throws IllegalStateException with message "Communication interface is closed already!"
 * @throws NullPointerException with message "Not activated!" when {@code scheduledThreadPoolExecutor} is null
 */
761 public PollTask registerRegularPoll(ModbusReadRequestBlueprint request, long pollPeriodMillis,
762 long initialDelayMillis, ModbusReadCallback resultCallback,
763 ModbusFailureCallback<ModbusReadRequestBlueprint> failureCallback) {
764 synchronized (ModbusManagerImpl.this) {
766 throw new IllegalStateException("Communication interface is closed already!");
768 ScheduledExecutorService executor = scheduledThreadPoolExecutor;
769 Objects.requireNonNull(executor, "Not activated!");
770 BasicPollTask task = new BasicPollTask(endpoint, request, resultCallback, failureCallback);
771 logger.trace("Registering poll task {} with period {} using initial delay {}", task, pollPeriodMillis,
// Replace an existing registration of an equal task instead of scheduling it twice
773 if (scheduledPollTasks.containsKey(task)) {
774 logger.trace("Unregistering previous poll task (possibly with different period)");
775 unregisterRegularPoll(task);
777 ScheduledFuture<?> future = executor.scheduleWithFixedDelay(() -> {
778 long started = System.currentTimeMillis();
779 logger.debug("Executing scheduled ({}ms) poll task {}. Current millis: {}", pollPeriodMillis, task,
782 executeOperation(task, false, pollOperation);
783 } catch (RuntimeException e) {
784 // We want to catch all unexpected exceptions since all unhandled exceptions make
785 // ScheduledExecutorService halt the polling. It is better to print out the exception, and try
787 // (on next poll cycle)
789 "Execution of scheduled ({}ms) poll task {} failed unexpectedly. Ignoring exception, polling again according to poll interval.",
790 pollPeriodMillis, task, e);
792 long finished = System.currentTimeMillis();
794 "Execution of scheduled ({}ms) poll task {} finished at {}. Was started at millis: {} (=duration of {} millis)",
795 pollPeriodMillis, task, finished, started, finished - started);
796 }, initialDelayMillis, pollPeriodMillis, TimeUnit.MILLISECONDS);
// Remember the future (for later unregistering) and the fact that this interface owns the task
798 scheduledPollTasks.put(task, future);
799 pollTasksRegisteredByThisCommInterface.add(task);
800 logger.trace("Registered poll task {} with period {} using initial delay {}", task, pollPeriodMillis,
/**
 * Unregisters a regular poll task.
 *
 * Runs while holding the monitor of the enclosing {@code ModbusManagerImpl}. The task is removed from
 * {@code pollTasksRegisteredByThisCommInterface} and its future is removed from {@code scheduledPollTasks}; a
 * warning is logged when no future was registered for the task.
 *
 * NOTE(review): the statement that cancels the removed future and the return statements are not visible in this
 * excerpt; judging by the surrounding log messages the future is presumably cancelled with interruption -- confirm.
 *
 * @param task poll task to unregister
 * @throws NullPointerException with message "Not activated!" when {@code connectionFactory} is null
 */
806 @SuppressWarnings({ "null", "unused" })
808 public boolean unregisterRegularPoll(PollTask task) {
809 synchronized (ModbusManagerImpl.this) {
811 // Closed already, nothing to unregister
814 pollTasksRegisteredByThisCommInterface.remove(task);
// The local copy is only used for the "activated" check below
815 ModbusSlaveConnectionFactoryImpl localConnectionFactory = connectionFactory;
816 Objects.requireNonNull(localConnectionFactory, "Not activated!");
820 ScheduledFuture<?> future = scheduledPollTasks.remove(task);
821 if (future == null) {
823 logger.warn("Caller tried to unregister nonexisting poll task {}", task);
826 logger.debug("Unregistering regular poll task {} (interrupting if necessary)", task);
828 logger.debug("Poll task {} canceled", task);
/**
 * Submits a single write request for execution on {@code scheduledThreadPoolExecutor}.
 *
 * The request is wrapped into a {@code BasicWriteTask} and run through
 * {@code executeOperation(task, true, writeOperation)} on a pool thread.
 *
 * @param request write request to execute
 * @param resultCallback result callback handed to the created task
 * @param failureCallback failure callback handed to the created task
 * @throws IllegalStateException with message "Communication interface is closed already!"
 * @throws NullPointerException with message "Not activated!" when {@code scheduledThreadPoolExecutor} is null
 */
834 public Future<?> submitOneTimeWrite(ModbusWriteRequestBlueprint request, ModbusWriteCallback resultCallback,
835 ModbusFailureCallback<ModbusWriteRequestBlueprint> failureCallback) {
837 throw new IllegalStateException("Communication interface is closed already!");
839 ScheduledExecutorService localScheduledThreadPoolExecutor = scheduledThreadPoolExecutor;
840 Objects.requireNonNull(localScheduledThreadPoolExecutor, "Not activated!");
841 WriteTask task = new BasicWriteTask(endpoint, request, resultCallback, failureCallback);
// Captured to report how long the task waited in the executor queue
842 long scheduleTime = System.currentTimeMillis();
843 logger.debug("Scheduling one-off write task {}", task);
844 Future<?> future = localScheduledThreadPoolExecutor.submit(() -> {
845 long millisInThreadPoolWaiting = System.currentTimeMillis() - scheduleTime;
846 logger.debug("Will now execute one-off write task {}, waited in thread pool for {}", task,
847 millisInThreadPoolWaiting);
848 executeOperation(task, true, writeOperation);
/**
 * Closes this communication interface.
 *
 * While holding the monitor of the enclosing {@code ModbusManagerImpl}, every regular poll task registered through
 * this instance is unregistered and the interface is then removed from the manager via
 * {@code unregisterCommunicationInterface(this)}.
 */
854 public void close() throws Exception {
855 synchronized (ModbusManagerImpl.this) {
857 // Closed already, nothing to unregister
860 // Iterate over all tasks registered by this communication interface, and unregister those
861 // We copy pollTasksRegisteredByThisCommInterface temporarily so that unregisterRegularPoll can
862 // remove entries from pollTasksRegisteredByThisCommInterface
863 Iterable<PollTask> tasksToUnregister = new LinkedList<>(pollTasksRegisteredByThisCommInterface);
864 for (PollTask task : tasksToUnregister) {
865 unregisterRegularPoll(task);
867 unregisterCommunicationInterface(this);
/**
 * Accessor for the endpoint of this communication interface.
 *
 * NOTE(review): the method body is not visible in this excerpt; presumably it returns the {@code endpoint} field --
 * confirm.
 */
873 public ModbusSlaveEndpoint getEndpoint() {
879 public ModbusCommunicationInterface newModbusCommunicationInterface(ModbusSlaveEndpoint endpoint,
880 @Nullable EndpointPoolConfiguration configuration) throws IllegalArgumentException {
881 boolean openCommFoundWithSameEndpointDifferentConfig = communicationInterfaces.stream()
882 .filter(comm -> comm.endpoint.equals(endpoint))
883 .anyMatch(comm -> comm.configuration != null && !comm.configuration.equals(configuration));
884 if (openCommFoundWithSameEndpointDifferentConfig) {
885 throw new IllegalArgumentException(
886 "Communication interface is already open with different configuration to this same endpoint");
889 ModbusCommunicationInterfaceImpl comm = new ModbusCommunicationInterfaceImpl(endpoint, configuration);
890 communicationInterfaces.add(comm);
895 public @Nullable EndpointPoolConfiguration getEndpointPoolConfiguration(ModbusSlaveEndpoint endpoint) {
896 Objects.requireNonNull(connectionFactory, "Not activated!");
897 return connectionFactory.getEndpointPoolConfiguration(endpoint);
900 private void unregisterCommunicationInterface(ModbusCommunicationInterface commInterface) {
901 communicationInterfaces.remove(commInterface);
902 maybeCloseConnections(commInterface.getEndpoint());
905 private void maybeCloseConnections(ModbusSlaveEndpoint endpoint) {
906 boolean lastCommWithThisEndpointWasRemoved = communicationInterfaces.stream()
907 .filter(comm -> comm.endpoint.equals(endpoint)).count() == 0L;
908 if (lastCommWithThisEndpointWasRemoved) {
909 // Since last communication interface pointing to this endpoint was closed, we can clean up resources
910 // and disconnect connections.
912 // Make sure connections to this endpoint are closed when they are returned to pool (which
913 // is usually pretty soon as transactions should be relatively short-lived)
914 ModbusSlaveConnectionFactoryImpl localConnectionFactory = connectionFactory;
915 if (localConnectionFactory != null) {
916 localConnectionFactory.disconnectOnReturn(endpoint, System.currentTimeMillis());
918 // Close all idle connections as well (they will be reconnected if necessary on borrow)
919 if (connectionPool != null) {
920 connectionPool.clear(endpoint);
922 } catch (Exception e) {
923 logger.warn("Could not clear endpoint {}. Stack trace follows", endpoint, e);
930 protected void activate(Map<String, Object> configProperties) {
931 synchronized (this) {
932 logger.info("Modbus manager activated");
933 if (connectionPool == null) {
934 constructConnectionPool();
936 ScheduledExecutorService scheduledThreadPoolExecutor = this.scheduledThreadPoolExecutor;
937 if (scheduledThreadPoolExecutor == null) {
938 this.scheduledThreadPoolExecutor = scheduledThreadPoolExecutor = ThreadPoolManager
939 .getScheduledPool(MODBUS_POLLER_THREAD_POOL_NAME);
941 if (scheduledThreadPoolExecutor.isShutdown()) {
942 logger.warn("Thread pool is shut down! Aborting activation of ModbusMangerImpl");
943 throw new IllegalStateException("Thread pool(s) shut down! Aborting activation of ModbusMangerImpl");
945 monitorFuture = scheduledThreadPoolExecutor.scheduleWithFixedDelay(this::logTaskQueueInfo, 0,
946 MONITOR_QUEUE_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
951 protected void deactivate() {
952 synchronized (this) {
953 KeyedObjectPool<ModbusSlaveEndpoint, ModbusSlaveConnection> connectionPool = this.connectionPool;
954 if (connectionPool != null) {
956 for (ModbusCommunicationInterface commInterface : this.communicationInterfaces) {
958 commInterface.close();
959 } catch (Exception e) {
960 logger.warn("Error when closing communication interface", e);
964 connectionPool.close();
965 this.connectionPool = connectionPool = null;
968 if (monitorFuture != null) {
969 monitorFuture.cancel(true);
970 monitorFuture = null;
972 // Note that it is not allowed to shutdown the executor, since they will be reused when
973 // when pool is received from ThreadPoolManager is called
974 scheduledThreadPoolExecutor = null;
975 connectionFactory = null;
976 logger.debug("Modbus manager deactivated");
980 private void logTaskQueueInfo() {
981 synchronized (pollMonitorLogger) {
982 ScheduledExecutorService scheduledThreadPoolExecutor = this.scheduledThreadPoolExecutor;
983 if (scheduledThreadPoolExecutor == null) {
986 // Avoid excessive spamming with queue monitor when many tasks are executed
987 if (System.currentTimeMillis() - lastQueueMonitorLog < MONITOR_QUEUE_INTERVAL_MILLIS) {
990 lastQueueMonitorLog = System.currentTimeMillis();
991 pollMonitorLogger.trace("<POLL MONITOR>");
992 this.scheduledPollTasks.forEach((task, future) -> {
993 pollMonitorLogger.trace(
994 "POLL MONITOR: scheduled poll task. FC: {}, start {}, length {}, done: {}, canceled: {}, delay: {}. Full task {}",
995 task.getRequest().getFunctionCode(), task.getRequest().getReference(),
996 task.getRequest().getDataLength(), future.isDone(), future.isCancelled(),
997 future.getDelay(TimeUnit.MILLISECONDS), task);
999 if (scheduledThreadPoolExecutor instanceof ThreadPoolExecutor) {
1000 ThreadPoolExecutor executor = ((ThreadPoolExecutor) scheduledThreadPoolExecutor);
1001 pollMonitorLogger.trace(
1002 "POLL MONITOR: scheduledThreadPoolExecutor queue size: {}, remaining space {}. Active threads {}",
1003 executor.getQueue().size(), executor.getQueue().remainingCapacity(), executor.getActiveCount());
1004 if (executor.getQueue().size() >= WARN_QUEUE_SIZE) {
1005 pollMonitorLogger.warn(
1006 "Many ({}) tasks queued in scheduledThreadPoolExecutor! This might be sign of bad design or bug in the binding code.",
1007 executor.getQueue().size());
1011 pollMonitorLogger.trace("</POLL MONITOR>");