]> git.basschouten.com Git - openhab-addons.git/blob
5d46f4cce39f44a27fbe8f72848fb3a15e862ad8
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2021 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.lcn.internal.connection;
14
15 import java.io.ByteArrayOutputStream;
16 import java.io.IOException;
17 import java.nio.BufferOverflowException;
18 import java.nio.ByteBuffer;
19 import java.nio.channels.AsynchronousSocketChannel;
20 import java.nio.channels.Channel;
21 import java.nio.channels.CompletionHandler;
22 import java.time.Instant;
23 import java.time.temporal.ChronoUnit;
24 import java.util.ArrayList;
25 import java.util.Collections;
26 import java.util.HashMap;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.Objects;
30 import java.util.Queue;
31 import java.util.concurrent.BlockingQueue;
32 import java.util.concurrent.LinkedBlockingQueue;
33 import java.util.concurrent.ScheduledExecutorService;
34
35 import org.eclipse.jdt.annotation.NonNullByDefault;
36 import org.eclipse.jdt.annotation.Nullable;
37 import org.openhab.binding.lcn.internal.common.LcnAddr;
38 import org.openhab.binding.lcn.internal.common.LcnAddrGrp;
39 import org.openhab.binding.lcn.internal.common.LcnAddrMod;
40 import org.openhab.binding.lcn.internal.common.LcnDefs;
41 import org.openhab.binding.lcn.internal.common.LcnException;
42 import org.openhab.binding.lcn.internal.common.PckGenerator;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45
46 /**
47  * This class represents a configured connection to one LCN-PCHK.
48  * It uses a {@link AsynchronousSocketChannel} to connect to LCN-PCHK.
49  * Included logic:
50  * <ul>
51  * <li>Reconnection on connection loss
52  * <li>Segment scan (to detect the local segment ID)
53  * <li>Acknowledge handling
54  * <li>Periodic value requests
55  * <li>Caching of runtime data about the underlying LCN bus
56  * </ul>
57  *
58  * @author Fabian Wolter - Initial Contribution
59  */
60 @NonNullByDefault
61 public class Connection {
62     private final Logger logger = LoggerFactory.getLogger(Connection.class);
63     private static final int BROADCAST_MODULE_ID = 3;
64     private static final int BROADCAST_SEGMENT_ID = 3;
65     private final ConnectionSettings settings;
66     private final ConnectionCallback callback;
67     @Nullable
68     private AsynchronousSocketChannel channel;
69     /** The local segment id. -1 means "unknown". */
70     private int localSegId;
71     private final ByteBuffer readBuffer = ByteBuffer.allocate(1024);
72     private final ByteArrayOutputStream sendBuffer = new ByteArrayOutputStream();
73     private final Queue<@Nullable SendData> sendQueue = new LinkedBlockingQueue<>();
74     private final BlockingQueue<PckQueueItem> offlineSendQueue = new LinkedBlockingQueue<>();
75     private final Map<LcnAddr, ModInfo> modData = Collections.synchronizedMap(new HashMap<>());
76     private volatile boolean writeInProgress;
77     private final ScheduledExecutorService scheduler;
78     private final ConnectionStateMachine connectionStateMachine;
79
80     /**
81      * Constructs a clean (disconnected) connection with the given settings.
82      * This does not start the actual connection process.
83      *
84      * @param sets the settings to use for the new connection
85      * @param callback the callback to the owner
86      * @throws IOException
87      */
88     public Connection(ConnectionSettings sets, ScheduledExecutorService scheduler, ConnectionCallback callback) {
89         this.settings = sets;
90         this.callback = callback;
91         this.scheduler = scheduler;
92         this.clearRuntimeData();
93
94         connectionStateMachine = new ConnectionStateMachine(this, scheduler);
95     }
96
97     /** Clears all runtime data. */
98     void clearRuntimeData() {
99         this.channel = null;
100         this.localSegId = -1;
101         this.readBuffer.clear();
102         this.sendQueue.clear();
103         this.sendBuffer.reset();
104     }
105
106     /**
107      * Retrieves the settings for this connection (never changed).
108      *
109      * @return the settings
110      */
111     public ConnectionSettings getSettings() {
112         return this.settings;
113     }
114
115     private boolean isSocketConnected() {
116         try {
117             AsynchronousSocketChannel localChannel = channel;
118             return localChannel != null && localChannel.getRemoteAddress() != null;
119         } catch (IOException e) {
120             return false;
121         }
122     }
123
124     /**
125      * Sets the local segment id.
126      *
127      * @param localSegId the new local segment id
128      */
129     public void setLocalSegId(int localSegId) {
130         this.localSegId = localSegId;
131     }
132
133     /**
134      * Called whenever an acknowledge is received.
135      *
136      * @param addr the source LCN module
137      * @param code the LCN internal code (-1 = "positive")
138      */
139     public void onAck(LcnAddrMod addr, int code) {
140         synchronized (modData) {
141             if (modData.containsKey(addr)) {
142                 ModInfo modInfo = modData.get(addr);
143                 if (modInfo != null) {
144                     modInfo.onAck(code, this, this.settings.getTimeout(), System.nanoTime());
145                 }
146             }
147         }
148     }
149
150     /**
151      * Creates and/or returns cached data for the given LCN module.
152      *
153      * @param addr the module's address
154      * @return the data
155      */
156     public ModInfo updateModuleData(LcnAddrMod addr) {
157         return Objects.requireNonNull(modData.computeIfAbsent(addr, ModInfo::new));
158     }
159
160     /**
161      * Reads and processes input from the underlying channel.
162      * Fragmented input is kept in {@link #readBuffer} and will be processed with the next call.
163      *
164      * @throws IOException if connection was closed or a generic channel error occurred
165      */
166     void readAndProcess() {
167         AsynchronousSocketChannel localChannel = channel;
168         if (localChannel != null && isSocketConnected()) {
169             localChannel.read(readBuffer, null, new CompletionHandler<@Nullable Integer, @Nullable Void>() {
170                 @Override
171                 public void completed(@Nullable Integer transmittedByteCount, @Nullable Void attachment) {
172                     synchronized (Connection.this) {
173                         if (transmittedByteCount == null || transmittedByteCount == -1) {
174                             String msg = "Connection was closed by foreign host.";
175                             connectionStateMachine.handleConnectionFailed(new LcnException(msg));
176                         } else {
177                             // read data chunks from socket and separate frames
178                             readBuffer.flip();
179                             int aPos = readBuffer.position(); // 0
180                             String s = new String(readBuffer.array(), aPos, transmittedByteCount, LcnDefs.LCN_ENCODING);
181                             int pos1 = 0, pos2 = s.indexOf(PckGenerator.TERMINATION, pos1);
182                             while (pos2 != -1) {
183                                 String data = s.substring(pos1, pos2);
184                                 if (logger.isTraceEnabled()) {
185                                     logger.trace("Received: '{}'", data);
186                                 }
187                                 scheduler.submit(() -> {
188                                     connectionStateMachine.onInputReceived(data);
189                                     callback.onPckMessageReceived(data);
190                                 });
191                                 // Seek position in input array
192                                 aPos += s.substring(pos1, pos2 + 1).getBytes(LcnDefs.LCN_ENCODING).length;
193                                 // Next input
194                                 pos1 = pos2 + 1;
195                                 pos2 = s.indexOf(PckGenerator.TERMINATION, pos1);
196                             }
197                             readBuffer.limit(readBuffer.capacity());
198                             readBuffer.position(transmittedByteCount - aPos); // Keeps fragments for the next call
199
200                             if (isSocketConnected()) {
201                                 readAndProcess();
202                             }
203                         }
204                     }
205                 }
206
207                 @Override
208                 public void failed(@Nullable Throwable e, @Nullable Void attachment) {
209                     logger.debug("Lost connection");
210                     connectionStateMachine.handleConnectionFailed(e);
211                 }
212             });
213         } else {
214             connectionStateMachine.handleConnectionFailed(new LcnException("Socket not open"));
215         }
216     }
217
218     /**
219      * Writes all queued data.
220      * Will try to write all data at once to reduce overhead.
221      */
222     public synchronized void triggerWriteToSocket() {
223         AsynchronousSocketChannel localChannel = channel;
224         if (localChannel == null || !isSocketConnected() || writeInProgress) {
225             return;
226         }
227         sendBuffer.reset();
228         SendData item = sendQueue.poll();
229
230         if (item != null) {
231             try {
232                 if (!item.write(sendBuffer, localSegId)) {
233                     logger.warn("Data loss: Could not write packet into send buffer");
234                 }
235
236                 writeInProgress = true;
237                 byte[] data = sendBuffer.toByteArray();
238                 localChannel.write(ByteBuffer.wrap(data), null,
239                         new CompletionHandler<@Nullable Integer, @Nullable Void>() {
240                             @Override
241                             public void completed(@Nullable Integer result, @Nullable Void attachment) {
242                                 synchronized (Connection.this) {
243                                     if (result != data.length) {
244                                         logger.warn("Data loss while writing to channel: {}", settings.getAddress());
245                                     } else {
246                                         if (logger.isTraceEnabled()) {
247                                             logger.trace("Sent: {}", new String(data, 0, data.length));
248                                         }
249                                     }
250
251                                     writeInProgress = false;
252
253                                     if (sendQueue.size() > 0) {
254                                         /**
255                                          * This could lead to stack overflows, since the CompletionHandler may run in
256                                          * the same Thread as triggerWriteToSocket() is invoked (see
257                                          * {@link AsynchronousChannelGroup}/Threading), but we do not expect as much
258                                          * data in one chunk here, that the stack can be filled in a critical way.
259                                          */
260                                         triggerWriteToSocket();
261                                     }
262                                 }
263                             }
264
265                             @Override
266                             public void failed(@Nullable Throwable exc, @Nullable Void attachment) {
267                                 synchronized (Connection.this) {
268                                     if (exc != null) {
269                                         logger.warn("Writing to channel \"{}\" failed: {}", settings.getAddress(),
270                                                 exc.getMessage());
271                                     }
272                                     writeInProgress = false;
273                                     connectionStateMachine.handleConnectionFailed(new LcnException("write() failed"));
274                                 }
275                             }
276                         });
277             } catch (BufferOverflowException | IOException e) {
278                 logger.warn("Sending failed: {}: {}: {}", item, e.getClass().getSimpleName(), e.getMessage());
279             }
280         }
281     }
282
283     /**
284      * Queues plain text to be sent to LCN-PCHK.
285      * Sending will be done the next time {@link #triggerWriteToSocket()} is called.
286      *
287      * @param plainText the text
288      */
289     public void queueDirectlyPlainText(String plainText) {
290         this.queueAndSend(new SendDataPlainText(plainText));
291     }
292
293     /**
294      * Queues a PCK command to be sent.
295      *
296      * @param addr the target LCN address
297      * @param wantsAck true to wait for acknowledge on receipt (should be false for group addresses)
298      * @param pck the pure PCK command (without address header)
299      */
300     void queueDirectly(LcnAddr addr, boolean wantsAck, String pck) {
301         this.queueDirectly(addr, wantsAck, pck.getBytes(LcnDefs.LCN_ENCODING));
302     }
303
304     /**
305      * Queues a PCK command for immediate sending, regardless of the Connection state. The PCK command is automatically
306      * re-sent if the destination is not a group, an Ack is requested and the module did not answer within the expected
307      * time.
308      *
309      * @param addr the target LCN address
310      * @param wantsAck true to wait for acknowledge on receipt (should be false for group addresses)
311      * @param data the pure PCK command (without address header)
312      */
313     void queueDirectly(LcnAddr addr, boolean wantsAck, byte[] data) {
314         if (!addr.isGroup() && wantsAck) {
315             this.updateModuleData((LcnAddrMod) addr).queuePckCommandWithAck(data, this, this.settings.getTimeout(),
316                     System.nanoTime());
317         } else {
318             this.queueAndSend(new SendDataPck(addr, false, data));
319         }
320     }
321
322     /**
323      * Enqueues a raw PCK command and triggers the socket to start sending, if it does not already. Does not take care
324      * of any Acks.
325      *
326      * @param data raw PCK command
327      */
328     synchronized void queueAndSend(SendData data) {
329         this.sendQueue.add(data);
330
331         triggerWriteToSocket();
332     }
333
334     /**
335      * Enqueues a PCK command to the offline queue. Data will be sent when the Connection state will enter
336      * {@link ConnectionStateConnected}.
337      *
338      * @param addr LCN module address
339      * @param wantsAck true, if the LCN module shall respond with an Ack on successful processing
340      * @param data the pure PCK command (without address header)
341      */
342     void queueOffline(LcnAddr addr, boolean wantsAck, byte[] data) {
343         offlineSendQueue.add(new PckQueueItem(addr, wantsAck, data));
344     }
345
346     /**
347      * Enqueues a PCK command for sending. Takes care of the Connection state and buffers the command for a specific
348      * time if the Connection is not ready. If an Ack is requested, the PCK command is automatically
349      * re-sent, if the module did not answer in the expected time.
350      *
351      * @param addr LCN module address
352      * @param wantsAck true, if the LCN module shall respond with an Ack on successful processing
353      * @param pck the pure PCK command (without address header)
354      */
355     public void queue(LcnAddr addr, boolean wantsAck, String pck) {
356         this.queue(addr, wantsAck, pck.getBytes(LcnDefs.LCN_ENCODING));
357     }
358
359     /**
360      * Enqueues a PCK command for sending. Takes care of the Connection state and buffers the command for a specific
361      * time if the Connection is not ready. If an Ack is requested, the PCK command is automatically
362      * re-sent, if the module did not answer in the expected time.
363      *
364      * @param addr LCN module address
365      * @param wantsAck true, if the LCN module shall respond with an Ack on successful processing
366      * @param pck the pure PCK command (without address header)
367      */
368     public void queue(LcnAddr addr, boolean wantsAck, byte[] pck) {
369         connectionStateMachine.queue(addr, wantsAck, pck);
370     }
371
372     /**
373      * Process the offline PCK command queue. Does only send recently enqueued PCK commands, the rest is discarded.
374      */
375     void sendOfflineQueue() {
376         List<PckQueueItem> allItems = new ArrayList<>(offlineSendQueue.size());
377         offlineSendQueue.drainTo(allItems);
378
379         allItems.forEach(item -> {
380             // only send messages that were enqueued recently, discard older messages
381             long timeout = settings.getTimeout();
382             if (item.getEnqueued().isAfter(Instant.now().minus(timeout * 4, ChronoUnit.MILLIS))) {
383                 queueDirectly(item.getAddr(), item.isWantsAck(), item.getData());
384             }
385         });
386     }
387
388     /**
389      * Gets the Connection's callback.
390      *
391      * @return the callback
392      */
393     public ConnectionCallback getCallback() {
394         return callback;
395     }
396
397     /**
398      * Sets the SocketChannel of this Connection
399      *
400      * @param channel the new Channel
401      */
402     public void setSocketChannel(AsynchronousSocketChannel channel) {
403         this.channel = channel;
404     }
405
406     /**
407      * Gets the SocketChannel of the Connection.
408      *
409      * @returnthe socket channel
410      */
411     @Nullable
412     public Channel getSocketChannel() {
413         return channel;
414     }
415
416     /**
417      * Gets the local segment ID. When no segments are used, the local segment ID is 0.
418      *
419      * @return the local segment ID
420      */
421     public int getLocalSegId() {
422         return localSegId;
423     }
424
425     /**
426      * Runs the periodic updates on all ModInfos.
427      */
428     public void updateModInfos() {
429         synchronized (modData) {
430             modData.values().forEach(i -> i.update(this, settings.getTimeout(), System.nanoTime()));
431         }
432     }
433
434     /**
435      * Removes an LCN module from the ModData list.
436      *
437      * @param addr the module's address to be removed
438      */
439     public void removeLcnModule(LcnAddr addr) {
440         modData.remove(addr);
441     }
442
443     /**
444      * Invoked when this Connection shall be shut-down finally.
445      */
446     public void shutdown() {
447         connectionStateMachine.shutdownFinally();
448     }
449
450     /**
451      * Sends a broadcast to all LCN modules with a reuqest to respond with an Ack.
452      */
453     public void sendModuleDiscoveryCommand() {
454         queueAndSend(new SendDataPck(new LcnAddrGrp(BROADCAST_SEGMENT_ID, BROADCAST_MODULE_ID), true,
455                 PckGenerator.nullCommand().getBytes(LcnDefs.LCN_ENCODING)));
456         queueAndSend(new SendDataPck(new LcnAddrGrp(0, BROADCAST_MODULE_ID), true,
457                 PckGenerator.nullCommand().getBytes(LcnDefs.LCN_ENCODING)));
458     }
459
460     /**
461      * Requests the serial number and the firmware version of the given LCN module.
462      *
463      * @param addr module's address
464      */
465     public void sendSerialNumberRequest(LcnAddrMod addr) {
466         queueDirectly(addr, false, PckGenerator.requestSn());
467     }
468
469     /**
470      * Requests theprogrammed name of the given LCN module.
471      *
472      * @param addr module's address
473      */
474     public void sendModuleNameRequest(LcnAddrMod addr) {
475         queueDirectly(addr, false, PckGenerator.requestModuleName(0));
476         queueDirectly(addr, false, PckGenerator.requestModuleName(1));
477     }
478 }