]> git.basschouten.com Git - openhab-addons.git/blob
b53fd4a432be2d8df6898264023241c889e458ee
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2021 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.lcn.internal.connection;
14
15 import java.io.ByteArrayOutputStream;
16 import java.io.IOException;
17 import java.nio.BufferOverflowException;
18 import java.nio.ByteBuffer;
19 import java.nio.channels.AsynchronousSocketChannel;
20 import java.nio.channels.Channel;
21 import java.nio.channels.CompletionHandler;
22 import java.time.Instant;
23 import java.time.temporal.ChronoUnit;
24 import java.util.*;
25 import java.util.concurrent.BlockingQueue;
26 import java.util.concurrent.LinkedBlockingQueue;
27 import java.util.concurrent.ScheduledExecutorService;
28
29 import org.eclipse.jdt.annotation.NonNullByDefault;
30 import org.eclipse.jdt.annotation.Nullable;
31 import org.openhab.binding.lcn.internal.common.LcnAddr;
32 import org.openhab.binding.lcn.internal.common.LcnAddrGrp;
33 import org.openhab.binding.lcn.internal.common.LcnAddrMod;
34 import org.openhab.binding.lcn.internal.common.LcnDefs;
35 import org.openhab.binding.lcn.internal.common.LcnException;
36 import org.openhab.binding.lcn.internal.common.PckGenerator;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39
40 /**
41  * This class represents a configured connection to one LCN-PCHK.
42  * It uses a {@link AsynchronousSocketChannel} to connect to LCN-PCHK.
43  * Included logic:
44  * <ul>
45  * <li>Reconnection on connection loss
46  * <li>Segment scan (to detect the local segment ID)
47  * <li>Acknowledge handling
48  * <li>Periodic value requests
49  * <li>Caching of runtime data about the underlying LCN bus
50  * </ul>
51  *
52  * @author Fabian Wolter - Initial Contribution
53  */
54 @NonNullByDefault
55 public class Connection {
56     private final Logger logger = LoggerFactory.getLogger(Connection.class);
57     private static final int BROADCAST_MODULE_ID = 3;
58     private static final int BROADCAST_SEGMENT_ID = 3;
59     private final ConnectionSettings settings;
60     private final ConnectionCallback callback;
61     @Nullable
62     private AsynchronousSocketChannel channel;
63     /** The local segment id. -1 means "unknown". */
64     private int localSegId;
65     private final ByteBuffer readBuffer = ByteBuffer.allocate(1024);
66     private final ByteArrayOutputStream sendBuffer = new ByteArrayOutputStream();
67     private final Queue<@Nullable SendData> sendQueue = new LinkedBlockingQueue<>();
68     private final BlockingQueue<PckQueueItem> offlineSendQueue = new LinkedBlockingQueue<>();
69     private final Map<LcnAddr, ModInfo> modData = Collections.synchronizedMap(new HashMap<>());
70     private volatile boolean writeInProgress;
71     private final ScheduledExecutorService scheduler;
72     private final ConnectionStateMachine connectionStateMachine;
73
74     /**
75      * Constructs a clean (disconnected) connection with the given settings.
76      * This does not start the actual connection process.
77      *
78      * @param sets the settings to use for the new connection
79      * @param callback the callback to the owner
80      * @throws IOException
81      */
82     public Connection(ConnectionSettings sets, ScheduledExecutorService scheduler, ConnectionCallback callback) {
83         this.settings = sets;
84         this.callback = callback;
85         this.scheduler = scheduler;
86         this.clearRuntimeData();
87
88         connectionStateMachine = new ConnectionStateMachine(this, scheduler);
89     }
90
91     /** Clears all runtime data. */
92     void clearRuntimeData() {
93         this.channel = null;
94         this.localSegId = -1;
95         this.readBuffer.clear();
96         this.sendQueue.clear();
97         this.sendBuffer.reset();
98     }
99
100     /**
101      * Retrieves the settings for this connection (never changed).
102      *
103      * @return the settings
104      */
105     public ConnectionSettings getSettings() {
106         return this.settings;
107     }
108
109     private boolean isSocketConnected() {
110         try {
111             AsynchronousSocketChannel localChannel = channel;
112             return localChannel != null && localChannel.getRemoteAddress() != null;
113         } catch (IOException e) {
114             return false;
115         }
116     }
117
118     /**
119      * Sets the local segment id.
120      *
121      * @param localSegId the new local segment id
122      */
123     public void setLocalSegId(int localSegId) {
124         this.localSegId = localSegId;
125     }
126
127     /**
128      * Called whenever an acknowledge is received.
129      *
130      * @param addr the source LCN module
131      * @param code the LCN internal code (-1 = "positive")
132      */
133     public void onAck(LcnAddrMod addr, int code) {
134         synchronized (modData) {
135             if (modData.containsKey(addr)) {
136                 modData.get(addr).onAck(code, this, this.settings.getTimeout(), System.nanoTime());
137             }
138         }
139     }
140
141     /**
142      * Creates and/or returns cached data for the given LCN module.
143      *
144      * @param addr the module's address
145      * @return the data
146      */
147     public ModInfo updateModuleData(LcnAddrMod addr) {
148         return Objects.requireNonNull(modData.computeIfAbsent(addr, ModInfo::new));
149     }
150
151     /**
152      * Reads and processes input from the underlying channel.
153      * Fragmented input is kept in {@link #readBuffer} and will be processed with the next call.
154      *
155      * @throws IOException if connection was closed or a generic channel error occurred
156      */
157     void readAndProcess() {
158         AsynchronousSocketChannel localChannel = channel;
159         if (localChannel != null && isSocketConnected()) {
160             localChannel.read(readBuffer, null, new CompletionHandler<@Nullable Integer, @Nullable Void>() {
161                 @Override
162                 public void completed(@Nullable Integer transmittedByteCount, @Nullable Void attachment) {
163                     synchronized (Connection.this) {
164                         if (transmittedByteCount == null || transmittedByteCount == -1) {
165                             String msg = "Connection was closed by foreign host.";
166                             connectionStateMachine.handleConnectionFailed(new LcnException(msg));
167                         } else {
168                             // read data chunks from socket and separate frames
169                             readBuffer.flip();
170                             int aPos = readBuffer.position(); // 0
171                             String s = new String(readBuffer.array(), aPos, transmittedByteCount, LcnDefs.LCN_ENCODING);
172                             int pos1 = 0, pos2 = s.indexOf(PckGenerator.TERMINATION, pos1);
173                             while (pos2 != -1) {
174                                 String data = s.substring(pos1, pos2);
175                                 if (logger.isTraceEnabled()) {
176                                     logger.trace("Received: '{}'", data);
177                                 }
178                                 scheduler.submit(() -> {
179                                     connectionStateMachine.onInputReceived(data);
180                                     callback.onPckMessageReceived(data);
181                                 });
182                                 // Seek position in input array
183                                 aPos += s.substring(pos1, pos2 + 1).getBytes(LcnDefs.LCN_ENCODING).length;
184                                 // Next input
185                                 pos1 = pos2 + 1;
186                                 pos2 = s.indexOf(PckGenerator.TERMINATION, pos1);
187                             }
188                             readBuffer.limit(readBuffer.capacity());
189                             readBuffer.position(transmittedByteCount - aPos); // Keeps fragments for the next call
190
191                             if (isSocketConnected()) {
192                                 readAndProcess();
193                             }
194                         }
195                     }
196                 }
197
198                 @Override
199                 public void failed(@Nullable Throwable e, @Nullable Void attachment) {
200                     logger.debug("Lost connection");
201                     connectionStateMachine.handleConnectionFailed(e);
202                 }
203             });
204         } else {
205             connectionStateMachine.handleConnectionFailed(new LcnException("Socket not open"));
206         }
207     }
208
209     /**
210      * Writes all queued data.
211      * Will try to write all data at once to reduce overhead.
212      */
213     public synchronized void triggerWriteToSocket() {
214         AsynchronousSocketChannel localChannel = channel;
215         if (localChannel == null || !isSocketConnected() || writeInProgress) {
216             return;
217         }
218         sendBuffer.reset();
219         SendData item = sendQueue.poll();
220
221         if (item != null) {
222             try {
223                 if (!item.write(sendBuffer, localSegId)) {
224                     logger.warn("Data loss: Could not write packet into send buffer");
225                 }
226
227                 writeInProgress = true;
228                 byte[] data = sendBuffer.toByteArray();
229                 localChannel.write(ByteBuffer.wrap(data), null,
230                         new CompletionHandler<@Nullable Integer, @Nullable Void>() {
231                             @Override
232                             public void completed(@Nullable Integer result, @Nullable Void attachment) {
233                                 synchronized (Connection.this) {
234                                     if (result != data.length) {
235                                         logger.warn("Data loss while writing to channel: {}", settings.getAddress());
236                                     } else {
237                                         if (logger.isTraceEnabled()) {
238                                             logger.trace("Sent: {}", new String(data, 0, data.length));
239                                         }
240                                     }
241
242                                     writeInProgress = false;
243
244                                     if (sendQueue.size() > 0) {
245                                         /**
246                                          * This could lead to stack overflows, since the CompletionHandler may run in
247                                          * the same Thread as triggerWriteToSocket() is invoked (see
248                                          * {@link AsynchronousChannelGroup}/Threading), but we do not expect as much
249                                          * data in one chunk here, that the stack can be filled in a critical way.
250                                          */
251                                         triggerWriteToSocket();
252                                     }
253                                 }
254                             }
255
256                             @Override
257                             public void failed(@Nullable Throwable exc, @Nullable Void attachment) {
258                                 synchronized (Connection.this) {
259                                     if (exc != null) {
260                                         logger.warn("Writing to channel \"{}\" failed: {}", settings.getAddress(),
261                                                 exc.getMessage());
262                                     }
263                                     writeInProgress = false;
264                                     connectionStateMachine.handleConnectionFailed(new LcnException("write() failed"));
265                                 }
266                             }
267                         });
268             } catch (BufferOverflowException | IOException e) {
269                 logger.warn("Sending failed: {}: {}: {}", item, e.getClass().getSimpleName(), e.getMessage());
270             }
271         }
272     }
273
274     /**
275      * Queues plain text to be sent to LCN-PCHK.
276      * Sending will be done the next time {@link #triggerWriteToSocket()} is called.
277      *
278      * @param plainText the text
279      */
280     public void queueDirectlyPlainText(String plainText) {
281         this.queueAndSend(new SendDataPlainText(plainText));
282     }
283
284     /**
285      * Queues a PCK command to be sent.
286      *
287      * @param addr the target LCN address
288      * @param wantsAck true to wait for acknowledge on receipt (should be false for group addresses)
289      * @param pck the pure PCK command (without address header)
290      */
291     void queueDirectly(LcnAddr addr, boolean wantsAck, String pck) {
292         this.queueDirectly(addr, wantsAck, pck.getBytes(LcnDefs.LCN_ENCODING));
293     }
294
295     /**
296      * Queues a PCK command for immediate sending, regardless of the Connection state. The PCK command is automatically
297      * re-sent if the destination is not a group, an Ack is requested and the module did not answer within the expected
298      * time.
299      *
300      * @param addr the target LCN address
301      * @param wantsAck true to wait for acknowledge on receipt (should be false for group addresses)
302      * @param data the pure PCK command (without address header)
303      */
304     void queueDirectly(LcnAddr addr, boolean wantsAck, byte[] data) {
305         if (!addr.isGroup() && wantsAck) {
306             this.updateModuleData((LcnAddrMod) addr).queuePckCommandWithAck(data, this, this.settings.getTimeout(),
307                     System.nanoTime());
308         } else {
309             this.queueAndSend(new SendDataPck(addr, false, data));
310         }
311     }
312
313     /**
314      * Enqueues a raw PCK command and triggers the socket to start sending, if it does not already. Does not take care
315      * of any Acks.
316      *
317      * @param data raw PCK command
318      */
319     synchronized void queueAndSend(SendData data) {
320         this.sendQueue.add(data);
321
322         triggerWriteToSocket();
323     }
324
325     /**
326      * Enqueues a PCK command to the offline queue. Data will be sent when the Connection state will enter
327      * {@link ConnectionStateConnected}.
328      *
329      * @param addr LCN module address
330      * @param wantsAck true, if the LCN module shall respond with an Ack on successful processing
331      * @param data the pure PCK command (without address header)
332      */
333     void queueOffline(LcnAddr addr, boolean wantsAck, byte[] data) {
334         offlineSendQueue.add(new PckQueueItem(addr, wantsAck, data));
335     }
336
337     /**
338      * Enqueues a PCK command for sending. Takes care of the Connection state and buffers the command for a specific
339      * time if the Connection is not ready. If an Ack is requested, the PCK command is automatically
340      * re-sent, if the module did not answer in the expected time.
341      *
342      * @param addr LCN module address
343      * @param wantsAck true, if the LCN module shall respond with an Ack on successful processing
344      * @param pck the pure PCK command (without address header)
345      */
346     public void queue(LcnAddr addr, boolean wantsAck, String pck) {
347         this.queue(addr, wantsAck, pck.getBytes(LcnDefs.LCN_ENCODING));
348     }
349
350     /**
351      * Enqueues a PCK command for sending. Takes care of the Connection state and buffers the command for a specific
352      * time if the Connection is not ready. If an Ack is requested, the PCK command is automatically
353      * re-sent, if the module did not answer in the expected time.
354      *
355      * @param addr LCN module address
356      * @param wantsAck true, if the LCN module shall respond with an Ack on successful processing
357      * @param pck the pure PCK command (without address header)
358      */
359     public void queue(LcnAddr addr, boolean wantsAck, byte[] pck) {
360         connectionStateMachine.queue(addr, wantsAck, pck);
361     }
362
363     /**
364      * Process the offline PCK command queue. Does only send recently enqueued PCK commands, the rest is discarded.
365      */
366     void sendOfflineQueue() {
367         List<PckQueueItem> allItems = new ArrayList<>(offlineSendQueue.size());
368         offlineSendQueue.drainTo(allItems);
369
370         allItems.forEach(item -> {
371             // only send messages that were enqueued recently, discard older messages
372             long timeout = settings.getTimeout();
373             if (item.getEnqueued().isAfter(Instant.now().minus(timeout * 4, ChronoUnit.MILLIS))) {
374                 queueDirectly(item.getAddr(), item.isWantsAck(), item.getData());
375             }
376         });
377     }
378
379     /**
380      * Gets the Connection's callback.
381      *
382      * @return the callback
383      */
384     public ConnectionCallback getCallback() {
385         return callback;
386     }
387
388     /**
389      * Sets the SocketChannel of this Connection
390      *
391      * @param channel the new Channel
392      */
393     public void setSocketChannel(AsynchronousSocketChannel channel) {
394         this.channel = channel;
395     }
396
397     /**
398      * Gets the SocketChannel of the Connection.
399      *
400      * @returnthe socket channel
401      */
402     @Nullable
403     public Channel getSocketChannel() {
404         return channel;
405     }
406
407     /**
408      * Gets the local segment ID. When no segments are used, the local segment ID is 0.
409      *
410      * @return the local segment ID
411      */
412     public int getLocalSegId() {
413         return localSegId;
414     }
415
416     /**
417      * Runs the periodic updates on all ModInfos.
418      */
419     public void updateModInfos() {
420         synchronized (modData) {
421             modData.values().forEach(i -> i.update(this, settings.getTimeout(), System.nanoTime()));
422         }
423     }
424
425     /**
426      * Removes an LCN module from the ModData list.
427      *
428      * @param addr the module's address to be removed
429      */
430     public void removeLcnModule(LcnAddr addr) {
431         modData.remove(addr);
432     }
433
434     /**
435      * Invoked when this Connection shall be shut-down finally.
436      */
437     public void shutdown() {
438         connectionStateMachine.shutdownFinally();
439     }
440
441     /**
442      * Sends a broadcast to all LCN modules with a reuqest to respond with an Ack.
443      */
444     public void sendModuleDiscoveryCommand() {
445         queueAndSend(new SendDataPck(new LcnAddrGrp(BROADCAST_SEGMENT_ID, BROADCAST_MODULE_ID), true,
446                 PckGenerator.nullCommand().getBytes(LcnDefs.LCN_ENCODING)));
447         queueAndSend(new SendDataPck(new LcnAddrGrp(0, BROADCAST_MODULE_ID), true,
448                 PckGenerator.nullCommand().getBytes(LcnDefs.LCN_ENCODING)));
449     }
450
451     /**
452      * Requests the serial number and the firmware version of the given LCN module.
453      *
454      * @param addr module's address
455      */
456     public void sendSerialNumberRequest(LcnAddrMod addr) {
457         queueDirectly(addr, false, PckGenerator.requestSn());
458     }
459
460     /**
461      * Requests theprogrammed name of the given LCN module.
462      *
463      * @param addr module's address
464      */
465     public void sendModuleNameRequest(LcnAddrMod addr) {
466         queueDirectly(addr, false, PckGenerator.requestModuleName(0));
467         queueDirectly(addr, false, PckGenerator.requestModuleName(1));
468     }
469 }