]> git.basschouten.com Git - openhab-addons.git/blob
a44b2badcec50a398c1e9422df51075e161841d6
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2022 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.pulseaudio.internal;
14
15 import java.io.IOException;
16 import java.io.InputStream;
17 import java.io.PipedInputStream;
18 import java.io.PipedOutputStream;
19 import java.net.Socket;
20 import java.util.HashSet;
21 import java.util.Set;
22 import java.util.concurrent.Future;
23 import java.util.concurrent.ScheduledExecutorService;
24 import java.util.function.Consumer;
25
26 import org.eclipse.jdt.annotation.NonNullByDefault;
27 import org.eclipse.jdt.annotation.Nullable;
28 import org.openhab.binding.pulseaudio.internal.handler.PulseaudioHandler;
29 import org.openhab.core.audio.AudioException;
30 import org.openhab.core.audio.AudioFormat;
31 import org.openhab.core.audio.AudioSource;
32 import org.openhab.core.audio.AudioStream;
33 import org.openhab.core.common.ThreadPoolManager;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 /**
38  * The audio source for openhab, implemented by a connection to a pulseaudio source using Simple TCP protocol
39  *
40  * @author Miguel Álvarez - Initial contribution
41  *
42  */
43 @NonNullByDefault
44 public class PulseAudioAudioSource extends PulseaudioSimpleProtocolStream implements AudioSource {
45
46     private final Logger logger = LoggerFactory.getLogger(PulseAudioAudioSource.class);
47     private final Set<PipedOutputStream> pipeOutputs = new HashSet<>();
48     private final ScheduledExecutorService executor;
49
50     private @Nullable Future<?> pipeWriteTask;
51
52     public PulseAudioAudioSource(PulseaudioHandler pulseaudioHandler, ScheduledExecutorService scheduler) {
53         super(pulseaudioHandler, scheduler);
54         executor = ThreadPoolManager
55                 .getScheduledPool("OH-binding-" + pulseaudioHandler.getThing().getUID() + "-source");
56     }
57
58     @Override
59     public Set<AudioFormat> getSupportedFormats() {
60         var supportedFormats = new HashSet<AudioFormat>();
61         var audioFormat = pulseaudioHandler.getSourceAudioFormat();
62         if (audioFormat != null) {
63             supportedFormats.add(audioFormat);
64         }
65         return supportedFormats;
66     }
67
68     @Override
69     public AudioStream getInputStream(AudioFormat audioFormat) throws AudioException {
70         try {
71             for (int countAttempt = 1; countAttempt <= 2; countAttempt++) { // two attempts allowed
72                 try {
73                     connectIfNeeded();
74                     final Socket clientSocketLocal = clientSocket;
75                     if (clientSocketLocal == null) {
76                         break;
77                     }
78                     var sourceFormat = pulseaudioHandler.getSourceAudioFormat();
79                     if (sourceFormat == null) {
80                         throw new AudioException("Unable to get source audio format");
81                     }
82                     if (!audioFormat.isCompatible(sourceFormat)) {
83                         throw new AudioException("Incompatible audio format requested");
84                     }
85                     setIdle(true);
86                     var pipeOutput = new PipedOutputStream();
87                     registerPipe(pipeOutput);
88                     var pipeInput = new PipedInputStream(pipeOutput, 1024 * 20) {
89                         @Override
90                         public void close() throws IOException {
91                             unregisterPipe(pipeOutput);
92                             super.close();
93                         }
94                     };
95                     // get raw audio from the pulse audio socket
96                     return new PulseAudioStream(sourceFormat, pipeInput, (idle) -> {
97                         setIdle(idle);
98                         if (idle) {
99                             scheduleDisconnect();
100                         } else {
101                             // ensure pipe is writing
102                             startPipeWrite();
103                         }
104                     });
105                 } catch (IOException e) {
106                     disconnect(); // disconnect force to clear connection in case of socket not cleanly shutdown
107                     if (countAttempt == 2) { // we won't retry : log and quit
108                         String port = clientSocket != null ? Integer.toString(clientSocket.getPort()) : "unknown";
109                         logger.warn(
110                                 "Error while trying to get audio from pulseaudio audio source. Cannot connect to {}:{}, error: {}",
111                                 pulseaudioHandler.getHost(), port, e.getMessage());
112                         setIdle(true);
113                         throw e;
114                     }
115                 } catch (InterruptedException ie) {
116                     logger.info("Interrupted during source audio connection: {}", ie.getMessage());
117                     setIdle(true);
118                     throw new AudioException(ie);
119                 }
120                 countAttempt++;
121             }
122         } catch (IOException e) {
123             throw new AudioException(e);
124         } finally {
125             scheduleDisconnect();
126         }
127         setIdle(true);
128         throw new AudioException("Unable to create input stream");
129     }
130
131     private synchronized void registerPipe(PipedOutputStream pipeOutput) {
132         this.pipeOutputs.add(pipeOutput);
133         startPipeWrite();
134     }
135
136     private void startPipeWrite() {
137         if (pipeWriteTask == null) {
138             this.pipeWriteTask = executor.submit(() -> {
139                 int lengthRead;
140                 byte[] buffer = new byte[1024];
141                 while (!pipeOutputs.isEmpty()) {
142                     var stream = getSourceInputStream();
143                     if (stream != null) {
144                         try {
145                             lengthRead = stream.read(buffer);
146                             for (var output : pipeOutputs) {
147                                 output.write(buffer, 0, lengthRead);
148                                 output.flush();
149                             }
150                         } catch (IOException e) {
151                             logger.warn("IOException while reading from pulse source: {}", e.getMessage());
152                         } catch (RuntimeException e) {
153                             logger.warn("RuntimeException while reading from pulse source: {}", e.getMessage());
154                         }
155                     } else {
156                         logger.warn("Unable to get source input stream");
157                     }
158                 }
159                 this.pipeWriteTask = null;
160             });
161         }
162     }
163
164     private synchronized void unregisterPipe(PipedOutputStream pipeOutput) {
165         this.pipeOutputs.remove(pipeOutput);
166         stopPipeWriteTask();
167         try {
168             pipeOutput.close();
169         } catch (IOException ignored) {
170         }
171     }
172
173     private void stopPipeWriteTask() {
174         var pipeWriteTask = this.pipeWriteTask;
175         if (pipeOutputs.isEmpty() && pipeWriteTask != null) {
176             pipeWriteTask.cancel(true);
177             this.pipeWriteTask = null;
178         }
179     }
180
181     private @Nullable InputStream getSourceInputStream() {
182         try {
183             connectIfNeeded();
184         } catch (IOException | InterruptedException ignored) {
185         }
186         try {
187             return (clientSocket != null) ? clientSocket.getInputStream() : null;
188         } catch (IOException ignored) {
189             return null;
190         }
191     }
192
193     @Override
194     public void disconnect() {
195         stopPipeWriteTask();
196         super.disconnect();
197     }
198
199     static class PulseAudioStream extends AudioStream {
200         private final Logger logger = LoggerFactory.getLogger(PulseAudioAudioSource.class);
201         private final AudioFormat format;
202         private final InputStream input;
203         private final Consumer<Boolean> setIdle;
204         private boolean closed = false;
205
206         public PulseAudioStream(AudioFormat format, InputStream input, Consumer<Boolean> setIdle) {
207             this.input = input;
208             this.format = format;
209             this.setIdle = setIdle;
210         }
211
212         @Override
213         public AudioFormat getFormat() {
214             return format;
215         }
216
217         @Override
218         public int read() throws IOException {
219             byte[] b = new byte[1];
220             int bytesRead = read(b);
221             if (-1 == bytesRead) {
222                 return bytesRead;
223             }
224             Byte bb = Byte.valueOf(b[0]);
225             return bb.intValue();
226         }
227
228         @Override
229         public int read(byte @Nullable [] b) throws IOException {
230             return read(b, 0, b.length);
231         }
232
233         @Override
234         public int read(byte @Nullable [] b, int off, int len) throws IOException {
235             logger.trace("reading from pulseaudio stream");
236             if (closed) {
237                 throw new IOException("Stream is closed");
238             }
239             setIdle.accept(false);
240             return input.read(b, off, len);
241         }
242
243         @Override
244         public void close() throws IOException {
245             closed = true;
246             setIdle.accept(true);
247             input.close();
248         }
249     };
250 }