]> git.basschouten.com Git - openhab-addons.git/blob
c0c886d49276f26862df4d2b7efa65d4de46d078
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2020 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.io.transport.modbus.test;
14
15 import static org.hamcrest.CoreMatchers.*;
16 import static org.junit.Assert.*;
17 import static org.junit.Assume.*;
18
19 import java.io.IOException;
20 import java.lang.reflect.Constructor;
21 import java.lang.reflect.Method;
22 import java.net.InetAddress;
23 import java.net.Socket;
24 import java.net.SocketImpl;
25 import java.net.SocketImplFactory;
26 import java.net.UnknownHostException;
27 import java.util.BitSet;
28 import java.util.Optional;
29 import java.util.Queue;
30 import java.util.concurrent.ConcurrentLinkedQueue;
31 import java.util.concurrent.CountDownLatch;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.atomic.AtomicInteger;
34 import java.util.concurrent.atomic.AtomicReference;
35
36 import org.apache.commons.lang.StringUtils;
37 import org.eclipse.jdt.annotation.NonNull;
38 import org.junit.Before;
39 import org.junit.Test;
40 import org.openhab.io.transport.modbus.BitArray;
41 import org.openhab.io.transport.modbus.ModbusCommunicationInterface;
42 import org.openhab.io.transport.modbus.ModbusReadFunctionCode;
43 import org.openhab.io.transport.modbus.ModbusReadRequestBlueprint;
44 import org.openhab.io.transport.modbus.ModbusRegisterArray;
45 import org.openhab.io.transport.modbus.ModbusResponse;
46 import org.openhab.io.transport.modbus.ModbusWriteCoilRequestBlueprint;
47 import org.openhab.io.transport.modbus.PollTask;
48 import org.openhab.io.transport.modbus.endpoint.EndpointPoolConfiguration;
49 import org.openhab.io.transport.modbus.endpoint.ModbusSlaveEndpoint;
50 import org.openhab.io.transport.modbus.endpoint.ModbusTCPSlaveEndpoint;
51 import org.openhab.io.transport.modbus.exception.ModbusConnectionException;
52 import org.openhab.io.transport.modbus.exception.ModbusSlaveErrorResponseException;
53 import org.openhab.io.transport.modbus.exception.ModbusSlaveIOException;
54 import org.slf4j.LoggerFactory;
55
56 import net.wimpi.modbus.msg.ModbusRequest;
57 import net.wimpi.modbus.msg.WriteCoilRequest;
58 import net.wimpi.modbus.msg.WriteMultipleCoilsRequest;
59 import net.wimpi.modbus.procimg.SimpleDigitalIn;
60 import net.wimpi.modbus.procimg.SimpleDigitalOut;
61 import net.wimpi.modbus.procimg.SimpleRegister;
62 import net.wimpi.modbus.util.BitVector;
63
64 /**
65  * @author Sami Salonen - Initial contribution
66  */
67 public class SmokeTest extends IntegrationTestSupport {
68
69     private static final int COIL_EVERY_N_TRUE = 2;
70     private static final int DISCRETE_EVERY_N_TRUE = 3;
71     private static final int HOLDING_REGISTER_MULTIPLIER = 1;
72     private static final int INPUT_REGISTER_MULTIPLIER = 10;
73     private static final SpyingSocketFactory socketSpy = new SpyingSocketFactory();
74     static {
75         try {
76             Socket.setSocketImplFactory(socketSpy);
77         } catch (IOException e) {
78             fail("Could not install socket spy in SmokeTest");
79         }
80     }
81
82     /**
83      * Whether tests are run in Continuous Integration environment, i.e. Jenkins or Travis CI
84      *
85      * Travis CI is detected using CI environment variable, see https://docs.travis-ci.com/user/environment-variables/
86      * Jenkins CI is detected using JENKINS_HOME environment variable
87      *
88      * @return
89      */
90     private boolean isRunningInCI() {
91         return "true".equals(System.getenv("CI")) || StringUtils.isNotBlank(System.getenv("JENKINS_HOME"));
92     }
93
94     private void generateData() {
95         for (int i = 0; i < 100; i++) {
96             spi.addRegister(new SimpleRegister(i * HOLDING_REGISTER_MULTIPLIER));
97             spi.addInputRegister(new SimpleRegister(i * INPUT_REGISTER_MULTIPLIER));
98             spi.addDigitalOut(new SimpleDigitalOut(i % COIL_EVERY_N_TRUE == 0));
99             spi.addDigitalIn(new SimpleDigitalIn(i % DISCRETE_EVERY_N_TRUE == 0));
100         }
101     }
102
103     private void testCoilValues(BitArray bits, int offsetInBitArray) {
104         for (int i = 0; i < bits.size(); i++) {
105             boolean expected = (i + offsetInBitArray) % COIL_EVERY_N_TRUE == 0;
106             assertThat(String.format("i=%d, expecting %b, got %b", i, bits.getBit(i), expected), bits.getBit(i),
107                     is(equalTo(expected)));
108         }
109     }
110
111     private void testDiscreteValues(BitArray bits, int offsetInBitArray) {
112         for (int i = 0; i < bits.size(); i++) {
113             boolean expected = (i + offsetInBitArray) % DISCRETE_EVERY_N_TRUE == 0;
114             assertThat(String.format("i=%d, expecting %b, got %b", i, bits.getBit(i), expected), bits.getBit(i),
115                     is(equalTo(expected)));
116         }
117     }
118
119     private void testHoldingValues(ModbusRegisterArray registers, int offsetInRegisters) {
120         for (int i = 0; i < registers.size(); i++) {
121             int expected = (i + offsetInRegisters) * HOLDING_REGISTER_MULTIPLIER;
122             assertThat(String.format("i=%d, expecting %d, got %d", i, registers.getRegister(i).toUnsignedShort(),
123                     expected), registers.getRegister(i).toUnsignedShort(), is(equalTo(expected)));
124         }
125     }
126
127     private void testInputValues(ModbusRegisterArray registers, int offsetInRegisters) {
128         for (int i = 0; i < registers.size(); i++) {
129             int expected = (i + offsetInRegisters) * INPUT_REGISTER_MULTIPLIER;
130             assertThat(String.format("i=%d, expecting %d, got %d", i, registers.getRegister(i).toUnsignedShort(),
131                     expected), registers.getRegister(i).toUnsignedShort(), is(equalTo(expected)));
132         }
133     }
134
135     @Before
136     public void setUpSocketSpy() throws IOException {
137         socketSpy.sockets.clear();
138     }
139
140     /**
141      * Test handling of slave error responses. In this case, error code = 2, illegal data address, since no data.
142      *
143      * @throws Exception
144      */
145     @Test
146     public void testSlaveReadErrorResponse() throws Exception {
147         ModbusSlaveEndpoint endpoint = getEndpoint();
148         AtomicInteger okCount = new AtomicInteger();
149         AtomicInteger errorCount = new AtomicInteger();
150         CountDownLatch callbackCalled = new CountDownLatch(1);
151         AtomicReference<Exception> lastError = new AtomicReference<>();
152         try (ModbusCommunicationInterface comms = modbusManager.newModbusCommunicationInterface(endpoint, null)) {
153             comms.submitOneTimePoll(new ModbusReadRequestBlueprint(SLAVE_UNIT_ID,
154                     ModbusReadFunctionCode.READ_MULTIPLE_REGISTERS, 0, 5, 1), result -> {
155                         assert result.getRegisters().isPresent();
156                         okCount.incrementAndGet();
157                         callbackCalled.countDown();
158                     }, failure -> {
159                         errorCount.incrementAndGet();
160                         lastError.set(failure.getCause());
161                         callbackCalled.countDown();
162                     });
163             assertTrue(callbackCalled.await(60, TimeUnit.SECONDS));
164
165             assertThat(okCount.get(), is(equalTo(0)));
166             assertThat(errorCount.get(), is(equalTo(1)));
167             assertTrue(lastError.toString(), lastError.get() instanceof ModbusSlaveErrorResponseException);
168         }
169     }
170
171     /**
172      * Test handling of connection error responses.
173      *
174      * @throws Exception
175      */
176     @Test
177     public void testSlaveConnectionError() throws Exception {
178         // In the test we have non-responding slave (see http://stackoverflow.com/a/904609), and we use short connection
179         // timeout
180         ModbusSlaveEndpoint endpoint = new ModbusTCPSlaveEndpoint("10.255.255.1", 9999);
181         EndpointPoolConfiguration configuration = new EndpointPoolConfiguration();
182         configuration.setConnectTimeoutMillis(100);
183
184         AtomicInteger okCount = new AtomicInteger();
185         AtomicInteger errorCount = new AtomicInteger();
186         CountDownLatch callbackCalled = new CountDownLatch(1);
187         AtomicReference<Exception> lastError = new AtomicReference<>();
188         try (ModbusCommunicationInterface comms = modbusManager.newModbusCommunicationInterface(endpoint,
189                 configuration)) {
190             comms.submitOneTimePoll(new ModbusReadRequestBlueprint(SLAVE_UNIT_ID,
191                     ModbusReadFunctionCode.READ_MULTIPLE_REGISTERS, 0, 5, 1), result -> {
192                         assert result.getRegisters().isPresent();
193                         okCount.incrementAndGet();
194                         callbackCalled.countDown();
195                     }, failure -> {
196                         errorCount.incrementAndGet();
197                         lastError.set(failure.getCause());
198                         callbackCalled.countDown();
199                     });
200             assertTrue(callbackCalled.await(60, TimeUnit.SECONDS));
201
202             assertThat(okCount.get(), is(equalTo(0)));
203             assertThat(errorCount.get(), is(equalTo(1)));
204             assertTrue(lastError.toString(), lastError.get() instanceof ModbusConnectionException);
205         }
206     }
207
208     /**
209      * Have super slow connection response, eventually resulting as timeout (due to default timeout of 3 s in
210      * net.wimpi.modbus.Modbus.DEFAULT_TIMEOUT)
211      *
212      * @throws Exception
213      */
214     @Test
215     public void testIOError() throws Exception {
216         artificialServerWait = 60000;
217         ModbusSlaveEndpoint endpoint = getEndpoint();
218
219         AtomicInteger okCount = new AtomicInteger();
220         AtomicInteger errorCount = new AtomicInteger();
221         CountDownLatch callbackCalled = new CountDownLatch(1);
222         AtomicReference<Exception> lastError = new AtomicReference<>();
223         try (ModbusCommunicationInterface comms = modbusManager.newModbusCommunicationInterface(endpoint, null)) {
224             comms.submitOneTimePoll(new ModbusReadRequestBlueprint(SLAVE_UNIT_ID,
225                     ModbusReadFunctionCode.READ_MULTIPLE_REGISTERS, 0, 5, 1), result -> {
226                         assert result.getRegisters().isPresent();
227                         okCount.incrementAndGet();
228                         callbackCalled.countDown();
229                     }, failure -> {
230                         errorCount.incrementAndGet();
231                         lastError.set(failure.getCause());
232                         callbackCalled.countDown();
233                     });
234             assertTrue(callbackCalled.await(15, TimeUnit.SECONDS));
235             assertThat(okCount.get(), is(equalTo(0)));
236             assertThat(lastError.toString(), errorCount.get(), is(equalTo(1)));
237             assertTrue(lastError.toString(), lastError.get() instanceof ModbusSlaveIOException);
238         }
239     }
240
241     public void testOneOffReadWithDiscreteOrCoils(ModbusReadFunctionCode functionCode, int count) throws Exception {
242         assertThat(functionCode, is(anyOf(equalTo(ModbusReadFunctionCode.READ_INPUT_DISCRETES),
243                 equalTo(ModbusReadFunctionCode.READ_COILS))));
244         generateData();
245         ModbusSlaveEndpoint endpoint = getEndpoint();
246
247         AtomicInteger unexpectedCount = new AtomicInteger();
248         CountDownLatch callbackCalled = new CountDownLatch(1);
249         AtomicReference<Object> lastData = new AtomicReference<>();
250
251         final int offset = 1;
252
253         try (ModbusCommunicationInterface comms = modbusManager.newModbusCommunicationInterface(endpoint, null)) {
254             comms.submitOneTimePoll(new ModbusReadRequestBlueprint(SLAVE_UNIT_ID, functionCode, offset, count, 1),
255                     result -> {
256                         Optional<@NonNull BitArray> bitsOptional = result.getBits();
257                         if (bitsOptional.isPresent()) {
258                             lastData.set(bitsOptional.get());
259                         } else {
260                             unexpectedCount.incrementAndGet();
261                         }
262                         callbackCalled.countDown();
263                     }, failure -> {
264                         unexpectedCount.incrementAndGet();
265                         callbackCalled.countDown();
266                     });
267             assertTrue(callbackCalled.await(60, TimeUnit.SECONDS));
268
269             assertThat(unexpectedCount.get(), is(equalTo(0)));
270             BitArray bits = (BitArray) lastData.get();
271             assertThat(bits, notNullValue());
272             assertThat(bits.size(), is(equalTo(count)));
273             if (functionCode == ModbusReadFunctionCode.READ_INPUT_DISCRETES) {
274                 testDiscreteValues(bits, offset);
275             } else {
276                 testCoilValues(bits, offset);
277             }
278         }
279     }
280
281     @Test
282     public void testOneOffReadWithDiscrete1() throws Exception {
283         testOneOffReadWithDiscreteOrCoils(ModbusReadFunctionCode.READ_INPUT_DISCRETES, 1);
284     }
285
286     @Test
287     public void testOneOffReadWithDiscrete7() throws Exception {
288         // less than byte
289         testOneOffReadWithDiscreteOrCoils(ModbusReadFunctionCode.READ_INPUT_DISCRETES, 7);
290     }
291
292     @Test
293     public void testOneOffReadWithDiscrete8() throws Exception {
294         // exactly one byte
295         testOneOffReadWithDiscreteOrCoils(ModbusReadFunctionCode.READ_INPUT_DISCRETES, 8);
296     }
297
298     @Test
299     public void testOneOffReadWithDiscrete13() throws Exception {
300         // larger than byte, less than word (16 bit)
301         testOneOffReadWithDiscreteOrCoils(ModbusReadFunctionCode.READ_INPUT_DISCRETES, 13);
302     }
303
304     @Test
305     public void testOneOffReadWithDiscrete18() throws Exception {
306         // larger than word (16 bit)
307         testOneOffReadWithDiscreteOrCoils(ModbusReadFunctionCode.READ_INPUT_DISCRETES, 18);
308     }
309
310     @Test
311     public void testOneOffReadWithCoils1() throws Exception {
312         testOneOffReadWithDiscreteOrCoils(ModbusReadFunctionCode.READ_COILS, 1);
313     }
314
315     @Test
316     public void testOneOffReadWithCoils7() throws Exception {
317         // less than byte
318         testOneOffReadWithDiscreteOrCoils(ModbusReadFunctionCode.READ_COILS, 7);
319     }
320
321     @Test
322     public void testOneOffReadWithCoils8() throws Exception {
323         // exactly one byte
324         testOneOffReadWithDiscreteOrCoils(ModbusReadFunctionCode.READ_COILS, 8);
325     }
326
327     @Test
328     public void testOneOffReadWithCoils13() throws Exception {
329         // larger than byte, less than word (16 bit)
330         testOneOffReadWithDiscreteOrCoils(ModbusReadFunctionCode.READ_COILS, 13);
331     }
332
333     @Test
334     public void testOneOffReadWithCoils18() throws Exception {
335         // larger than word (16 bit)
336         testOneOffReadWithDiscreteOrCoils(ModbusReadFunctionCode.READ_COILS, 18);
337     }
338
339     /**
340      *
341      * @throws Exception
342      */
343     @Test
344     public void testOneOffReadWithHolding() throws Exception {
345         generateData();
346         ModbusSlaveEndpoint endpoint = getEndpoint();
347
348         AtomicInteger unexpectedCount = new AtomicInteger();
349         CountDownLatch callbackCalled = new CountDownLatch(1);
350         AtomicReference<Object> lastData = new AtomicReference<>();
351
352         try (ModbusCommunicationInterface comms = modbusManager.newModbusCommunicationInterface(endpoint, null)) {
353             comms.submitOneTimePoll(new ModbusReadRequestBlueprint(SLAVE_UNIT_ID,
354                     ModbusReadFunctionCode.READ_MULTIPLE_REGISTERS, 1, 15, 1), result -> {
355                         Optional<@NonNull ModbusRegisterArray> registersOptional = result.getRegisters();
356                         if (registersOptional.isPresent()) {
357                             lastData.set(registersOptional.get());
358                         } else {
359                             unexpectedCount.incrementAndGet();
360                         }
361                         callbackCalled.countDown();
362                     }, failure -> {
363                         unexpectedCount.incrementAndGet();
364                         callbackCalled.countDown();
365                     });
366             assertTrue(callbackCalled.await(60, TimeUnit.SECONDS));
367
368             assertThat(unexpectedCount.get(), is(equalTo(0)));
369             ModbusRegisterArray registers = (ModbusRegisterArray) lastData.get();
370             assertThat(registers.size(), is(equalTo(15)));
371             testHoldingValues(registers, 1);
372         }
373     }
374
375     /**
376      *
377      * @throws Exception
378      */
379     @Test
380     public void testOneOffReadWithInput() throws Exception {
381         generateData();
382         ModbusSlaveEndpoint endpoint = getEndpoint();
383
384         AtomicInteger unexpectedCount = new AtomicInteger();
385         CountDownLatch callbackCalled = new CountDownLatch(1);
386         AtomicReference<Object> lastData = new AtomicReference<>();
387         try (ModbusCommunicationInterface comms = modbusManager.newModbusCommunicationInterface(endpoint, null)) {
388             comms.submitOneTimePoll(new ModbusReadRequestBlueprint(SLAVE_UNIT_ID,
389                     ModbusReadFunctionCode.READ_INPUT_REGISTERS, 1, 15, 1), result -> {
390                         Optional<@NonNull ModbusRegisterArray> registersOptional = result.getRegisters();
391                         if (registersOptional.isPresent()) {
392                             lastData.set(registersOptional.get());
393                         } else {
394                             unexpectedCount.incrementAndGet();
395                         }
396                         callbackCalled.countDown();
397                     }, failure -> {
398                         unexpectedCount.incrementAndGet();
399                         callbackCalled.countDown();
400                     });
401             assertTrue(callbackCalled.await(60, TimeUnit.SECONDS));
402
403             assertThat(unexpectedCount.get(), is(equalTo(0)));
404             ModbusRegisterArray registers = (ModbusRegisterArray) lastData.get();
405             assertThat(registers.size(), is(equalTo(15)));
406             testInputValues(registers, 1);
407         }
408     }
409
410     /**
411      *
412      * @throws Exception
413      */
414     @Test
415     public void testOneOffWriteMultipleCoil() throws Exception {
416         LoggerFactory.getLogger(this.getClass()).error("STARTING MULTIPLE");
417         generateData();
418         ModbusSlaveEndpoint endpoint = getEndpoint();
419
420         AtomicInteger unexpectedCount = new AtomicInteger();
421         AtomicReference<Object> lastData = new AtomicReference<>();
422
423         BitArray bits = new BitArray(true, true, false, false, true, true);
424         try (ModbusCommunicationInterface comms = modbusManager.newModbusCommunicationInterface(endpoint, null)) {
425             comms.submitOneTimeWrite(new ModbusWriteCoilRequestBlueprint(SLAVE_UNIT_ID, 3, bits, true, 1), result -> {
426                 lastData.set(result.getResponse());
427             }, failure -> {
428                 unexpectedCount.incrementAndGet();
429             });
430             waitForAssert(() -> {
431                 assertThat(unexpectedCount.get(), is(equalTo(0)));
432                 assertThat(lastData.get(), is(notNullValue()));
433
434                 ModbusResponse response = (ModbusResponse) lastData.get();
435                 assertThat(response.getFunctionCode(), is(equalTo(15)));
436
437                 assertThat(modbustRequestCaptor.getAllReturnValues().size(), is(equalTo(1)));
438                 ModbusRequest request = modbustRequestCaptor.getAllReturnValues().get(0);
439                 assertThat(request.getFunctionCode(), is(equalTo(15)));
440                 assertThat(((WriteMultipleCoilsRequest) request).getReference(), is(equalTo(3)));
441                 assertThat(((WriteMultipleCoilsRequest) request).getBitCount(), is(equalTo(bits.size())));
442                 BitVector writeRequestCoils = ((WriteMultipleCoilsRequest) request).getCoils();
443                 BitArray writtenBits = new BitArray(BitSet.valueOf(writeRequestCoils.getBytes()), bits.size());
444                 assertThat(writtenBits, is(equalTo(bits)));
445             }, 6000, 10);
446         }
447         LoggerFactory.getLogger(this.getClass()).error("ENDINGMULTIPLE");
448     }
449
450     /**
451      * Write is out-of-bounds, slave should return error
452      *
453      * @throws Exception
454      */
455     @Test
456     public void testOneOffWriteMultipleCoilError() throws Exception {
457         generateData();
458         ModbusSlaveEndpoint endpoint = getEndpoint();
459
460         AtomicInteger unexpectedCount = new AtomicInteger();
461         CountDownLatch callbackCalled = new CountDownLatch(1);
462         AtomicReference<Exception> lastError = new AtomicReference<>();
463
464         BitArray bits = new BitArray(500);
465         try (ModbusCommunicationInterface comms = modbusManager.newModbusCommunicationInterface(endpoint, null)) {
466             comms.submitOneTimeWrite(new ModbusWriteCoilRequestBlueprint(SLAVE_UNIT_ID, 3, bits, true, 1), result -> {
467                 unexpectedCount.incrementAndGet();
468                 callbackCalled.countDown();
469             }, failure -> {
470                 lastError.set(failure.getCause());
471                 callbackCalled.countDown();
472             });
473             assertTrue(callbackCalled.await(60, TimeUnit.SECONDS));
474
475             assertThat(unexpectedCount.get(), is(equalTo(0)));
476             assertTrue(lastError.toString(), lastError.get() instanceof ModbusSlaveErrorResponseException);
477
478             assertThat(modbustRequestCaptor.getAllReturnValues().size(), is(equalTo(1)));
479             ModbusRequest request = modbustRequestCaptor.getAllReturnValues().get(0);
480             assertThat(request.getFunctionCode(), is(equalTo(15)));
481             assertThat(((WriteMultipleCoilsRequest) request).getReference(), is(equalTo(3)));
482             assertThat(((WriteMultipleCoilsRequest) request).getBitCount(), is(equalTo(bits.size())));
483             BitVector writeRequestCoils = ((WriteMultipleCoilsRequest) request).getCoils();
484             BitArray writtenBits = new BitArray(BitSet.valueOf(writeRequestCoils.getBytes()), bits.size());
485             assertThat(writtenBits, is(equalTo(bits)));
486         }
487     }
488
489     /**
490      *
491      * @throws Exception
492      */
493     @Test
494     public void testOneOffWriteSingleCoil() throws Exception {
495         generateData();
496         ModbusSlaveEndpoint endpoint = getEndpoint();
497
498         AtomicInteger unexpectedCount = new AtomicInteger();
499         CountDownLatch callbackCalled = new CountDownLatch(1);
500         AtomicReference<Object> lastData = new AtomicReference<>();
501
502         BitArray bits = new BitArray(true);
503         try (ModbusCommunicationInterface comms = modbusManager.newModbusCommunicationInterface(endpoint, null)) {
504             comms.submitOneTimeWrite(new ModbusWriteCoilRequestBlueprint(SLAVE_UNIT_ID, 3, bits, false, 1), result -> {
505                 lastData.set(result.getResponse());
506                 callbackCalled.countDown();
507             }, failure -> {
508                 unexpectedCount.incrementAndGet();
509                 callbackCalled.countDown();
510             });
511             assertTrue(callbackCalled.await(60, TimeUnit.SECONDS));
512
513             assertThat(unexpectedCount.get(), is(equalTo(0)));
514             ModbusResponse response = (ModbusResponse) lastData.get();
515             assertThat(response.getFunctionCode(), is(equalTo(5)));
516
517             assertThat(modbustRequestCaptor.getAllReturnValues().size(), is(equalTo(1)));
518             ModbusRequest request = modbustRequestCaptor.getAllReturnValues().get(0);
519             assertThat(request.getFunctionCode(), is(equalTo(5)));
520             assertThat(((WriteCoilRequest) request).getReference(), is(equalTo(3)));
521             assertThat(((WriteCoilRequest) request).getCoil(), is(equalTo(true)));
522         }
523     }
524
525     /**
526      *
527      * Write is out-of-bounds, slave should return error
528      *
529      * @throws Exception
530      */
531     @Test
532     public void testOneOffWriteSingleCoilError() throws Exception {
533         generateData();
534         ModbusSlaveEndpoint endpoint = getEndpoint();
535
536         AtomicInteger unexpectedCount = new AtomicInteger();
537         CountDownLatch callbackCalled = new CountDownLatch(1);
538         AtomicReference<Exception> lastError = new AtomicReference<>();
539
540         BitArray bits = new BitArray(true);
541         try (ModbusCommunicationInterface comms = modbusManager.newModbusCommunicationInterface(endpoint, null)) {
542             comms.submitOneTimeWrite(new ModbusWriteCoilRequestBlueprint(SLAVE_UNIT_ID, 300, bits, false, 1),
543                     result -> {
544                         unexpectedCount.incrementAndGet();
545                         callbackCalled.countDown();
546                     }, failure -> {
547                         lastError.set(failure.getCause());
548                         callbackCalled.countDown();
549                     });
550             assertTrue(callbackCalled.await(60, TimeUnit.SECONDS));
551
552             assertThat(unexpectedCount.get(), is(equalTo(0)));
553             assertTrue(lastError.toString(), lastError.get() instanceof ModbusSlaveErrorResponseException);
554
555             assertThat(modbustRequestCaptor.getAllReturnValues().size(), is(equalTo(1)));
556             ModbusRequest request = modbustRequestCaptor.getAllReturnValues().get(0);
557             assertThat(request.getFunctionCode(), is(equalTo(5)));
558             assertThat(((WriteCoilRequest) request).getReference(), is(equalTo(300)));
559             assertThat(((WriteCoilRequest) request).getCoil(), is(equalTo(true)));
560         }
561     }
562
563     /**
564      * Testing regular polling of coils
565      *
566      * Amount of requests is timed, and average poll period is checked
567      *
568      * @throws Exception
569      */
570     @Test
571     public void testRegularReadEvery150msWithCoil() throws Exception {
572         generateData();
573         ModbusSlaveEndpoint endpoint = getEndpoint();
574
575         AtomicInteger unexpectedCount = new AtomicInteger();
576         CountDownLatch callbackCalled = new CountDownLatch(5);
577         AtomicInteger dataReceived = new AtomicInteger();
578
579         long start = System.currentTimeMillis();
580         try (ModbusCommunicationInterface comms = modbusManager.newModbusCommunicationInterface(endpoint, null)) {
581             comms.registerRegularPoll(
582                     new ModbusReadRequestBlueprint(SLAVE_UNIT_ID, ModbusReadFunctionCode.READ_COILS, 1, 15, 1), 150, 0,
583                     result -> {
584                         Optional<@NonNull BitArray> bitsOptional = result.getBits();
585                         if (bitsOptional.isPresent()) {
586                             BitArray bits = bitsOptional.get();
587                             dataReceived.incrementAndGet();
588                             try {
589                                 assertThat(bits.size(), is(equalTo(15)));
590                                 testCoilValues(bits, 1);
591                             } catch (AssertionError e) {
592                                 unexpectedCount.incrementAndGet();
593                             }
594                         } else {
595                             unexpectedCount.incrementAndGet();
596                         }
597                         callbackCalled.countDown();
598                     }, failure -> {
599                         unexpectedCount.incrementAndGet();
600                         callbackCalled.countDown();
601                     });
602             assertTrue(callbackCalled.await(60, TimeUnit.SECONDS));
603
604             long end = System.currentTimeMillis();
605             assertPollDetails(unexpectedCount, dataReceived, start, end, 145, 500);
606         }
607     }
608
609     /**
610      * Testing regular polling of holding registers
611      *
612      * Amount of requests is timed, and average poll period is checked
613      *
614      * @throws Exception
615      */
616     @Test
617     public void testRegularReadEvery150msWithHolding() throws Exception {
618         generateData();
619         ModbusSlaveEndpoint endpoint = getEndpoint();
620
621         AtomicInteger unexpectedCount = new AtomicInteger();
622         CountDownLatch callbackCalled = new CountDownLatch(5);
623         AtomicInteger dataReceived = new AtomicInteger();
624
625         long start = System.currentTimeMillis();
626         try (ModbusCommunicationInterface comms = modbusManager.newModbusCommunicationInterface(endpoint, null)) {
627             comms.registerRegularPoll(new ModbusReadRequestBlueprint(SLAVE_UNIT_ID,
628                     ModbusReadFunctionCode.READ_MULTIPLE_REGISTERS, 1, 15, 1), 150, 0, result -> {
629                         Optional<@NonNull ModbusRegisterArray> registersOptional = result.getRegisters();
630                         if (registersOptional.isPresent()) {
631                             ModbusRegisterArray registers = registersOptional.get();
632                             dataReceived.incrementAndGet();
633                             try {
634                                 assertThat(registers.size(), is(equalTo(15)));
635                                 testHoldingValues(registers, 1);
636                             } catch (AssertionError e) {
637                                 unexpectedCount.incrementAndGet();
638                             }
639                         } else {
640                             unexpectedCount.incrementAndGet();
641                         }
642                         callbackCalled.countDown();
643                     }, failure -> {
644                         unexpectedCount.incrementAndGet();
645                         callbackCalled.countDown();
646                     });
647             assertTrue(callbackCalled.await(60, TimeUnit.SECONDS));
648             long end = System.currentTimeMillis();
649             assertPollDetails(unexpectedCount, dataReceived, start, end, 145, 500);
650         }
651     }
652
653     @Test
654     public void testRegularReadFirstErrorThenOK() throws Exception {
655         generateData();
656         ModbusSlaveEndpoint endpoint = getEndpoint();
657
658         AtomicInteger unexpectedCount = new AtomicInteger();
659         CountDownLatch callbackCalled = new CountDownLatch(5);
660         AtomicInteger dataReceived = new AtomicInteger();
661
662         long start = System.currentTimeMillis();
663         try (ModbusCommunicationInterface comms = modbusManager.newModbusCommunicationInterface(endpoint, null)) {
664             comms.registerRegularPoll(new ModbusReadRequestBlueprint(SLAVE_UNIT_ID,
665                     ModbusReadFunctionCode.READ_MULTIPLE_REGISTERS, 1, 15, 1), 150, 0, result -> {
666                         Optional<@NonNull ModbusRegisterArray> registersOptional = result.getRegisters();
667                         if (registersOptional.isPresent()) {
668                             ModbusRegisterArray registers = registersOptional.get();
669                             dataReceived.incrementAndGet();
670                             try {
671                                 assertThat(registers.size(), is(equalTo(15)));
672                                 testHoldingValues(registers, 1);
673                             } catch (AssertionError e) {
674                                 unexpectedCount.incrementAndGet();
675                             }
676
677                         } else {
678                             unexpectedCount.incrementAndGet();
679                         }
680                         callbackCalled.countDown();
681                     }, failure -> {
682                         unexpectedCount.incrementAndGet();
683                         callbackCalled.countDown();
684                     });
685             assertTrue(callbackCalled.await(60, TimeUnit.SECONDS));
686             long end = System.currentTimeMillis();
687             assertPollDetails(unexpectedCount, dataReceived, start, end, 145, 500);
688         }
689     }
690
691     /**
692      *
693      * @param unexpectedCount number of unexpected callback calls
694      * @param callbackCalled number of callback calls (including unexpected)
695      * @param dataReceived number of expected callback calls (onBits or onRegisters)
696      * @param pollStartMillis poll start time in milliepoch
697      * @param expectedPollAverageMin average poll period should be at least greater than this
698      * @param expectedPollAverageMax average poll period less than this
699      * @throws InterruptedException
700      */
701     private void assertPollDetails(AtomicInteger unexpectedCount, AtomicInteger expectedCount, long pollStartMillis,
702             long pollEndMillis, int expectedPollAverageMin, int expectedPollAverageMax) throws InterruptedException {
703         int responses = expectedCount.get();
704         assertThat(unexpectedCount.get(), is(equalTo(0)));
705         assertTrue(responses > 1);
706
707         // Rest of the (timing-sensitive) assertions are not run in CI
708         assumeFalse("Running in CI! Will not test timing-sensitive details", isRunningInCI());
709         float averagePollPeriodMillis = ((float) (pollEndMillis - pollStartMillis)) / (responses - 1);
710         assertTrue(String.format(
711                 "Measured avarage poll period %f ms (%d responses in %d ms) is not withing expected limits [%d, %d]",
712                 averagePollPeriodMillis, responses, pollEndMillis - pollStartMillis, expectedPollAverageMin,
713                 expectedPollAverageMax),
714                 averagePollPeriodMillis > expectedPollAverageMin && averagePollPeriodMillis < expectedPollAverageMax);
715     }
716
717     @Test
718     public void testUnregisterPollingOnClose() throws Exception {
719         ModbusSlaveEndpoint endpoint = getEndpoint();
720
721         AtomicInteger unexpectedCount = new AtomicInteger();
722         AtomicInteger errorCount = new AtomicInteger();
723         CountDownLatch successfulCountDownLatch = new CountDownLatch(3);
724         AtomicInteger expectedReceived = new AtomicInteger();
725
726         long start = System.currentTimeMillis();
727         try (ModbusCommunicationInterface comms = modbusManager.newModbusCommunicationInterface(endpoint, null)) {
728             comms.registerRegularPoll(new ModbusReadRequestBlueprint(SLAVE_UNIT_ID,
729                     ModbusReadFunctionCode.READ_MULTIPLE_REGISTERS, 1, 15, 1), 200, 0, result -> {
730                         Optional<@NonNull ModbusRegisterArray> registersOptional = result.getRegisters();
731                         if (registersOptional.isPresent()) {
732                             expectedReceived.incrementAndGet();
733                             successfulCountDownLatch.countDown();
734                         } else {
735                             // bits
736                             unexpectedCount.incrementAndGet();
737                         }
738                     }, failure -> {
739                         if (spi.getDigitalInCount() > 0) {
740                             // No errors expected after server filled with data
741                             unexpectedCount.incrementAndGet();
742                         } else {
743                             expectedReceived.incrementAndGet();
744                             errorCount.incrementAndGet();
745                             generateData();
746                             successfulCountDownLatch.countDown();
747                         }
748                     });
749             // Wait for N successful responses before proceeding with assertions of poll rate
750             assertTrue(successfulCountDownLatch.await(60, TimeUnit.SECONDS));
751
752             long end = System.currentTimeMillis();
753             assertPollDetails(unexpectedCount, expectedReceived, start, end, 190, 600);
754
755             // wait some more and ensure nothing comes back
756             Thread.sleep(500);
757             assertThat(unexpectedCount.get(), is(equalTo(0)));
758         }
759     }
760
761     @Test
762     public void testUnregisterPollingExplicit() throws Exception {
763         ModbusSlaveEndpoint endpoint = getEndpoint();
764
765         AtomicInteger unexpectedCount = new AtomicInteger();
766         AtomicInteger errorCount = new AtomicInteger();
767         CountDownLatch callbackCalled = new CountDownLatch(3);
768         AtomicInteger expectedReceived = new AtomicInteger();
769
770         long start = System.currentTimeMillis();
771         try (ModbusCommunicationInterface comms = modbusManager.newModbusCommunicationInterface(endpoint, null)) {
772             PollTask task = comms.registerRegularPoll(new ModbusReadRequestBlueprint(SLAVE_UNIT_ID,
773                     ModbusReadFunctionCode.READ_MULTIPLE_REGISTERS, 1, 15, 1), 200, 0, result -> {
774                         Optional<@NonNull ModbusRegisterArray> registersOptional = result.getRegisters();
775                         if (registersOptional.isPresent()) {
776                             expectedReceived.incrementAndGet();
777                         } else {
778                             // bits
779                             unexpectedCount.incrementAndGet();
780                         }
781                         callbackCalled.countDown();
782                     }, failure -> {
783                         if (spi.getDigitalInCount() > 0) {
784                             // No errors expected after server filled with data
785                             unexpectedCount.incrementAndGet();
786                         } else {
787                             expectedReceived.incrementAndGet();
788                             errorCount.incrementAndGet();
789                             generateData();
790                         }
791                     });
792             assertTrue(callbackCalled.await(60, TimeUnit.SECONDS));
793             long end = System.currentTimeMillis();
794             assertPollDetails(unexpectedCount, expectedReceived, start, end, 190, 600);
795
796             // Explicitly unregister the regular poll
797             comms.unregisterRegularPoll(task);
798
799             // wait some more and ensure nothing comes back
800             Thread.sleep(500);
801             assertThat(unexpectedCount.get(), is(equalTo(0)));
802         }
803     }
804
805     @SuppressWarnings("null")
806     @Test
807     public void testPoolConfigurationWithoutListener() throws Exception {
808         EndpointPoolConfiguration defaultConfig = modbusManager.getEndpointPoolConfiguration(getEndpoint());
809         assertThat(defaultConfig, is(notNullValue()));
810
811         EndpointPoolConfiguration newConfig = new EndpointPoolConfiguration();
812         newConfig.setConnectMaxTries(5);
813         try (ModbusCommunicationInterface comms = modbusManager.newModbusCommunicationInterface(getEndpoint(),
814                 newConfig)) {
815             // Sets configuration for the endpoint implicitly
816         }
817
818         assertThat(modbusManager.getEndpointPoolConfiguration(getEndpoint()).getConnectMaxTries(), is(equalTo(5)));
819         assertThat(modbusManager.getEndpointPoolConfiguration(getEndpoint()), is(not(equalTo(defaultConfig))));
820
821         // Reset config
822         try (ModbusCommunicationInterface comms = modbusManager.newModbusCommunicationInterface(getEndpoint(), null)) {
823             // Sets configuration for the endpoint implicitly
824         }
825         // Should match the default
826         assertThat(modbusManager.getEndpointPoolConfiguration(getEndpoint()), is(equalTo(defaultConfig)));
827     }
828
829     @Test
830     public void testConnectionCloseAfterLastCommunicationInterfaceClosed() throws IllegalArgumentException, Exception {
831         assumeFalse("Running in CI! Will not test timing-sensitive details", isRunningInCI());
832         ModbusSlaveEndpoint endpoint = getEndpoint();
833         assumeTrue("Connection closing test supported only with TCP slaves",
834                 endpoint instanceof ModbusTCPSlaveEndpoint);
835
836         // Generate server data
837         generateData();
838
839         EndpointPoolConfiguration config = new EndpointPoolConfiguration();
840         config.setReconnectAfterMillis(9_000_000);
841
842         // 1. capture open connections at this point
843         long openSocketsBefore = getNumberOfOpenClients(socketSpy);
844         assertThat(openSocketsBefore, is(equalTo(0L)));
845
846         // 2. make poll, binding opens the tcp connection
847         try (ModbusCommunicationInterface comms = modbusManager.newModbusCommunicationInterface(endpoint, config)) {
848             {
849                 CountDownLatch latch = new CountDownLatch(1);
850                 comms.submitOneTimePoll(new ModbusReadRequestBlueprint(1, ModbusReadFunctionCode.READ_COILS, 0, 1, 1),
851                         response -> {
852                             latch.countDown();
853                         }, failure -> {
854                             latch.countDown();
855                         });
856                 assertTrue(latch.await(60, TimeUnit.SECONDS));
857             }
858             waitForAssert(() -> {
859                 // 3. ensure one open connection
860                 long openSocketsAfter = getNumberOfOpenClients(socketSpy);
861                 assertThat(openSocketsAfter, is(equalTo(1L)));
862             });
863             try (ModbusCommunicationInterface comms2 = modbusManager.newModbusCommunicationInterface(endpoint,
864                     config)) {
865                 {
866                     CountDownLatch latch = new CountDownLatch(1);
867                     comms.submitOneTimePoll(
868                             new ModbusReadRequestBlueprint(1, ModbusReadFunctionCode.READ_COILS, 0, 1, 1), response -> {
869                                 latch.countDown();
870                             }, failure -> {
871                                 latch.countDown();
872                             });
873                     assertTrue(latch.await(60, TimeUnit.SECONDS));
874                 }
875                 assertThat(getNumberOfOpenClients(socketSpy), is(equalTo(1L)));
876                 // wait for moment (to check that no connections are closed)
877                 Thread.sleep(1000);
878                 // no more than 1 connection, even though requests are going through
879                 assertThat(getNumberOfOpenClients(socketSpy), is(equalTo(1L)));
880             }
881             Thread.sleep(1000);
882             // Still one connection open even after closing second connection
883             assertThat(getNumberOfOpenClients(socketSpy), is(equalTo(1L)));
884
885         } // 4. close (the last) comms
886           // ensure that open connections are closed
887           // (despite huge "reconnect after millis")
888         waitForAssert(() -> {
889             long openSocketsAfterClose = getNumberOfOpenClients(socketSpy);
890             assertThat(openSocketsAfterClose, is(equalTo(0L)));
891         });
892     }
893
894     @Test
895     public void testConnectionCloseAfterOneOffPoll() throws IllegalArgumentException, Exception {
896         assumeFalse("Running in CI! Will not test timing-sensitive details", isRunningInCI());
897         ModbusSlaveEndpoint endpoint = getEndpoint();
898         assumeTrue("Connection closing test supported only with TCP slaves",
899                 endpoint instanceof ModbusTCPSlaveEndpoint);
900
901         // Generate server data
902         generateData();
903
904         EndpointPoolConfiguration config = new EndpointPoolConfiguration();
905         config.setReconnectAfterMillis(2_000);
906
907         // 1. capture open connections at this point
908         long openSocketsBefore = getNumberOfOpenClients(socketSpy);
909         assertThat(openSocketsBefore, is(equalTo(0L)));
910
911         // 2. make poll, binding opens the tcp connection
912         try (ModbusCommunicationInterface comms = modbusManager.newModbusCommunicationInterface(endpoint, config)) {
913             {
914                 CountDownLatch latch = new CountDownLatch(1);
915                 comms.submitOneTimePoll(new ModbusReadRequestBlueprint(1, ModbusReadFunctionCode.READ_COILS, 0, 1, 1),
916                         response -> {
917                             latch.countDown();
918                         }, failure -> {
919                             latch.countDown();
920                         });
921                 assertTrue(latch.await(60, TimeUnit.SECONDS));
922             }
923             // Right after the poll we should have one connection open
924             waitForAssert(() -> {
925                 // 3. ensure one open connection
926                 long openSocketsAfter = getNumberOfOpenClients(socketSpy);
927                 assertThat(openSocketsAfter, is(equalTo(1L)));
928             });
929             // 4. Connection should close itself by the commons pool eviction policy (checking for old idle connection
930             // every now and then)
931             waitForAssert(() -> {
932                 // 3. ensure one open connection
933                 long openSocketsAfter = getNumberOfOpenClients(socketSpy);
934                 assertThat(openSocketsAfter, is(equalTo(0L)));
935             }, 60_000, 50);
936
937         }
938     }
939
940     private long getNumberOfOpenClients(SpyingSocketFactory socketSpy) {
941         final InetAddress testServerAddress;
942         try {
943             testServerAddress = localAddress();
944         } catch (UnknownHostException e) {
945             throw new RuntimeException(e);
946         }
947         return socketSpy.sockets.stream().filter(socketImpl -> {
948             Socket socket = getSocketOfSocketImpl(socketImpl);
949             return socket.getPort() == tcpModbusPort && socket.isConnected()
950                     && socket.getLocalAddress().equals(testServerAddress);
951         }).count();
952     }
953
954     /**
955      * Spy all sockets that are created
956      *
957      * @author Sami Salonen
958      *
959      */
960     private static class SpyingSocketFactory implements SocketImplFactory {
961
962         Queue<SocketImpl> sockets = new ConcurrentLinkedQueue<SocketImpl>();
963
964         @Override
965         public SocketImpl createSocketImpl() {
966             SocketImpl socket = newSocksSocketImpl();
967             sockets.add(socket);
968             return socket;
969         }
970     }
971
972     private static SocketImpl newSocksSocketImpl() {
973         try {
974             Class<?> defaultSocketImpl = Class.forName("java.net.SocksSocketImpl");
975             Constructor<?> constructor = defaultSocketImpl.getDeclaredConstructor();
976             constructor.setAccessible(true);
977             return (SocketImpl) constructor.newInstance();
978         } catch (Exception e) {
979             throw new RuntimeException(e);
980         }
981     }
982
983     /**
984      * Get Socket corresponding to SocketImpl using reflection
985      */
986     private static Socket getSocketOfSocketImpl(SocketImpl impl) {
987         try {
988             Method getSocket = SocketImpl.class.getDeclaredMethod("getSocket");
989             getSocket.setAccessible(true);
990             return (Socket) getSocket.invoke(impl);
991         } catch (Exception e) {
992             throw new RuntimeException(e);
993         }
994     }
995 }