]> git.basschouten.com Git - openhab-addons.git/blob
04bc37636c28ea7532aed0bef7d7f541c51359ba
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2022 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.pulseaudio.internal;
14
15 import java.io.IOException;
16 import java.io.InputStream;
17 import java.io.InterruptedIOException;
18 import java.io.PipedInputStream;
19 import java.io.PipedOutputStream;
20 import java.net.Socket;
21 import java.util.HashSet;
22 import java.util.Set;
23 import java.util.concurrent.ConcurrentLinkedQueue;
24 import java.util.concurrent.Future;
25 import java.util.concurrent.ScheduledExecutorService;
26 import java.util.function.Consumer;
27
28 import org.eclipse.jdt.annotation.NonNullByDefault;
29 import org.eclipse.jdt.annotation.Nullable;
30 import org.openhab.binding.pulseaudio.internal.handler.PulseaudioHandler;
31 import org.openhab.core.audio.AudioException;
32 import org.openhab.core.audio.AudioFormat;
33 import org.openhab.core.audio.AudioSource;
34 import org.openhab.core.audio.AudioStream;
35 import org.openhab.core.common.ThreadPoolManager;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38
39 /**
40  * The audio source for openhab, implemented by a connection to a pulseaudio source using Simple TCP protocol
41  *
42  * @author Miguel Álvarez - Initial contribution
43  *
44  */
45 @NonNullByDefault
46 public class PulseAudioAudioSource extends PulseaudioSimpleProtocolStream implements AudioSource {
47
48     private final Logger logger = LoggerFactory.getLogger(PulseAudioAudioSource.class);
49     private final ConcurrentLinkedQueue<PipedOutputStream> pipeOutputs = new ConcurrentLinkedQueue<>();
50     private final ScheduledExecutorService executor;
51
52     private @Nullable Future<?> pipeWriteTask;
53
54     public PulseAudioAudioSource(PulseaudioHandler pulseaudioHandler, ScheduledExecutorService scheduler) {
55         super(pulseaudioHandler, scheduler);
56         executor = ThreadPoolManager
57                 .getScheduledPool("OH-binding-" + pulseaudioHandler.getThing().getUID() + "-source");
58     }
59
60     @Override
61     public Set<AudioFormat> getSupportedFormats() {
62         var supportedFormats = new HashSet<AudioFormat>();
63         var audioFormat = pulseaudioHandler.getSourceAudioFormat();
64         if (audioFormat != null) {
65             supportedFormats.add(audioFormat);
66         }
67         return supportedFormats;
68     }
69
70     @Override
71     public AudioStream getInputStream(AudioFormat audioFormat) throws AudioException {
72         try {
73             for (int countAttempt = 1; countAttempt <= 2; countAttempt++) { // two attempts allowed
74                 try {
75                     connectIfNeeded();
76                     final Socket clientSocketLocal = clientSocket;
77                     if (clientSocketLocal == null) {
78                         break;
79                     }
80                     var sourceFormat = pulseaudioHandler.getSourceAudioFormat();
81                     if (sourceFormat == null) {
82                         throw new AudioException("Unable to get source audio format");
83                     }
84                     if (!audioFormat.isCompatible(sourceFormat)) {
85                         throw new AudioException("Incompatible audio format requested");
86                     }
87                     setIdle(true);
88                     var pipeOutput = new PipedOutputStream();
89                     var pipeInput = new PipedInputStream(pipeOutput, 1024 * 10) {
90                         @Override
91                         public void close() throws IOException {
92                             unregisterPipe(pipeOutput);
93                             super.close();
94                         }
95                     };
96                     registerPipe(pipeOutput);
97                     // get raw audio from the pulse audio socket
98                     return new PulseAudioStream(sourceFormat, pipeInput, (idle) -> {
99                         setIdle(idle);
100                         if (idle) {
101                             scheduleDisconnect();
102                         } else {
103                             // ensure pipe is writing
104                             startPipeWrite();
105                         }
106                     });
107                 } catch (IOException e) {
108                     disconnect(); // disconnect to force clear connection in case of socket not cleanly shutdown
109                     if (countAttempt == 2) { // we won't retry : log and quit
110                         String port = clientSocket != null ? Integer.toString(clientSocket.getPort()) : "unknown";
111                         logger.warn(
112                                 "Error while trying to get audio from pulseaudio audio source. Cannot connect to {}:{}, error: {}",
113                                 pulseaudioHandler.getHost(), port, e.getMessage());
114                         setIdle(true);
115                         throw e;
116                     }
117                 } catch (InterruptedException ie) {
118                     logger.info("Interrupted during source audio connection: {}", ie.getMessage());
119                     setIdle(true);
120                     throw new AudioException(ie);
121                 }
122                 countAttempt++;
123             }
124         } catch (IOException e) {
125             throw new AudioException(e);
126         } finally {
127             scheduleDisconnect();
128         }
129         setIdle(true);
130         throw new AudioException("Unable to create input stream");
131     }
132
133     private synchronized void registerPipe(PipedOutputStream pipeOutput) {
134         this.pipeOutputs.add(pipeOutput);
135         startPipeWrite();
136     }
137
138     private synchronized void startPipeWrite() {
139         if (this.pipeWriteTask == null) {
140             this.pipeWriteTask = executor.submit(() -> {
141                 int lengthRead;
142                 byte[] buffer = new byte[1024];
143                 int readRetries = 3;
144                 while (!pipeOutputs.isEmpty()) {
145                     var stream = getSourceInputStream();
146                     if (stream != null) {
147                         try {
148                             lengthRead = stream.read(buffer);
149                             readRetries = 3;
150                             for (var output : pipeOutputs) {
151                                 try {
152                                     output.write(buffer, 0, lengthRead);
153                                     if (pipeOutputs.contains(output)) {
154                                         output.flush();
155                                     }
156                                 } catch (IOException e) {
157                                     if (e instanceof InterruptedIOException && pipeOutputs.isEmpty()) {
158                                         // task has been ended while writing
159                                         return;
160                                     }
161                                     logger.warn("IOException while writing to from pulse source pipe: {}",
162                                             getExceptionMessage(e));
163                                 } catch (RuntimeException e) {
164                                     logger.warn("RuntimeException while writing to pulse source pipe: {}",
165                                             getExceptionMessage(e));
166                                 }
167                             }
168                         } catch (IOException e) {
169                             logger.warn("IOException while reading from pulse source: {}", getExceptionMessage(e));
170                             if (readRetries == 0) {
171                                 // force reconnection on persistent IOException
172                                 super.disconnect();
173                             } else {
174                                 readRetries--;
175                             }
176                         } catch (RuntimeException e) {
177                             logger.warn("RuntimeException while reading from pulse source: {}", getExceptionMessage(e));
178                         }
179                     } else {
180                         logger.warn("Unable to get source input stream");
181                     }
182                 }
183                 this.pipeWriteTask = null;
184             });
185         }
186     }
187
188     private synchronized void unregisterPipe(PipedOutputStream pipeOutput) {
189         this.pipeOutputs.remove(pipeOutput);
190         try {
191             Thread.sleep(0);
192         } catch (InterruptedException ignored) {
193         }
194         stopPipeWriteTask();
195         try {
196             pipeOutput.close();
197         } catch (IOException ignored) {
198         }
199     }
200
201     private synchronized void stopPipeWriteTask() {
202         var pipeWriteTask = this.pipeWriteTask;
203         if (pipeOutputs.isEmpty() && pipeWriteTask != null) {
204             pipeWriteTask.cancel(true);
205             this.pipeWriteTask = null;
206         }
207     }
208
209     private @Nullable String getExceptionMessage(Exception e) {
210         String message = e.getMessage();
211         var cause = e.getCause();
212         if (message == null && cause != null) {
213             message = cause.getMessage();
214         }
215         return message;
216     }
217
218     private @Nullable InputStream getSourceInputStream() {
219         try {
220             connectIfNeeded();
221         } catch (IOException | InterruptedException ignored) {
222         }
223         try {
224             return (clientSocket != null) ? clientSocket.getInputStream() : null;
225         } catch (IOException ignored) {
226             return null;
227         }
228     }
229
230     @Override
231     public void disconnect() {
232         stopPipeWriteTask();
233         super.disconnect();
234     }
235
236     static class PulseAudioStream extends AudioStream {
237         private final Logger logger = LoggerFactory.getLogger(PulseAudioAudioSource.class);
238         private final AudioFormat format;
239         private final InputStream input;
240         private final Consumer<Boolean> setIdle;
241         private boolean closed = false;
242
243         public PulseAudioStream(AudioFormat format, InputStream input, Consumer<Boolean> setIdle) {
244             this.input = input;
245             this.format = format;
246             this.setIdle = setIdle;
247         }
248
249         @Override
250         public AudioFormat getFormat() {
251             return format;
252         }
253
254         @Override
255         public int read() throws IOException {
256             byte[] b = new byte[1];
257             int bytesRead = read(b);
258             if (-1 == bytesRead) {
259                 return bytesRead;
260             }
261             Byte bb = Byte.valueOf(b[0]);
262             return bb.intValue();
263         }
264
265         @Override
266         public int read(byte @Nullable [] b) throws IOException {
267             return read(b, 0, b.length);
268         }
269
270         @Override
271         public int read(byte @Nullable [] b, int off, int len) throws IOException {
272             logger.trace("reading from pulseaudio stream");
273             if (closed) {
274                 throw new IOException("Stream is closed");
275             }
276             setIdle.accept(false);
277             return input.read(b, off, len);
278         }
279
280         @Override
281         public void close() throws IOException {
282             closed = true;
283             setIdle.accept(true);
284             input.close();
285         }
286     };
287 }