2 * Copyright (c) 2010-2022 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.pulseaudio.internal;
15 import java.io.IOException;
16 import java.io.InputStream;
17 import java.io.PipedInputStream;
18 import java.io.PipedOutputStream;
19 import java.net.Socket;
20 import java.util.HashSet;
22 import java.util.concurrent.Future;
23 import java.util.concurrent.ScheduledExecutorService;
24 import java.util.function.Consumer;
26 import org.eclipse.jdt.annotation.NonNullByDefault;
27 import org.eclipse.jdt.annotation.Nullable;
28 import org.openhab.binding.pulseaudio.internal.handler.PulseaudioHandler;
29 import org.openhab.core.audio.AudioException;
30 import org.openhab.core.audio.AudioFormat;
31 import org.openhab.core.audio.AudioSource;
32 import org.openhab.core.audio.AudioStream;
33 import org.openhab.core.common.ThreadPoolManager;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
38 * The audio source for openhab, implemented by a connection to a pulseaudio source using Simple TCP protocol
40 * @author Miguel Álvarez - Initial contribution
44 public class PulseAudioAudioSource extends PulseaudioSimpleProtocolStream implements AudioSource {
46 private final Logger logger = LoggerFactory.getLogger(PulseAudioAudioSource.class);
47 private final Set<PipedOutputStream> pipeOutputs = new HashSet<>();
48 private final ScheduledExecutorService executor;
50 private @Nullable Future<?> pipeWriteTask;
52 public PulseAudioAudioSource(PulseaudioHandler pulseaudioHandler, ScheduledExecutorService scheduler) {
53 super(pulseaudioHandler, scheduler);
54 executor = ThreadPoolManager
55 .getScheduledPool("OH-binding-" + pulseaudioHandler.getThing().getUID() + "-source");
59 public Set<AudioFormat> getSupportedFormats() {
60 var supportedFormats = new HashSet<AudioFormat>();
61 var audioFormat = pulseaudioHandler.getSourceAudioFormat();
62 if (audioFormat != null) {
63 supportedFormats.add(audioFormat);
65 return supportedFormats;
69 public AudioStream getInputStream(AudioFormat audioFormat) throws AudioException {
71 for (int countAttempt = 1; countAttempt <= 2; countAttempt++) { // two attempts allowed
74 final Socket clientSocketLocal = clientSocket;
75 if (clientSocketLocal == null) {
78 var sourceFormat = pulseaudioHandler.getSourceAudioFormat();
79 if (sourceFormat == null) {
80 throw new AudioException("Unable to get source audio format");
82 if (!audioFormat.isCompatible(sourceFormat)) {
83 throw new AudioException("Incompatible audio format requested");
86 var pipeOutput = new PipedOutputStream();
87 registerPipe(pipeOutput);
88 var pipeInput = new PipedInputStream(pipeOutput, 1024 * 20) {
90 public void close() throws IOException {
91 unregisterPipe(pipeOutput);
95 // get raw audio from the pulse audio socket
96 return new PulseAudioStream(sourceFormat, pipeInput, (idle) -> {
101 // ensure pipe is writing
105 } catch (IOException e) {
106 disconnect(); // disconnect force to clear connection in case of socket not cleanly shutdown
107 if (countAttempt == 2) { // we won't retry : log and quit
108 String port = clientSocket != null ? Integer.toString(clientSocket.getPort()) : "unknown";
110 "Error while trying to get audio from pulseaudio audio source. Cannot connect to {}:{}, error: {}",
111 pulseaudioHandler.getHost(), port, e.getMessage());
115 } catch (InterruptedException ie) {
116 logger.info("Interrupted during source audio connection: {}", ie.getMessage());
118 throw new AudioException(ie);
122 } catch (IOException e) {
123 throw new AudioException(e);
125 scheduleDisconnect();
128 throw new AudioException("Unable to create input stream");
131 private synchronized void registerPipe(PipedOutputStream pipeOutput) {
132 this.pipeOutputs.add(pipeOutput);
136 private void startPipeWrite() {
137 if (pipeWriteTask == null) {
138 this.pipeWriteTask = executor.submit(() -> {
140 byte[] buffer = new byte[1024];
141 while (!pipeOutputs.isEmpty()) {
142 var stream = getSourceInputStream();
143 if (stream != null) {
145 lengthRead = stream.read(buffer);
146 for (var output : pipeOutputs) {
147 output.write(buffer, 0, lengthRead);
150 } catch (IOException e) {
151 logger.warn("IOException while reading from pulse source: {}", e.getMessage());
152 } catch (RuntimeException e) {
153 logger.warn("RuntimeException while reading from pulse source: {}", e.getMessage());
156 logger.warn("Unable to get source input stream");
159 this.pipeWriteTask = null;
164 private synchronized void unregisterPipe(PipedOutputStream pipeOutput) {
165 this.pipeOutputs.remove(pipeOutput);
169 } catch (IOException ignored) {
173 private void stopPipeWriteTask() {
174 var pipeWriteTask = this.pipeWriteTask;
175 if (pipeOutputs.isEmpty() && pipeWriteTask != null) {
176 pipeWriteTask.cancel(true);
177 this.pipeWriteTask = null;
181 private @Nullable InputStream getSourceInputStream() {
184 } catch (IOException | InterruptedException ignored) {
187 return (clientSocket != null) ? clientSocket.getInputStream() : null;
188 } catch (IOException ignored) {
194 public void disconnect() {
199 static class PulseAudioStream extends AudioStream {
200 private final Logger logger = LoggerFactory.getLogger(PulseAudioAudioSource.class);
201 private final AudioFormat format;
202 private final InputStream input;
203 private final Consumer<Boolean> setIdle;
204 private boolean closed = false;
206 public PulseAudioStream(AudioFormat format, InputStream input, Consumer<Boolean> setIdle) {
208 this.format = format;
209 this.setIdle = setIdle;
213 public AudioFormat getFormat() {
218 public int read() throws IOException {
219 byte[] b = new byte[1];
220 int bytesRead = read(b);
221 if (-1 == bytesRead) {
224 Byte bb = Byte.valueOf(b[0]);
225 return bb.intValue();
229 public int read(byte @Nullable [] b) throws IOException {
230 return read(b, 0, b.length);
234 public int read(byte @Nullable [] b, int off, int len) throws IOException {
235 logger.trace("reading from pulseaudio stream");
237 throw new IOException("Stream is closed");
239 setIdle.accept(false);
240 return input.read(b, off, len);
244 public void close() throws IOException {
246 setIdle.accept(true);