]> git.basschouten.com Git - openhab-addons.git/blob
912175e4d5d6e355b0638e588b2002369ae7164c
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2020 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.io.transport.modbus.test;
14
15 import static org.hamcrest.CoreMatchers.*;
16 import static org.hamcrest.MatcherAssert.assertThat;
17 import static org.junit.jupiter.api.Assertions.*;
18 import static org.junit.jupiter.api.Assumptions.*;
19
20 import java.io.IOException;
21 import java.lang.reflect.Constructor;
22 import java.lang.reflect.Method;
23 import java.net.InetAddress;
24 import java.net.Socket;
25 import java.net.SocketImpl;
26 import java.net.SocketImplFactory;
27 import java.net.UnknownHostException;
28 import java.util.BitSet;
29 import java.util.Optional;
30 import java.util.Queue;
31 import java.util.concurrent.ConcurrentLinkedQueue;
32 import java.util.concurrent.CountDownLatch;
33 import java.util.concurrent.TimeUnit;
34 import java.util.concurrent.atomic.AtomicInteger;
35 import java.util.concurrent.atomic.AtomicReference;
36
37 import org.apache.commons.lang.StringUtils;
38 import org.eclipse.jdt.annotation.NonNull;
39 import org.junit.jupiter.api.BeforeEach;
40 import org.junit.jupiter.api.Test;
41 import org.openhab.io.transport.modbus.BitArray;
42 import org.openhab.io.transport.modbus.ModbusCommunicationInterface;
43 import org.openhab.io.transport.modbus.ModbusReadFunctionCode;
44 import org.openhab.io.transport.modbus.ModbusReadRequestBlueprint;
45 import org.openhab.io.transport.modbus.ModbusRegisterArray;
46 import org.openhab.io.transport.modbus.ModbusResponse;
47 import org.openhab.io.transport.modbus.ModbusWriteCoilRequestBlueprint;
48 import org.openhab.io.transport.modbus.PollTask;
49 import org.openhab.io.transport.modbus.endpoint.EndpointPoolConfiguration;
50 import org.openhab.io.transport.modbus.endpoint.ModbusSlaveEndpoint;
51 import org.openhab.io.transport.modbus.endpoint.ModbusTCPSlaveEndpoint;
52 import org.openhab.io.transport.modbus.exception.ModbusConnectionException;
53 import org.openhab.io.transport.modbus.exception.ModbusSlaveErrorResponseException;
54 import org.openhab.io.transport.modbus.exception.ModbusSlaveIOException;
55 import org.slf4j.LoggerFactory;
56
57 import net.wimpi.modbus.msg.ModbusRequest;
58 import net.wimpi.modbus.msg.WriteCoilRequest;
59 import net.wimpi.modbus.msg.WriteMultipleCoilsRequest;
60 import net.wimpi.modbus.procimg.SimpleDigitalIn;
61 import net.wimpi.modbus.procimg.SimpleDigitalOut;
62 import net.wimpi.modbus.procimg.SimpleRegister;
63 import net.wimpi.modbus.util.BitVector;
64
65 /**
66  * @author Sami Salonen - Initial contribution
67  */
68 public class SmokeTest extends IntegrationTestSupport {
69
70     private static final int COIL_EVERY_N_TRUE = 2;
71     private static final int DISCRETE_EVERY_N_TRUE = 3;
72     private static final int HOLDING_REGISTER_MULTIPLIER = 1;
73     private static final int INPUT_REGISTER_MULTIPLIER = 10;
74     private static final SpyingSocketFactory socketSpy = new SpyingSocketFactory();
75     static {
76         try {
77             Socket.setSocketImplFactory(socketSpy);
78         } catch (IOException e) {
79             fail("Could not install socket spy in SmokeTest");
80         }
81     }
82
83     /**
84      * Whether tests are run in Continuous Integration environment, i.e. Jenkins or Travis CI
85      *
86      * Travis CI is detected using CI environment variable, see https://docs.travis-ci.com/user/environment-variables/
87      * Jenkins CI is detected using JENKINS_HOME environment variable
88      *
89      * @return
90      */
91     private boolean isRunningInCI() {
92         return "true".equals(System.getenv("CI")) || StringUtils.isNotBlank(System.getenv("JENKINS_HOME"));
93     }
94
95     private void generateData() {
96         for (int i = 0; i < 100; i++) {
97             spi.addRegister(new SimpleRegister(i * HOLDING_REGISTER_MULTIPLIER));
98             spi.addInputRegister(new SimpleRegister(i * INPUT_REGISTER_MULTIPLIER));
99             spi.addDigitalOut(new SimpleDigitalOut(i % COIL_EVERY_N_TRUE == 0));
100             spi.addDigitalIn(new SimpleDigitalIn(i % DISCRETE_EVERY_N_TRUE == 0));
101         }
102     }
103
104     private void testCoilValues(BitArray bits, int offsetInBitArray) {
105         for (int i = 0; i < bits.size(); i++) {
106             boolean expected = (i + offsetInBitArray) % COIL_EVERY_N_TRUE == 0;
107             assertThat(String.format("i=%d, expecting %b, got %b", i, bits.getBit(i), expected), bits.getBit(i),
108                     is(equalTo(expected)));
109         }
110     }
111
112     private void testDiscreteValues(BitArray bits, int offsetInBitArray) {
113         for (int i = 0; i < bits.size(); i++) {
114             boolean expected = (i + offsetInBitArray) % DISCRETE_EVERY_N_TRUE == 0;
115             assertThat(String.format("i=%d, expecting %b, got %b", i, bits.getBit(i), expected), bits.getBit(i),
116                     is(equalTo(expected)));
117         }
118     }
119
120     private void testHoldingValues(ModbusRegisterArray registers, int offsetInRegisters) {
121         for (int i = 0; i < registers.size(); i++) {
122             int expected = (i + offsetInRegisters) * HOLDING_REGISTER_MULTIPLIER;
123             assertThat(String.format("i=%d, expecting %d, got %d", i, registers.getRegister(i).toUnsignedShort(),
124                     expected), registers.getRegister(i).toUnsignedShort(), is(equalTo(expected)));
125         }
126     }
127
128     private void testInputValues(ModbusRegisterArray registers, int offsetInRegisters) {
129         for (int i = 0; i < registers.size(); i++) {
130             int expected = (i + offsetInRegisters) * INPUT_REGISTER_MULTIPLIER;
131             assertThat(String.format("i=%d, expecting %d, got %d", i, registers.getRegister(i).toUnsignedShort(),
132                     expected), registers.getRegister(i).toUnsignedShort(), is(equalTo(expected)));
133         }
134     }
135
136     @BeforeEach
137     public void setUpSocketSpy() throws IOException {
138         socketSpy.sockets.clear();
139     }
140
141     /**
142      * Test handling of slave error responses. In this case, error code = 2, illegal data address, since no data.
143      *
144      * @throws Exception
145      */
146     @Test
147     public void testSlaveReadErrorResponse() throws Exception {
148         ModbusSlaveEndpoint endpoint = getEndpoint();
149         AtomicInteger okCount = new AtomicInteger();
150         AtomicInteger errorCount = new AtomicInteger();
151         CountDownLatch callbackCalled = new CountDownLatch(1);
152         AtomicReference<Exception> lastError = new AtomicReference<>();
153         try (ModbusCommunicationInterface comms = modbusManager.newModbusCommunicationInterface(endpoint, null)) {
154             comms.submitOneTimePoll(new ModbusReadRequestBlueprint(SLAVE_UNIT_ID,
155                     ModbusReadFunctionCode.READ_MULTIPLE_REGISTERS, 0, 5, 1), result -> {
156                         assert result.getRegisters().isPresent();
157                         okCount.incrementAndGet();
158                         callbackCalled.countDown();
159                     }, failure -> {
160                         errorCount.incrementAndGet();
161                         lastError.set(failure.getCause());
162                         callbackCalled.countDown();
163                     });
164             assertTrue(callbackCalled.await(60, TimeUnit.SECONDS));
165
166             assertThat(okCount.get(), is(equalTo(0)));
167             assertThat(errorCount.get(), is(equalTo(1)));
168             assertTrue(lastError.get() instanceof ModbusSlaveErrorResponseException, lastError.toString());
169         }
170     }
171
172     /**
173      * Test handling of connection error responses.
174      *
175      * @throws Exception
176      */
177     @Test
178     public void testSlaveConnectionError() throws Exception {
179         // In the test we have non-responding slave (see http://stackoverflow.com/a/904609), and we use short connection
180         // timeout
181         ModbusSlaveEndpoint endpoint = new ModbusTCPSlaveEndpoint("10.255.255.1", 9999);
182         EndpointPoolConfiguration configuration = new EndpointPoolConfiguration();
183         configuration.setConnectTimeoutMillis(100);
184
185         AtomicInteger okCount = new AtomicInteger();
186         AtomicInteger errorCount = new AtomicInteger();
187         CountDownLatch callbackCalled = new CountDownLatch(1);
188         AtomicReference<Exception> lastError = new AtomicReference<>();
189         try (ModbusCommunicationInterface comms = modbusManager.newModbusCommunicationInterface(endpoint,
190                 configuration)) {
191             comms.submitOneTimePoll(new ModbusReadRequestBlueprint(SLAVE_UNIT_ID,
192                     ModbusReadFunctionCode.READ_MULTIPLE_REGISTERS, 0, 5, 1), result -> {
193                         assert result.getRegisters().isPresent();
194                         okCount.incrementAndGet();
195                         callbackCalled.countDown();
196                     }, failure -> {
197                         errorCount.incrementAndGet();
198                         lastError.set(failure.getCause());
199                         callbackCalled.countDown();
200                     });
201             assertTrue(callbackCalled.await(60, TimeUnit.SECONDS));
202
203             assertThat(okCount.get(), is(equalTo(0)));
204             assertThat(errorCount.get(), is(equalTo(1)));
205             assertTrue(lastError.get() instanceof ModbusConnectionException, lastError.toString());
206         }
207     }
208
209     /**
210      * Have super slow connection response, eventually resulting as timeout (due to default timeout of 3 s in
211      * net.wimpi.modbus.Modbus.DEFAULT_TIMEOUT)
212      *
213      * @throws Exception
214      */
215     @Test
216     public void testIOError() throws Exception {
217         artificialServerWait = 60000;
218         ModbusSlaveEndpoint endpoint = getEndpoint();
219
220         AtomicInteger okCount = new AtomicInteger();
221         AtomicInteger errorCount = new AtomicInteger();
222         CountDownLatch callbackCalled = new CountDownLatch(1);
223         AtomicReference<Exception> lastError = new AtomicReference<>();
224         try (ModbusCommunicationInterface comms = modbusManager.newModbusCommunicationInterface(endpoint, null)) {
225             comms.submitOneTimePoll(new ModbusReadRequestBlueprint(SLAVE_UNIT_ID,
226                     ModbusReadFunctionCode.READ_MULTIPLE_REGISTERS, 0, 5, 1), result -> {
227                         assert result.getRegisters().isPresent();
228                         okCount.incrementAndGet();
229                         callbackCalled.countDown();
230                     }, failure -> {
231                         errorCount.incrementAndGet();
232                         lastError.set(failure.getCause());
233                         callbackCalled.countDown();
234                     });
235             assertTrue(callbackCalled.await(15, TimeUnit.SECONDS));
236             assertThat(okCount.get(), is(equalTo(0)));
237             assertThat(lastError.toString(), errorCount.get(), is(equalTo(1)));
238             assertTrue(lastError.get() instanceof ModbusSlaveIOException, lastError.toString());
239         }
240     }
241
242     public void testOneOffReadWithDiscreteOrCoils(ModbusReadFunctionCode functionCode, int count) throws Exception {
243         assertThat(functionCode, is(anyOf(equalTo(ModbusReadFunctionCode.READ_INPUT_DISCRETES),
244                 equalTo(ModbusReadFunctionCode.READ_COILS))));
245         generateData();
246         ModbusSlaveEndpoint endpoint = getEndpoint();
247
248         AtomicInteger unexpectedCount = new AtomicInteger();
249         CountDownLatch callbackCalled = new CountDownLatch(1);
250         AtomicReference<Object> lastData = new AtomicReference<>();
251
252         final int offset = 1;
253
254         try (ModbusCommunicationInterface comms = modbusManager.newModbusCommunicationInterface(endpoint, null)) {
255             comms.submitOneTimePoll(new ModbusReadRequestBlueprint(SLAVE_UNIT_ID, functionCode, offset, count, 1),
256                     result -> {
257                         Optional<@NonNull BitArray> bitsOptional = result.getBits();
258                         if (bitsOptional.isPresent()) {
259                             lastData.set(bitsOptional.get());
260                         } else {
261                             unexpectedCount.incrementAndGet();
262                         }
263                         callbackCalled.countDown();
264                     }, failure -> {
265                         unexpectedCount.incrementAndGet();
266                         callbackCalled.countDown();
267                     });
268             assertTrue(callbackCalled.await(60, TimeUnit.SECONDS));
269
270             assertThat(unexpectedCount.get(), is(equalTo(0)));
271             BitArray bits = (BitArray) lastData.get();
272             assertThat(bits, notNullValue());
273             assertThat(bits.size(), is(equalTo(count)));
274             if (functionCode == ModbusReadFunctionCode.READ_INPUT_DISCRETES) {
275                 testDiscreteValues(bits, offset);
276             } else {
277                 testCoilValues(bits, offset);
278             }
279         }
280     }
281
282     @Test
283     public void testOneOffReadWithDiscrete1() throws Exception {
284         testOneOffReadWithDiscreteOrCoils(ModbusReadFunctionCode.READ_INPUT_DISCRETES, 1);
285     }
286
287     @Test
288     public void testOneOffReadWithDiscrete7() throws Exception {
289         // less than byte
290         testOneOffReadWithDiscreteOrCoils(ModbusReadFunctionCode.READ_INPUT_DISCRETES, 7);
291     }
292
293     @Test
294     public void testOneOffReadWithDiscrete8() throws Exception {
295         // exactly one byte
296         testOneOffReadWithDiscreteOrCoils(ModbusReadFunctionCode.READ_INPUT_DISCRETES, 8);
297     }
298
299     @Test
300     public void testOneOffReadWithDiscrete13() throws Exception {
301         // larger than byte, less than word (16 bit)
302         testOneOffReadWithDiscreteOrCoils(ModbusReadFunctionCode.READ_INPUT_DISCRETES, 13);
303     }
304
305     @Test
306     public void testOneOffReadWithDiscrete18() throws Exception {
307         // larger than word (16 bit)
308         testOneOffReadWithDiscreteOrCoils(ModbusReadFunctionCode.READ_INPUT_DISCRETES, 18);
309     }
310
311     @Test
312     public void testOneOffReadWithCoils1() throws Exception {
313         testOneOffReadWithDiscreteOrCoils(ModbusReadFunctionCode.READ_COILS, 1);
314     }
315
316     @Test
317     public void testOneOffReadWithCoils7() throws Exception {
318         // less than byte
319         testOneOffReadWithDiscreteOrCoils(ModbusReadFunctionCode.READ_COILS, 7);
320     }
321
322     @Test
323     public void testOneOffReadWithCoils8() throws Exception {
324         // exactly one byte
325         testOneOffReadWithDiscreteOrCoils(ModbusReadFunctionCode.READ_COILS, 8);
326     }
327
328     @Test
329     public void testOneOffReadWithCoils13() throws Exception {
330         // larger than byte, less than word (16 bit)
331         testOneOffReadWithDiscreteOrCoils(ModbusReadFunctionCode.READ_COILS, 13);
332     }
333
334     @Test
335     public void testOneOffReadWithCoils18() throws Exception {
336         // larger than word (16 bit)
337         testOneOffReadWithDiscreteOrCoils(ModbusReadFunctionCode.READ_COILS, 18);
338     }
339
340     /**
341      *
342      * @throws Exception
343      */
344     @Test
345     public void testOneOffReadWithHolding() throws Exception {
346         generateData();
347         ModbusSlaveEndpoint endpoint = getEndpoint();
348
349         AtomicInteger unexpectedCount = new AtomicInteger();
350         CountDownLatch callbackCalled = new CountDownLatch(1);
351         AtomicReference<Object> lastData = new AtomicReference<>();
352
353         try (ModbusCommunicationInterface comms = modbusManager.newModbusCommunicationInterface(endpoint, null)) {
354             comms.submitOneTimePoll(new ModbusReadRequestBlueprint(SLAVE_UNIT_ID,
355                     ModbusReadFunctionCode.READ_MULTIPLE_REGISTERS, 1, 15, 1), result -> {
356                         Optional<@NonNull ModbusRegisterArray> registersOptional = result.getRegisters();
357                         if (registersOptional.isPresent()) {
358                             lastData.set(registersOptional.get());
359                         } else {
360                             unexpectedCount.incrementAndGet();
361                         }
362                         callbackCalled.countDown();
363                     }, failure -> {
364                         unexpectedCount.incrementAndGet();
365                         callbackCalled.countDown();
366                     });
367             assertTrue(callbackCalled.await(60, TimeUnit.SECONDS));
368
369             assertThat(unexpectedCount.get(), is(equalTo(0)));
370             ModbusRegisterArray registers = (ModbusRegisterArray) lastData.get();
371             assertThat(registers.size(), is(equalTo(15)));
372             testHoldingValues(registers, 1);
373         }
374     }
375
376     /**
377      *
378      * @throws Exception
379      */
380     @Test
381     public void testOneOffReadWithInput() throws Exception {
382         generateData();
383         ModbusSlaveEndpoint endpoint = getEndpoint();
384
385         AtomicInteger unexpectedCount = new AtomicInteger();
386         CountDownLatch callbackCalled = new CountDownLatch(1);
387         AtomicReference<Object> lastData = new AtomicReference<>();
388         try (ModbusCommunicationInterface comms = modbusManager.newModbusCommunicationInterface(endpoint, null)) {
389             comms.submitOneTimePoll(new ModbusReadRequestBlueprint(SLAVE_UNIT_ID,
390                     ModbusReadFunctionCode.READ_INPUT_REGISTERS, 1, 15, 1), result -> {
391                         Optional<@NonNull ModbusRegisterArray> registersOptional = result.getRegisters();
392                         if (registersOptional.isPresent()) {
393                             lastData.set(registersOptional.get());
394                         } else {
395                             unexpectedCount.incrementAndGet();
396                         }
397                         callbackCalled.countDown();
398                     }, failure -> {
399                         unexpectedCount.incrementAndGet();
400                         callbackCalled.countDown();
401                     });
402             assertTrue(callbackCalled.await(60, TimeUnit.SECONDS));
403
404             assertThat(unexpectedCount.get(), is(equalTo(0)));
405             ModbusRegisterArray registers = (ModbusRegisterArray) lastData.get();
406             assertThat(registers.size(), is(equalTo(15)));
407             testInputValues(registers, 1);
408         }
409     }
410
411     /**
412      *
413      * @throws Exception
414      */
415     @Test
416     public void testOneOffWriteMultipleCoil() throws Exception {
417         LoggerFactory.getLogger(this.getClass()).error("STARTING MULTIPLE");
418         generateData();
419         ModbusSlaveEndpoint endpoint = getEndpoint();
420
421         AtomicInteger unexpectedCount = new AtomicInteger();
422         AtomicReference<Object> lastData = new AtomicReference<>();
423
424         BitArray bits = new BitArray(true, true, false, false, true, true);
425         try (ModbusCommunicationInterface comms = modbusManager.newModbusCommunicationInterface(endpoint, null)) {
426             comms.submitOneTimeWrite(new ModbusWriteCoilRequestBlueprint(SLAVE_UNIT_ID, 3, bits, true, 1), result -> {
427                 lastData.set(result.getResponse());
428             }, failure -> {
429                 unexpectedCount.incrementAndGet();
430             });
431             waitForAssert(() -> {
432                 assertThat(unexpectedCount.get(), is(equalTo(0)));
433                 assertThat(lastData.get(), is(notNullValue()));
434
435                 ModbusResponse response = (ModbusResponse) lastData.get();
436                 assertThat(response.getFunctionCode(), is(equalTo(15)));
437
438                 assertThat(modbustRequestCaptor.getAllReturnValues().size(), is(equalTo(1)));
439                 ModbusRequest request = modbustRequestCaptor.getAllReturnValues().get(0);
440                 assertThat(request.getFunctionCode(), is(equalTo(15)));
441                 assertThat(((WriteMultipleCoilsRequest) request).getReference(), is(equalTo(3)));
442                 assertThat(((WriteMultipleCoilsRequest) request).getBitCount(), is(equalTo(bits.size())));
443                 BitVector writeRequestCoils = ((WriteMultipleCoilsRequest) request).getCoils();
444                 BitArray writtenBits = new BitArray(BitSet.valueOf(writeRequestCoils.getBytes()), bits.size());
445                 assertThat(writtenBits, is(equalTo(bits)));
446             }, 6000, 10);
447         }
448         LoggerFactory.getLogger(this.getClass()).error("ENDINGMULTIPLE");
449     }
450
451     /**
452      * Write is out-of-bounds, slave should return error
453      *
454      * @throws Exception
455      */
456     @Test
457     public void testOneOffWriteMultipleCoilError() throws Exception {
458         generateData();
459         ModbusSlaveEndpoint endpoint = getEndpoint();
460
461         AtomicInteger unexpectedCount = new AtomicInteger();
462         CountDownLatch callbackCalled = new CountDownLatch(1);
463         AtomicReference<Exception> lastError = new AtomicReference<>();
464
465         BitArray bits = new BitArray(500);
466         try (ModbusCommunicationInterface comms = modbusManager.newModbusCommunicationInterface(endpoint, null)) {
467             comms.submitOneTimeWrite(new ModbusWriteCoilRequestBlueprint(SLAVE_UNIT_ID, 3, bits, true, 1), result -> {
468                 unexpectedCount.incrementAndGet();
469                 callbackCalled.countDown();
470             }, failure -> {
471                 lastError.set(failure.getCause());
472                 callbackCalled.countDown();
473             });
474             assertTrue(callbackCalled.await(60, TimeUnit.SECONDS));
475
476             assertThat(unexpectedCount.get(), is(equalTo(0)));
477             assertTrue(lastError.get() instanceof ModbusSlaveErrorResponseException, lastError.toString());
478
479             assertThat(modbustRequestCaptor.getAllReturnValues().size(), is(equalTo(1)));
480             ModbusRequest request = modbustRequestCaptor.getAllReturnValues().get(0);
481             assertThat(request.getFunctionCode(), is(equalTo(15)));
482             assertThat(((WriteMultipleCoilsRequest) request).getReference(), is(equalTo(3)));
483             assertThat(((WriteMultipleCoilsRequest) request).getBitCount(), is(equalTo(bits.size())));
484             BitVector writeRequestCoils = ((WriteMultipleCoilsRequest) request).getCoils();
485             BitArray writtenBits = new BitArray(BitSet.valueOf(writeRequestCoils.getBytes()), bits.size());
486             assertThat(writtenBits, is(equalTo(bits)));
487         }
488     }
489
490     /**
491      *
492      * @throws Exception
493      */
494     @Test
495     public void testOneOffWriteSingleCoil() throws Exception {
496         generateData();
497         ModbusSlaveEndpoint endpoint = getEndpoint();
498
499         AtomicInteger unexpectedCount = new AtomicInteger();
500         CountDownLatch callbackCalled = new CountDownLatch(1);
501         AtomicReference<Object> lastData = new AtomicReference<>();
502
503         BitArray bits = new BitArray(true);
504         try (ModbusCommunicationInterface comms = modbusManager.newModbusCommunicationInterface(endpoint, null)) {
505             comms.submitOneTimeWrite(new ModbusWriteCoilRequestBlueprint(SLAVE_UNIT_ID, 3, bits, false, 1), result -> {
506                 lastData.set(result.getResponse());
507                 callbackCalled.countDown();
508             }, failure -> {
509                 unexpectedCount.incrementAndGet();
510                 callbackCalled.countDown();
511             });
512             assertTrue(callbackCalled.await(60, TimeUnit.SECONDS));
513
514             assertThat(unexpectedCount.get(), is(equalTo(0)));
515             ModbusResponse response = (ModbusResponse) lastData.get();
516             assertThat(response.getFunctionCode(), is(equalTo(5)));
517
518             assertThat(modbustRequestCaptor.getAllReturnValues().size(), is(equalTo(1)));
519             ModbusRequest request = modbustRequestCaptor.getAllReturnValues().get(0);
520             assertThat(request.getFunctionCode(), is(equalTo(5)));
521             assertThat(((WriteCoilRequest) request).getReference(), is(equalTo(3)));
522             assertThat(((WriteCoilRequest) request).getCoil(), is(equalTo(true)));
523         }
524     }
525
526     /**
527      *
528      * Write is out-of-bounds, slave should return error
529      *
530      * @throws Exception
531      */
532     @Test
533     public void testOneOffWriteSingleCoilError() throws Exception {
534         generateData();
535         ModbusSlaveEndpoint endpoint = getEndpoint();
536
537         AtomicInteger unexpectedCount = new AtomicInteger();
538         CountDownLatch callbackCalled = new CountDownLatch(1);
539         AtomicReference<Exception> lastError = new AtomicReference<>();
540
541         BitArray bits = new BitArray(true);
542         try (ModbusCommunicationInterface comms = modbusManager.newModbusCommunicationInterface(endpoint, null)) {
543             comms.submitOneTimeWrite(new ModbusWriteCoilRequestBlueprint(SLAVE_UNIT_ID, 300, bits, false, 1),
544                     result -> {
545                         unexpectedCount.incrementAndGet();
546                         callbackCalled.countDown();
547                     }, failure -> {
548                         lastError.set(failure.getCause());
549                         callbackCalled.countDown();
550                     });
551             assertTrue(callbackCalled.await(60, TimeUnit.SECONDS));
552
553             assertThat(unexpectedCount.get(), is(equalTo(0)));
554             assertTrue(lastError.get() instanceof ModbusSlaveErrorResponseException, lastError.toString());
555
556             assertThat(modbustRequestCaptor.getAllReturnValues().size(), is(equalTo(1)));
557             ModbusRequest request = modbustRequestCaptor.getAllReturnValues().get(0);
558             assertThat(request.getFunctionCode(), is(equalTo(5)));
559             assertThat(((WriteCoilRequest) request).getReference(), is(equalTo(300)));
560             assertThat(((WriteCoilRequest) request).getCoil(), is(equalTo(true)));
561         }
562     }
563
564     /**
565      * Testing regular polling of coils
566      *
567      * Amount of requests is timed, and average poll period is checked
568      *
569      * @throws Exception
570      */
571     @Test
572     public void testRegularReadEvery150msWithCoil() throws Exception {
573         generateData();
574         ModbusSlaveEndpoint endpoint = getEndpoint();
575
576         AtomicInteger unexpectedCount = new AtomicInteger();
577         CountDownLatch callbackCalled = new CountDownLatch(5);
578         AtomicInteger dataReceived = new AtomicInteger();
579
580         long start = System.currentTimeMillis();
581         try (ModbusCommunicationInterface comms = modbusManager.newModbusCommunicationInterface(endpoint, null)) {
582             comms.registerRegularPoll(
583                     new ModbusReadRequestBlueprint(SLAVE_UNIT_ID, ModbusReadFunctionCode.READ_COILS, 1, 15, 1), 150, 0,
584                     result -> {
585                         Optional<@NonNull BitArray> bitsOptional = result.getBits();
586                         if (bitsOptional.isPresent()) {
587                             BitArray bits = bitsOptional.get();
588                             dataReceived.incrementAndGet();
589                             try {
590                                 assertThat(bits.size(), is(equalTo(15)));
591                                 testCoilValues(bits, 1);
592                             } catch (AssertionError e) {
593                                 unexpectedCount.incrementAndGet();
594                             }
595                         } else {
596                             unexpectedCount.incrementAndGet();
597                         }
598                         callbackCalled.countDown();
599                     }, failure -> {
600                         unexpectedCount.incrementAndGet();
601                         callbackCalled.countDown();
602                     });
603             assertTrue(callbackCalled.await(60, TimeUnit.SECONDS));
604
605             long end = System.currentTimeMillis();
606             assertPollDetails(unexpectedCount, dataReceived, start, end, 145, 500);
607         }
608     }
609
610     /**
611      * Testing regular polling of holding registers
612      *
613      * Amount of requests is timed, and average poll period is checked
614      *
615      * @throws Exception
616      */
617     @Test
618     public void testRegularReadEvery150msWithHolding() throws Exception {
619         generateData();
620         ModbusSlaveEndpoint endpoint = getEndpoint();
621
622         AtomicInteger unexpectedCount = new AtomicInteger();
623         CountDownLatch callbackCalled = new CountDownLatch(5);
624         AtomicInteger dataReceived = new AtomicInteger();
625
626         long start = System.currentTimeMillis();
627         try (ModbusCommunicationInterface comms = modbusManager.newModbusCommunicationInterface(endpoint, null)) {
628             comms.registerRegularPoll(new ModbusReadRequestBlueprint(SLAVE_UNIT_ID,
629                     ModbusReadFunctionCode.READ_MULTIPLE_REGISTERS, 1, 15, 1), 150, 0, result -> {
630                         Optional<@NonNull ModbusRegisterArray> registersOptional = result.getRegisters();
631                         if (registersOptional.isPresent()) {
632                             ModbusRegisterArray registers = registersOptional.get();
633                             dataReceived.incrementAndGet();
634                             try {
635                                 assertThat(registers.size(), is(equalTo(15)));
636                                 testHoldingValues(registers, 1);
637                             } catch (AssertionError e) {
638                                 unexpectedCount.incrementAndGet();
639                             }
640                         } else {
641                             unexpectedCount.incrementAndGet();
642                         }
643                         callbackCalled.countDown();
644                     }, failure -> {
645                         unexpectedCount.incrementAndGet();
646                         callbackCalled.countDown();
647                     });
648             assertTrue(callbackCalled.await(60, TimeUnit.SECONDS));
649             long end = System.currentTimeMillis();
650             assertPollDetails(unexpectedCount, dataReceived, start, end, 145, 500);
651         }
652     }
653
654     @Test
655     public void testRegularReadFirstErrorThenOK() throws Exception {
656         generateData();
657         ModbusSlaveEndpoint endpoint = getEndpoint();
658
659         AtomicInteger unexpectedCount = new AtomicInteger();
660         CountDownLatch callbackCalled = new CountDownLatch(5);
661         AtomicInteger dataReceived = new AtomicInteger();
662
663         long start = System.currentTimeMillis();
664         try (ModbusCommunicationInterface comms = modbusManager.newModbusCommunicationInterface(endpoint, null)) {
665             comms.registerRegularPoll(new ModbusReadRequestBlueprint(SLAVE_UNIT_ID,
666                     ModbusReadFunctionCode.READ_MULTIPLE_REGISTERS, 1, 15, 1), 150, 0, result -> {
667                         Optional<@NonNull ModbusRegisterArray> registersOptional = result.getRegisters();
668                         if (registersOptional.isPresent()) {
669                             ModbusRegisterArray registers = registersOptional.get();
670                             dataReceived.incrementAndGet();
671                             try {
672                                 assertThat(registers.size(), is(equalTo(15)));
673                                 testHoldingValues(registers, 1);
674                             } catch (AssertionError e) {
675                                 unexpectedCount.incrementAndGet();
676                             }
677
678                         } else {
679                             unexpectedCount.incrementAndGet();
680                         }
681                         callbackCalled.countDown();
682                     }, failure -> {
683                         unexpectedCount.incrementAndGet();
684                         callbackCalled.countDown();
685                     });
686             assertTrue(callbackCalled.await(60, TimeUnit.SECONDS));
687             long end = System.currentTimeMillis();
688             assertPollDetails(unexpectedCount, dataReceived, start, end, 145, 500);
689         }
690     }
691
692     /**
693      *
694      * @param unexpectedCount number of unexpected callback calls
695      * @param callbackCalled number of callback calls (including unexpected)
696      * @param dataReceived number of expected callback calls (onBits or onRegisters)
697      * @param pollStartMillis poll start time in milliepoch
698      * @param expectedPollAverageMin average poll period should be at least greater than this
699      * @param expectedPollAverageMax average poll period less than this
700      * @throws InterruptedException
701      */
702     private void assertPollDetails(AtomicInteger unexpectedCount, AtomicInteger expectedCount, long pollStartMillis,
703             long pollEndMillis, int expectedPollAverageMin, int expectedPollAverageMax) throws InterruptedException {
704         int responses = expectedCount.get();
705         assertThat(unexpectedCount.get(), is(equalTo(0)));
706         assertTrue(responses > 1);
707
708         // Rest of the (timing-sensitive) assertions are not run in CI
709         assumeFalse(isRunningInCI(), "Running in CI! Will not test timing-sensitive details");
710         float averagePollPeriodMillis = ((float) (pollEndMillis - pollStartMillis)) / (responses - 1);
711         assertTrue(averagePollPeriodMillis > expectedPollAverageMin && averagePollPeriodMillis < expectedPollAverageMax,
712                 String.format(
713                         "Measured avarage poll period %f ms (%d responses in %d ms) is not withing expected limits [%d, %d]",
714                         averagePollPeriodMillis, responses, pollEndMillis - pollStartMillis, expectedPollAverageMin,
715                         expectedPollAverageMax));
716     }
717
718     @Test
719     public void testUnregisterPollingOnClose() throws Exception {
720         ModbusSlaveEndpoint endpoint = getEndpoint();
721
722         AtomicInteger unexpectedCount = new AtomicInteger();
723         AtomicInteger errorCount = new AtomicInteger();
724         CountDownLatch successfulCountDownLatch = new CountDownLatch(3);
725         AtomicInteger expectedReceived = new AtomicInteger();
726
727         long start = System.currentTimeMillis();
728         try (ModbusCommunicationInterface comms = modbusManager.newModbusCommunicationInterface(endpoint, null)) {
729             comms.registerRegularPoll(new ModbusReadRequestBlueprint(SLAVE_UNIT_ID,
730                     ModbusReadFunctionCode.READ_MULTIPLE_REGISTERS, 1, 15, 1), 200, 0, result -> {
731                         Optional<@NonNull ModbusRegisterArray> registersOptional = result.getRegisters();
732                         if (registersOptional.isPresent()) {
733                             expectedReceived.incrementAndGet();
734                             successfulCountDownLatch.countDown();
735                         } else {
736                             // bits
737                             unexpectedCount.incrementAndGet();
738                         }
739                     }, failure -> {
740                         if (spi.getDigitalInCount() > 0) {
741                             // No errors expected after server filled with data
742                             unexpectedCount.incrementAndGet();
743                         } else {
744                             expectedReceived.incrementAndGet();
745                             errorCount.incrementAndGet();
746                             generateData();
747                             successfulCountDownLatch.countDown();
748                         }
749                     });
750             // Wait for N successful responses before proceeding with assertions of poll rate
751             assertTrue(successfulCountDownLatch.await(60, TimeUnit.SECONDS));
752
753             long end = System.currentTimeMillis();
754             assertPollDetails(unexpectedCount, expectedReceived, start, end, 190, 600);
755
756             // wait some more and ensure nothing comes back
757             Thread.sleep(500);
758             assertThat(unexpectedCount.get(), is(equalTo(0)));
759         }
760     }
761
762     @Test
763     public void testUnregisterPollingExplicit() throws Exception {
764         ModbusSlaveEndpoint endpoint = getEndpoint();
765
766         AtomicInteger unexpectedCount = new AtomicInteger();
767         AtomicInteger errorCount = new AtomicInteger();
768         CountDownLatch callbackCalled = new CountDownLatch(3);
769         AtomicInteger expectedReceived = new AtomicInteger();
770
771         long start = System.currentTimeMillis();
772         try (ModbusCommunicationInterface comms = modbusManager.newModbusCommunicationInterface(endpoint, null)) {
773             PollTask task = comms.registerRegularPoll(new ModbusReadRequestBlueprint(SLAVE_UNIT_ID,
774                     ModbusReadFunctionCode.READ_MULTIPLE_REGISTERS, 1, 15, 1), 200, 0, result -> {
775                         Optional<@NonNull ModbusRegisterArray> registersOptional = result.getRegisters();
776                         if (registersOptional.isPresent()) {
777                             expectedReceived.incrementAndGet();
778                         } else {
779                             // bits
780                             unexpectedCount.incrementAndGet();
781                         }
782                         callbackCalled.countDown();
783                     }, failure -> {
784                         if (spi.getDigitalInCount() > 0) {
785                             // No errors expected after server filled with data
786                             unexpectedCount.incrementAndGet();
787                         } else {
788                             expectedReceived.incrementAndGet();
789                             errorCount.incrementAndGet();
790                             generateData();
791                         }
792                     });
793             assertTrue(callbackCalled.await(60, TimeUnit.SECONDS));
794             long end = System.currentTimeMillis();
795             assertPollDetails(unexpectedCount, expectedReceived, start, end, 190, 600);
796
797             // Explicitly unregister the regular poll
798             comms.unregisterRegularPoll(task);
799
800             // wait some more and ensure nothing comes back
801             Thread.sleep(500);
802             assertThat(unexpectedCount.get(), is(equalTo(0)));
803         }
804     }
805
806     @SuppressWarnings("null")
807     @Test
808     public void testPoolConfigurationWithoutListener() throws Exception {
809         EndpointPoolConfiguration defaultConfig = modbusManager.getEndpointPoolConfiguration(getEndpoint());
810         assertThat(defaultConfig, is(notNullValue()));
811
812         EndpointPoolConfiguration newConfig = new EndpointPoolConfiguration();
813         newConfig.setConnectMaxTries(5);
814         try (ModbusCommunicationInterface comms = modbusManager.newModbusCommunicationInterface(getEndpoint(),
815                 newConfig)) {
816             // Sets configuration for the endpoint implicitly
817         }
818
819         assertThat(modbusManager.getEndpointPoolConfiguration(getEndpoint()).getConnectMaxTries(), is(equalTo(5)));
820         assertThat(modbusManager.getEndpointPoolConfiguration(getEndpoint()), is(not(equalTo(defaultConfig))));
821
822         // Reset config
823         try (ModbusCommunicationInterface comms = modbusManager.newModbusCommunicationInterface(getEndpoint(), null)) {
824             // Sets configuration for the endpoint implicitly
825         }
826         // Should match the default
827         assertThat(modbusManager.getEndpointPoolConfiguration(getEndpoint()), is(equalTo(defaultConfig)));
828     }
829
830     @Test
831     public void testConnectionCloseAfterLastCommunicationInterfaceClosed() throws IllegalArgumentException, Exception {
832         assumeFalse(isRunningInCI(), "Running in CI! Will not test timing-sensitive details");
833         ModbusSlaveEndpoint endpoint = getEndpoint();
834         assumeTrue(endpoint instanceof ModbusTCPSlaveEndpoint,
835                 "Connection closing test supported only with TCP slaves");
836
837         // Generate server data
838         generateData();
839
840         EndpointPoolConfiguration config = new EndpointPoolConfiguration();
841         config.setReconnectAfterMillis(9_000_000);
842
843         // 1. capture open connections at this point
844         long openSocketsBefore = getNumberOfOpenClients(socketSpy);
845         assertThat(openSocketsBefore, is(equalTo(0L)));
846
847         // 2. make poll, binding opens the tcp connection
848         try (ModbusCommunicationInterface comms = modbusManager.newModbusCommunicationInterface(endpoint, config)) {
849             {
850                 CountDownLatch latch = new CountDownLatch(1);
851                 comms.submitOneTimePoll(new ModbusReadRequestBlueprint(1, ModbusReadFunctionCode.READ_COILS, 0, 1, 1),
852                         response -> {
853                             latch.countDown();
854                         }, failure -> {
855                             latch.countDown();
856                         });
857                 assertTrue(latch.await(60, TimeUnit.SECONDS));
858             }
859             waitForAssert(() -> {
860                 // 3. ensure one open connection
861                 long openSocketsAfter = getNumberOfOpenClients(socketSpy);
862                 assertThat(openSocketsAfter, is(equalTo(1L)));
863             });
864             try (ModbusCommunicationInterface comms2 = modbusManager.newModbusCommunicationInterface(endpoint,
865                     config)) {
866                 {
867                     CountDownLatch latch = new CountDownLatch(1);
868                     comms.submitOneTimePoll(
869                             new ModbusReadRequestBlueprint(1, ModbusReadFunctionCode.READ_COILS, 0, 1, 1), response -> {
870                                 latch.countDown();
871                             }, failure -> {
872                                 latch.countDown();
873                             });
874                     assertTrue(latch.await(60, TimeUnit.SECONDS));
875                 }
876                 assertThat(getNumberOfOpenClients(socketSpy), is(equalTo(1L)));
877                 // wait for moment (to check that no connections are closed)
878                 Thread.sleep(1000);
879                 // no more than 1 connection, even though requests are going through
880                 assertThat(getNumberOfOpenClients(socketSpy), is(equalTo(1L)));
881             }
882             Thread.sleep(1000);
883             // Still one connection open even after closing second connection
884             assertThat(getNumberOfOpenClients(socketSpy), is(equalTo(1L)));
885
886         } // 4. close (the last) comms
887           // ensure that open connections are closed
888           // (despite huge "reconnect after millis")
889         waitForAssert(() -> {
890             long openSocketsAfterClose = getNumberOfOpenClients(socketSpy);
891             assertThat(openSocketsAfterClose, is(equalTo(0L)));
892         });
893     }
894
895     @Test
896     public void testConnectionCloseAfterOneOffPoll() throws IllegalArgumentException, Exception {
897         assumeFalse(isRunningInCI(), "Running in CI! Will not test timing-sensitive details");
898         ModbusSlaveEndpoint endpoint = getEndpoint();
899         assumeTrue(endpoint instanceof ModbusTCPSlaveEndpoint,
900                 "Connection closing test supported only with TCP slaves");
901
902         // Generate server data
903         generateData();
904
905         EndpointPoolConfiguration config = new EndpointPoolConfiguration();
906         config.setReconnectAfterMillis(2_000);
907
908         // 1. capture open connections at this point
909         long openSocketsBefore = getNumberOfOpenClients(socketSpy);
910         assertThat(openSocketsBefore, is(equalTo(0L)));
911
912         // 2. make poll, binding opens the tcp connection
913         try (ModbusCommunicationInterface comms = modbusManager.newModbusCommunicationInterface(endpoint, config)) {
914             {
915                 CountDownLatch latch = new CountDownLatch(1);
916                 comms.submitOneTimePoll(new ModbusReadRequestBlueprint(1, ModbusReadFunctionCode.READ_COILS, 0, 1, 1),
917                         response -> {
918                             latch.countDown();
919                         }, failure -> {
920                             latch.countDown();
921                         });
922                 assertTrue(latch.await(60, TimeUnit.SECONDS));
923             }
924             // Right after the poll we should have one connection open
925             waitForAssert(() -> {
926                 // 3. ensure one open connection
927                 long openSocketsAfter = getNumberOfOpenClients(socketSpy);
928                 assertThat(openSocketsAfter, is(equalTo(1L)));
929             });
930             // 4. Connection should close itself by the commons pool eviction policy (checking for old idle connection
931             // every now and then)
932             waitForAssert(() -> {
933                 // 3. ensure one open connection
934                 long openSocketsAfter = getNumberOfOpenClients(socketSpy);
935                 assertThat(openSocketsAfter, is(equalTo(0L)));
936             }, 60_000, 50);
937
938         }
939     }
940
941     private long getNumberOfOpenClients(SpyingSocketFactory socketSpy) {
942         final InetAddress testServerAddress;
943         try {
944             testServerAddress = localAddress();
945         } catch (UnknownHostException e) {
946             throw new RuntimeException(e);
947         }
948         return socketSpy.sockets.stream().filter(socketImpl -> {
949             Socket socket = getSocketOfSocketImpl(socketImpl);
950             return socket.getPort() == tcpModbusPort && socket.isConnected()
951                     && socket.getLocalAddress().equals(testServerAddress);
952         }).count();
953     }
954
955     /**
956      * Spy all sockets that are created
957      *
958      * @author Sami Salonen
959      *
960      */
961     private static class SpyingSocketFactory implements SocketImplFactory {
962
963         Queue<SocketImpl> sockets = new ConcurrentLinkedQueue<SocketImpl>();
964
965         @Override
966         public SocketImpl createSocketImpl() {
967             SocketImpl socket = newSocksSocketImpl();
968             sockets.add(socket);
969             return socket;
970         }
971     }
972
973     private static SocketImpl newSocksSocketImpl() {
974         try {
975             Class<?> defaultSocketImpl = Class.forName("java.net.SocksSocketImpl");
976             Constructor<?> constructor = defaultSocketImpl.getDeclaredConstructor();
977             constructor.setAccessible(true);
978             return (SocketImpl) constructor.newInstance();
979         } catch (Exception e) {
980             throw new RuntimeException(e);
981         }
982     }
983
984     /**
985      * Get Socket corresponding to SocketImpl using reflection
986      */
987     private static Socket getSocketOfSocketImpl(SocketImpl impl) {
988         try {
989             Method getSocket = SocketImpl.class.getDeclaredMethod("getSocket");
990             getSocket.setAccessible(true);
991             return (Socket) getSocket.invoke(impl);
992         } catch (Exception e) {
993             throw new RuntimeException(e);
994         }
995     }
996 }