]> git.basschouten.com Git - openhab-addons.git/blob
f89a7051c1e8c057325a2aafbe8d371416528e83
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2020 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.lcn.internal.connection;
14
15 import java.io.ByteArrayOutputStream;
16 import java.io.IOException;
17 import java.nio.BufferOverflowException;
18 import java.nio.ByteBuffer;
19 import java.nio.channels.AsynchronousSocketChannel;
20 import java.nio.channels.Channel;
21 import java.nio.channels.CompletionHandler;
22 import java.time.Instant;
23 import java.time.temporal.ChronoUnit;
24 import java.util.ArrayList;
25 import java.util.Collections;
26 import java.util.HashMap;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.Queue;
30 import java.util.concurrent.BlockingQueue;
31 import java.util.concurrent.LinkedBlockingQueue;
32 import java.util.concurrent.ScheduledExecutorService;
33
34 import org.eclipse.jdt.annotation.NonNullByDefault;
35 import org.eclipse.jdt.annotation.Nullable;
36 import org.openhab.binding.lcn.internal.common.LcnAddr;
37 import org.openhab.binding.lcn.internal.common.LcnAddrGrp;
38 import org.openhab.binding.lcn.internal.common.LcnAddrMod;
39 import org.openhab.binding.lcn.internal.common.LcnDefs;
40 import org.openhab.binding.lcn.internal.common.LcnException;
41 import org.openhab.binding.lcn.internal.common.PckGenerator;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44
45 /**
46  * This class represents a configured connection to one LCN-PCHK.
47  * It uses a {@link AsynchronousSocketChannel} to connect to LCN-PCHK.
48  * Included logic:
49  * <ul>
50  * <li>Reconnection on connection loss
51  * <li>Segment scan (to detect the local segment ID)
52  * <li>Acknowledge handling
53  * <li>Periodic value requests
54  * <li>Caching of runtime data about the underlying LCN bus
55  * </ul>
56  *
57  * @author Fabian Wolter - Initial Contribution
58  */
59 @NonNullByDefault
60 public class Connection {
61     private final Logger logger = LoggerFactory.getLogger(Connection.class);
62     private static final int BROADCAST_MODULE_ID = 3;
63     private static final int BROADCAST_SEGMENT_ID = 3;
64     private final ConnectionSettings settings;
65     private final ConnectionCallback callback;
66     @Nullable
67     private AsynchronousSocketChannel channel;
68     /** The local segment id. -1 means "unknown". */
69     private int localSegId;
70     private final ByteBuffer readBuffer = ByteBuffer.allocate(1024);
71     private final ByteArrayOutputStream sendBuffer = new ByteArrayOutputStream();
72     private final Queue<@Nullable SendData> sendQueue = new LinkedBlockingQueue<>();
73     private final BlockingQueue<PckQueueItem> offlineSendQueue = new LinkedBlockingQueue<>();
74     private final Map<LcnAddr, ModInfo> modData = Collections.synchronizedMap(new HashMap<>());
75     private volatile boolean writeInProgress;
76     private final ScheduledExecutorService scheduler;
77     private final ConnectionStateMachine connectionStateMachine;
78
79     /**
80      * Constructs a clean (disconnected) connection with the given settings.
81      * This does not start the actual connection process.
82      *
83      * @param sets the settings to use for the new connection
84      * @param callback the callback to the owner
85      * @throws IOException
86      */
87     public Connection(ConnectionSettings sets, ScheduledExecutorService scheduler, ConnectionCallback callback) {
88         this.settings = sets;
89         this.callback = callback;
90         this.scheduler = scheduler;
91         this.clearRuntimeData();
92
93         connectionStateMachine = new ConnectionStateMachine(this, scheduler);
94     }
95
96     /** Clears all runtime data. */
97     void clearRuntimeData() {
98         this.channel = null;
99         this.localSegId = -1;
100         this.readBuffer.clear();
101         this.sendQueue.clear();
102         this.sendBuffer.reset();
103     }
104
105     /**
106      * Retrieves the settings for this connection (never changed).
107      *
108      * @return the settings
109      */
110     public ConnectionSettings getSettings() {
111         return this.settings;
112     }
113
114     private boolean isSocketConnected() {
115         try {
116             AsynchronousSocketChannel localChannel = channel;
117             return localChannel != null && localChannel.getRemoteAddress() != null;
118         } catch (IOException e) {
119             return false;
120         }
121     }
122
123     /**
124      * Sets the local segment id.
125      *
126      * @param localSegId the new local segment id
127      */
128     public void setLocalSegId(int localSegId) {
129         this.localSegId = localSegId;
130     }
131
132     /**
133      * Called whenever an acknowledge is received.
134      *
135      * @param addr the source LCN module
136      * @param code the LCN internal code (-1 = "positive")
137      */
138     public void onAck(LcnAddrMod addr, int code) {
139         synchronized (modData) {
140             if (modData.containsKey(addr)) {
141                 modData.get(addr).onAck(code, this, this.settings.getTimeout(), System.nanoTime());
142             }
143         }
144     }
145
146     /**
147      * Creates and/or returns cached data for the given LCN module.
148      *
149      * @param addr the module's address
150      * @return the data
151      */
152     public ModInfo updateModuleData(LcnAddrMod addr) {
153         return modData.computeIfAbsent(addr, ModInfo::new);
154     }
155
156     /**
157      * Reads and processes input from the underlying channel.
158      * Fragmented input is kept in {@link #readBuffer} and will be processed with the next call.
159      *
160      * @throws IOException if connection was closed or a generic channel error occurred
161      */
162     void readAndProcess() {
163         AsynchronousSocketChannel localChannel = channel;
164         if (localChannel != null && isSocketConnected()) {
165             localChannel.read(readBuffer, null, new CompletionHandler<@Nullable Integer, @Nullable Void>() {
166                 @Override
167                 public void completed(@Nullable Integer transmittedByteCount, @Nullable Void attachment) {
168                     synchronized (Connection.this) {
169                         if (transmittedByteCount == null || transmittedByteCount == -1) {
170                             String msg = "Connection was closed by foreign host.";
171                             connectionStateMachine.handleConnectionFailed(new LcnException(msg));
172                         } else {
173                             // read data chunks from socket and separate frames
174                             readBuffer.flip();
175                             int aPos = readBuffer.position(); // 0
176                             String s = new String(readBuffer.array(), aPos, transmittedByteCount, LcnDefs.LCN_ENCODING);
177                             int pos1 = 0, pos2 = s.indexOf(PckGenerator.TERMINATION, pos1);
178                             while (pos2 != -1) {
179                                 String data = s.substring(pos1, pos2);
180                                 if (logger.isTraceEnabled()) {
181                                     logger.trace("Received: '{}'", data);
182                                 }
183                                 scheduler.submit(() -> {
184                                     connectionStateMachine.onInputReceived(data);
185                                     callback.onPckMessageReceived(data);
186                                 });
187                                 // Seek position in input array
188                                 aPos += s.substring(pos1, pos2 + 1).getBytes(LcnDefs.LCN_ENCODING).length;
189                                 // Next input
190                                 pos1 = pos2 + 1;
191                                 pos2 = s.indexOf(PckGenerator.TERMINATION, pos1);
192                             }
193                             readBuffer.limit(readBuffer.capacity());
194                             readBuffer.position(transmittedByteCount - aPos); // Keeps fragments for the next call
195
196                             if (isSocketConnected()) {
197                                 readAndProcess();
198                             }
199                         }
200                     }
201                 }
202
203                 @Override
204                 public void failed(@Nullable Throwable e, @Nullable Void attachment) {
205                     logger.debug("Lost connection");
206                     connectionStateMachine.handleConnectionFailed(e);
207                 }
208             });
209         } else {
210             connectionStateMachine.handleConnectionFailed(new LcnException("Socket not open"));
211         }
212     }
213
214     /**
215      * Writes all queued data.
216      * Will try to write all data at once to reduce overhead.
217      */
218     public synchronized void triggerWriteToSocket() {
219         AsynchronousSocketChannel localChannel = channel;
220         if (localChannel == null || !isSocketConnected() || writeInProgress) {
221             return;
222         }
223         sendBuffer.reset();
224         SendData item = sendQueue.poll();
225
226         if (item != null) {
227             try {
228                 if (!item.write(sendBuffer, localSegId)) {
229                     logger.warn("Data loss: Could not write packet into send buffer");
230                 }
231
232                 writeInProgress = true;
233                 byte[] data = sendBuffer.toByteArray();
234                 localChannel.write(ByteBuffer.wrap(data), null,
235                         new CompletionHandler<@Nullable Integer, @Nullable Void>() {
236                             @Override
237                             public void completed(@Nullable Integer result, @Nullable Void attachment) {
238                                 synchronized (Connection.this) {
239                                     if (result != data.length) {
240                                         logger.warn("Data loss while writing to channel: {}", settings.getAddress());
241                                     } else {
242                                         if (logger.isTraceEnabled()) {
243                                             logger.trace("Sent: {}", new String(data, 0, data.length));
244                                         }
245                                     }
246
247                                     writeInProgress = false;
248
249                                     if (sendQueue.size() > 0) {
250                                         /**
251                                          * This could lead to stack overflows, since the CompletionHandler may run in
252                                          * the same Thread as triggerWriteToSocket() is invoked (see
253                                          * {@link AsynchronousChannelGroup}/Threading), but we do not expect as much
254                                          * data in one chunk here, that the stack can be filled in a critical way.
255                                          */
256                                         triggerWriteToSocket();
257                                     }
258                                 }
259                             }
260
261                             @Override
262                             public void failed(@Nullable Throwable exc, @Nullable Void attachment) {
263                                 synchronized (Connection.this) {
264                                     if (exc != null) {
265                                         logger.warn("Writing to channel \"{}\" failed: {}", settings.getAddress(),
266                                                 exc.getMessage());
267                                     }
268                                     writeInProgress = false;
269                                     connectionStateMachine.handleConnectionFailed(new LcnException("write() failed"));
270                                 }
271                             }
272                         });
273             } catch (BufferOverflowException | IOException e) {
274                 logger.warn("Sending failed: {}: {}: {}", item, e.getClass().getSimpleName(), e.getMessage());
275             }
276         }
277     }
278
279     /**
280      * Queues plain text to be sent to LCN-PCHK.
281      * Sending will be done the next time {@link #triggerWriteToSocket()} is called.
282      *
283      * @param plainText the text
284      */
285     public void queueDirectlyPlainText(String plainText) {
286         this.queueAndSend(new SendDataPlainText(plainText));
287     }
288
289     /**
290      * Queues a PCK command to be sent.
291      *
292      * @param addr the target LCN address
293      * @param wantsAck true to wait for acknowledge on receipt (should be false for group addresses)
294      * @param pck the pure PCK command (without address header)
295      */
296     void queueDirectly(LcnAddr addr, boolean wantsAck, String pck) {
297         this.queueDirectly(addr, wantsAck, pck.getBytes(LcnDefs.LCN_ENCODING));
298     }
299
300     /**
301      * Queues a PCK command for immediate sending, regardless of the Connection state. The PCK command is automatically
302      * re-sent if the destination is not a group, an Ack is requested and the module did not answer within the expected
303      * time.
304      *
305      * @param addr the target LCN address
306      * @param wantsAck true to wait for acknowledge on receipt (should be false for group addresses)
307      * @param data the pure PCK command (without address header)
308      */
309     void queueDirectly(LcnAddr addr, boolean wantsAck, byte[] data) {
310         if (!addr.isGroup() && wantsAck) {
311             this.updateModuleData((LcnAddrMod) addr).queuePckCommandWithAck(data, this, this.settings.getTimeout(),
312                     System.nanoTime());
313         } else {
314             this.queueAndSend(new SendDataPck(addr, false, data));
315         }
316     }
317
318     /**
319      * Enqueues a raw PCK command and triggers the socket to start sending, if it does not already. Does not take care
320      * of any Acks.
321      *
322      * @param data raw PCK command
323      */
324     synchronized void queueAndSend(SendData data) {
325         this.sendQueue.add(data);
326
327         triggerWriteToSocket();
328     }
329
330     /**
331      * Enqueues a PCK command to the offline queue. Data will be sent when the Connection state will enter
332      * {@link ConnectionStateConnected}.
333      *
334      * @param addr LCN module address
335      * @param wantsAck true, if the LCN module shall respond with an Ack on successful processing
336      * @param data the pure PCK command (without address header)
337      */
338     void queueOffline(LcnAddr addr, boolean wantsAck, byte[] data) {
339         offlineSendQueue.add(new PckQueueItem(addr, wantsAck, data));
340     }
341
342     /**
343      * Enqueues a PCK command for sending. Takes care of the Connection state and buffers the command for a specific
344      * time if the Connection is not ready. If an Ack is requested, the PCK command is automatically
345      * re-sent, if the module did not answer in the expected time.
346      *
347      * @param addr LCN module address
348      * @param wantsAck true, if the LCN module shall respond with an Ack on successful processing
349      * @param pck the pure PCK command (without address header)
350      */
351     public void queue(LcnAddr addr, boolean wantsAck, String pck) {
352         this.queue(addr, wantsAck, pck.getBytes(LcnDefs.LCN_ENCODING));
353     }
354
355     /**
356      * Enqueues a PCK command for sending. Takes care of the Connection state and buffers the command for a specific
357      * time if the Connection is not ready. If an Ack is requested, the PCK command is automatically
358      * re-sent, if the module did not answer in the expected time.
359      *
360      * @param addr LCN module address
361      * @param wantsAck true, if the LCN module shall respond with an Ack on successful processing
362      * @param pck the pure PCK command (without address header)
363      */
364     public void queue(LcnAddr addr, boolean wantsAck, byte[] pck) {
365         connectionStateMachine.queue(addr, wantsAck, pck);
366     }
367
368     /**
369      * Process the offline PCK command queue. Does only send recently enqueued PCK commands, the rest is discarded.
370      */
371     void sendOfflineQueue() {
372         List<PckQueueItem> allItems = new ArrayList<>(offlineSendQueue.size());
373         offlineSendQueue.drainTo(allItems);
374
375         allItems.forEach(item -> {
376             // only send messages that were enqueued recently, discard older messages
377             long timeout = settings.getTimeout();
378             if (item.getEnqueued().isAfter(Instant.now().minus(timeout * 4, ChronoUnit.MILLIS))) {
379                 queueDirectly(item.getAddr(), item.isWantsAck(), item.getData());
380             }
381         });
382     }
383
384     /**
385      * Gets the Connection's callback.
386      *
387      * @return the callback
388      */
389     public ConnectionCallback getCallback() {
390         return callback;
391     }
392
393     /**
394      * Sets the SocketChannel of this Connection
395      *
396      * @param channel the new Channel
397      */
398     public void setSocketChannel(AsynchronousSocketChannel channel) {
399         this.channel = channel;
400     }
401
402     /**
403      * Gets the SocketChannel of the Connection.
404      *
405      * @returnthe socket channel
406      */
407     @Nullable
408     public Channel getSocketChannel() {
409         return channel;
410     }
411
412     /**
413      * Gets the local segment ID. When no segments are used, the local segment ID is 0.
414      *
415      * @return the local segment ID
416      */
417     public int getLocalSegId() {
418         return localSegId;
419     }
420
421     /**
422      * Runs the periodic updates on all ModInfos.
423      */
424     public void updateModInfos() {
425         synchronized (modData) {
426             modData.values().forEach(i -> i.update(this, settings.getTimeout(), System.nanoTime()));
427         }
428     }
429
430     /**
431      * Removes an LCN module from the ModData list.
432      *
433      * @param addr the module's address to be removed
434      */
435     public void removeLcnModule(LcnAddr addr) {
436         modData.remove(addr);
437     }
438
439     /**
440      * Invoked when this Connection shall be shut-down finally.
441      */
442     public void shutdown() {
443         connectionStateMachine.shutdownFinally();
444     }
445
446     /**
447      * Sends a broadcast to all LCN modules with a reuqest to respond with an Ack.
448      */
449     public void sendModuleDiscoveryCommand() {
450         queueAndSend(new SendDataPck(new LcnAddrGrp(BROADCAST_SEGMENT_ID, BROADCAST_MODULE_ID), true,
451                 PckGenerator.nullCommand().getBytes(LcnDefs.LCN_ENCODING)));
452         queueAndSend(new SendDataPck(new LcnAddrGrp(0, BROADCAST_MODULE_ID), true,
453                 PckGenerator.nullCommand().getBytes(LcnDefs.LCN_ENCODING)));
454     }
455
456     /**
457      * Requests the serial number and the firmware version of the given LCN module.
458      *
459      * @param addr module's address
460      */
461     public void sendSerialNumberRequest(LcnAddrMod addr) {
462         queueDirectly(addr, false, PckGenerator.requestSn());
463     }
464
465     /**
466      * Requests theprogrammed name of the given LCN module.
467      *
468      * @param addr module's address
469      */
470     public void sendModuleNameRequest(LcnAddrMod addr) {
471         queueDirectly(addr, false, PckGenerator.requestModuleName(0));
472         queueDirectly(addr, false, PckGenerator.requestModuleName(1));
473     }
474 }