import java.io.IOException;
import java.io.InputStream;
+import java.io.InterruptedIOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.net.Socket;
import java.util.HashSet;
import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
public class PulseAudioAudioSource extends PulseaudioSimpleProtocolStream implements AudioSource {
private final Logger logger = LoggerFactory.getLogger(PulseAudioAudioSource.class);
- private final Set<PipedOutputStream> pipeOutputs = new HashSet<>();
+ private final ConcurrentLinkedQueue<PipedOutputStream> pipeOutputs = new ConcurrentLinkedQueue<>();
private final ScheduledExecutorService executor;
private @Nullable Future<?> pipeWriteTask;
}
setIdle(true);
var pipeOutput = new PipedOutputStream();
- registerPipe(pipeOutput);
- var pipeInput = new PipedInputStream(pipeOutput, 1024 * 20) {
+ var pipeInput = new PipedInputStream(pipeOutput, 1024 * 10) {
@Override
public void close() throws IOException {
unregisterPipe(pipeOutput);
super.close();
}
};
+ registerPipe(pipeOutput);
// get raw audio from the pulse audio socket
return new PulseAudioStream(sourceFormat, pipeInput, (idle) -> {
setIdle(idle);
}
});
} catch (IOException e) {
- disconnect(); // disconnect force to clear connection in case of socket not cleanly shutdown
+ disconnect(); // disconnect to force clear connection in case of socket not cleanly shutdown
if (countAttempt == 2) { // we won't retry : log and quit
String port = clientSocket != null ? Integer.toString(clientSocket.getPort()) : "unknown";
logger.warn(
startPipeWrite();
}
- private void startPipeWrite() {
- if (pipeWriteTask == null) {
+ private synchronized void startPipeWrite() {
+ if (this.pipeWriteTask == null) {
this.pipeWriteTask = executor.submit(() -> {
int lengthRead;
byte[] buffer = new byte[1024];
+ int readRetries = 3;
while (!pipeOutputs.isEmpty()) {
var stream = getSourceInputStream();
if (stream != null) {
try {
lengthRead = stream.read(buffer);
+ readRetries = 3;
for (var output : pipeOutputs) {
- output.write(buffer, 0, lengthRead);
- output.flush();
+ try {
+ output.write(buffer, 0, lengthRead);
+ if (pipeOutputs.contains(output)) {
+ output.flush();
+ }
+ } catch (IOException e) {
+ if (e instanceof InterruptedIOException && pipeOutputs.isEmpty()) {
+ // task has been ended while writing
+ return;
+ }
+ logger.warn("IOException while writing to from pulse source pipe: {}",
+ getExceptionMessage(e));
+ } catch (RuntimeException e) {
+ logger.warn("RuntimeException while writing to pulse source pipe: {}",
+ getExceptionMessage(e));
+ }
}
} catch (IOException e) {
- logger.warn("IOException while reading from pulse source: {}", e.getMessage());
+ logger.warn("IOException while reading from pulse source: {}", getExceptionMessage(e));
+ if (readRetries == 0) {
+ // force reconnection on persistent IOException
+ super.disconnect();
+ } else {
+ readRetries--;
+ }
} catch (RuntimeException e) {
- logger.warn("RuntimeException while reading from pulse source: {}", e.getMessage());
+ logger.warn("RuntimeException while reading from pulse source: {}", getExceptionMessage(e));
}
} else {
logger.warn("Unable to get source input stream");
private synchronized void unregisterPipe(PipedOutputStream pipeOutput) {
this.pipeOutputs.remove(pipeOutput);
+ try {
+ Thread.sleep(0);
+ } catch (InterruptedException ignored) {
+ }
stopPipeWriteTask();
try {
pipeOutput.close();
}
}
- private void stopPipeWriteTask() {
+ private synchronized void stopPipeWriteTask() {
var pipeWriteTask = this.pipeWriteTask;
if (pipeOutputs.isEmpty() && pipeWriteTask != null) {
pipeWriteTask.cancel(true);
}
}
+ private @Nullable String getExceptionMessage(Exception e) {
+ String message = e.getMessage();
+ var cause = e.getCause();
+ if (message == null && cause != null) {
+ message = cause.getMessage();
+ }
+ return message;
+ }
+
private @Nullable InputStream getSourceInputStream() {
try {
connectIfNeeded();