]> git.basschouten.com Git - openhab-addons.git/blob
5369f600114c938749e84d9cd64af12e776560e0
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2022 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.pulseaudio.internal;
14
15 import java.io.IOException;
16 import java.io.InputStream;
17 import java.io.InterruptedIOException;
18 import java.io.PipedInputStream;
19 import java.io.PipedOutputStream;
20 import java.net.Socket;
21 import java.util.HashSet;
22 import java.util.Set;
23 import java.util.concurrent.ConcurrentLinkedQueue;
24 import java.util.concurrent.Future;
25 import java.util.concurrent.ScheduledExecutorService;
26 import java.util.function.Consumer;
27
28 import org.eclipse.jdt.annotation.NonNullByDefault;
29 import org.eclipse.jdt.annotation.Nullable;
30 import org.openhab.binding.pulseaudio.internal.handler.PulseaudioHandler;
31 import org.openhab.core.audio.AudioException;
32 import org.openhab.core.audio.AudioFormat;
33 import org.openhab.core.audio.AudioSource;
34 import org.openhab.core.audio.AudioStream;
35 import org.openhab.core.common.ThreadPoolManager;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38
39 /**
40  * The audio source for openhab, implemented by a connection to a pulseaudio source using Simple TCP protocol
41  *
42  * @author Miguel Álvarez - Initial contribution
43  *
44  */
45 @NonNullByDefault
46 public class PulseAudioAudioSource extends PulseaudioSimpleProtocolStream implements AudioSource {
47
48     private final Logger logger = LoggerFactory.getLogger(PulseAudioAudioSource.class);
49     private final ConcurrentLinkedQueue<PipedOutputStream> pipeOutputs = new ConcurrentLinkedQueue<>();
50     private final ScheduledExecutorService executor;
51
52     private @Nullable Future<?> pipeWriteTask;
53
54     public PulseAudioAudioSource(PulseaudioHandler pulseaudioHandler, ScheduledExecutorService scheduler) {
55         super(pulseaudioHandler, scheduler);
56         executor = ThreadPoolManager
57                 .getScheduledPool("OH-binding-" + pulseaudioHandler.getThing().getUID() + "-source");
58     }
59
60     @Override
61     public Set<AudioFormat> getSupportedFormats() {
62         var supportedFormats = new HashSet<AudioFormat>();
63         var audioFormat = pulseaudioHandler.getSourceAudioFormat();
64         if (audioFormat != null) {
65             supportedFormats.add(audioFormat);
66         }
67         return supportedFormats;
68     }
69
70     @Override
71     public AudioStream getInputStream(AudioFormat audioFormat) throws AudioException {
72         try {
73             for (int countAttempt = 1; countAttempt <= 2; countAttempt++) { // two attempts allowed
74                 try {
75                     connectIfNeeded();
76                     final Socket clientSocketLocal = clientSocket;
77                     if (clientSocketLocal == null) {
78                         break;
79                     }
80                     var sourceFormat = pulseaudioHandler.getSourceAudioFormat();
81                     if (sourceFormat == null) {
82                         throw new AudioException("Unable to get source audio format");
83                     }
84                     if (!audioFormat.isCompatible(sourceFormat)) {
85                         throw new AudioException("Incompatible audio format requested");
86                     }
87                     setIdle(true);
88                     var pipeOutput = new PipedOutputStream();
89                     var pipeInput = new PipedInputStream(pipeOutput, 1024 * 10) {
90                         @Override
91                         public void close() throws IOException {
92                             unregisterPipe(pipeOutput);
93                             super.close();
94                         }
95                     };
96                     registerPipe(pipeOutput);
97                     // get raw audio from the pulse audio socket
98                     return new PulseAudioStream(sourceFormat, pipeInput, (idle) -> {
99                         setIdle(idle);
100                         if (idle) {
101                             scheduleDisconnect();
102                         } else {
103                             // ensure pipe is writing
104                             startPipeWrite();
105                         }
106                     });
107                 } catch (IOException e) {
108                     disconnect(); // disconnect to force clear connection in case of socket not cleanly shutdown
109                     if (countAttempt == 2) { // we won't retry : log and quit
110                         final Socket clientSocketLocal = clientSocket;
111                         String port = clientSocketLocal != null ? Integer.toString(clientSocketLocal.getPort())
112                                 : "unknown";
113                         logger.warn(
114                                 "Error while trying to get audio from pulseaudio audio source. Cannot connect to {}:{}, error: {}",
115                                 pulseaudioHandler.getHost(), port, e.getMessage());
116                         setIdle(true);
117                         throw e;
118                     }
119                 } catch (InterruptedException ie) {
120                     logger.info("Interrupted during source audio connection: {}", ie.getMessage());
121                     setIdle(true);
122                     throw new AudioException(ie);
123                 }
124                 countAttempt++;
125             }
126         } catch (IOException e) {
127             throw new AudioException(e);
128         } finally {
129             scheduleDisconnect();
130         }
131         setIdle(true);
132         throw new AudioException("Unable to create input stream");
133     }
134
135     private synchronized void registerPipe(PipedOutputStream pipeOutput) {
136         this.pipeOutputs.add(pipeOutput);
137         startPipeWrite();
138     }
139
140     private synchronized void startPipeWrite() {
141         if (this.pipeWriteTask == null) {
142             this.pipeWriteTask = executor.submit(() -> {
143                 int lengthRead;
144                 byte[] buffer = new byte[1024];
145                 int readRetries = 3;
146                 while (!pipeOutputs.isEmpty()) {
147                     var stream = getSourceInputStream();
148                     if (stream != null) {
149                         try {
150                             lengthRead = stream.read(buffer);
151                             readRetries = 3;
152                             for (var output : pipeOutputs) {
153                                 try {
154                                     output.write(buffer, 0, lengthRead);
155                                     if (pipeOutputs.contains(output)) {
156                                         output.flush();
157                                     }
158                                 } catch (InterruptedIOException e) {
159                                     if (pipeOutputs.isEmpty()) {
160                                         // task has been ended while writing
161                                         return;
162                                     }
163                                     logger.warn("InterruptedIOException while writing to from pulse source pipe: {}",
164                                             getExceptionMessage(e));
165                                 } catch (IOException e) {
166                                     logger.warn("IOException while writing to from pulse source pipe: {}",
167                                             getExceptionMessage(e));
168                                 } catch (RuntimeException e) {
169                                     logger.warn("RuntimeException while writing to pulse source pipe: {}",
170                                             getExceptionMessage(e));
171                                 }
172                             }
173                         } catch (IOException e) {
174                             logger.warn("IOException while reading from pulse source: {}", getExceptionMessage(e));
175                             if (readRetries == 0) {
176                                 // force reconnection on persistent IOException
177                                 super.disconnect();
178                             } else {
179                                 readRetries--;
180                             }
181                         } catch (RuntimeException e) {
182                             logger.warn("RuntimeException while reading from pulse source: {}", getExceptionMessage(e));
183                         }
184                     } else {
185                         logger.warn("Unable to get source input stream");
186                     }
187                 }
188                 this.pipeWriteTask = null;
189             });
190         }
191     }
192
193     private synchronized void unregisterPipe(PipedOutputStream pipeOutput) {
194         this.pipeOutputs.remove(pipeOutput);
195         try {
196             Thread.sleep(0);
197         } catch (InterruptedException ignored) {
198         }
199         stopPipeWriteTask();
200         try {
201             pipeOutput.close();
202         } catch (IOException ignored) {
203         }
204     }
205
206     private synchronized void stopPipeWriteTask() {
207         var pipeWriteTask = this.pipeWriteTask;
208         if (pipeOutputs.isEmpty() && pipeWriteTask != null) {
209             pipeWriteTask.cancel(true);
210             this.pipeWriteTask = null;
211         }
212     }
213
214     private @Nullable String getExceptionMessage(Exception e) {
215         String message = e.getMessage();
216         var cause = e.getCause();
217         if (message == null && cause != null) {
218             message = cause.getMessage();
219         }
220         return message;
221     }
222
223     private @Nullable InputStream getSourceInputStream() {
224         try {
225             connectIfNeeded();
226         } catch (IOException | InterruptedException ignored) {
227         }
228         try {
229             var clientSocketFinal = clientSocket;
230             return (clientSocketFinal != null) ? clientSocketFinal.getInputStream() : null;
231         } catch (IOException ignored) {
232             return null;
233         }
234     }
235
236     @Override
237     public void disconnect() {
238         stopPipeWriteTask();
239         super.disconnect();
240     }
241
242     static class PulseAudioStream extends AudioStream {
243         private final Logger logger = LoggerFactory.getLogger(PulseAudioAudioSource.class);
244         private final AudioFormat format;
245         private final InputStream input;
246         private final Consumer<Boolean> setIdle;
247         private boolean closed = false;
248
249         public PulseAudioStream(AudioFormat format, InputStream input, Consumer<Boolean> setIdle) {
250             this.input = input;
251             this.format = format;
252             this.setIdle = setIdle;
253         }
254
255         @Override
256         public AudioFormat getFormat() {
257             return format;
258         }
259
260         @Override
261         public int read() throws IOException {
262             byte[] b = new byte[1];
263             int bytesRead = read(b);
264             if (-1 == bytesRead) {
265                 return bytesRead;
266             }
267             Byte bb = Byte.valueOf(b[0]);
268             return bb.intValue();
269         }
270
271         @Override
272         public int read(byte @Nullable [] b) throws IOException {
273             return read(b, 0, b == null ? 0 : b.length);
274         }
275
276         @Override
277         public int read(byte @Nullable [] b, int off, int len) throws IOException {
278             if (b == null) {
279                 throw new IOException("Buffer is null");
280             }
281             logger.trace("reading from pulseaudio stream");
282             if (closed) {
283                 throw new IOException("Stream is closed");
284             }
285             setIdle.accept(false);
286             return input.read(b, off, len);
287         }
288
289         @Override
290         public void close() throws IOException {
291             closed = true;
292             setIdle.accept(true);
293             input.close();
294         }
295     };
296 }