]> git.basschouten.com Git - openhab-addons.git/commitdiff
[pulseaudio] source: use thread safe collection and force reconnection (#12441)
authorGiviMAD <GiviMAD@users.noreply.github.com>
Thu, 17 Mar 2022 23:41:00 +0000 (00:41 +0100)
committerGitHub <noreply@github.com>
Thu, 17 Mar 2022 23:41:00 +0000 (00:41 +0100)
* [pulseaudio] use thread safe collection
* [pulseaudio] source: connect pipe before store ref
* [pulseaudio] source: improve warning messages
* [pulseaudio] fix IOException when closing all sources
* [pulseaudio] prevent warning when InterruptedIOException on source close

Signed-off-by: Miguel Álvarez Díez <miguelwork92@gmail.com>
bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/PulseAudioAudioSource.java

index a44b2badcec50a398c1e9422df51075e161841d6..04bc37636c28ea7532aed0bef7d7f541c51359ba 100644 (file)
@@ -14,11 +14,13 @@ package org.openhab.binding.pulseaudio.internal;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.InterruptedIOException;
 import java.io.PipedInputStream;
 import java.io.PipedOutputStream;
 import java.net.Socket;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.function.Consumer;
@@ -44,7 +46,7 @@ import org.slf4j.LoggerFactory;
 public class PulseAudioAudioSource extends PulseaudioSimpleProtocolStream implements AudioSource {
 
     private final Logger logger = LoggerFactory.getLogger(PulseAudioAudioSource.class);
-    private final Set<PipedOutputStream> pipeOutputs = new HashSet<>();
+    private final ConcurrentLinkedQueue<PipedOutputStream> pipeOutputs = new ConcurrentLinkedQueue<>();
     private final ScheduledExecutorService executor;
 
     private @Nullable Future<?> pipeWriteTask;
@@ -84,14 +86,14 @@ public class PulseAudioAudioSource extends PulseaudioSimpleProtocolStream implem
                     }
                     setIdle(true);
                     var pipeOutput = new PipedOutputStream();
-                    registerPipe(pipeOutput);
-                    var pipeInput = new PipedInputStream(pipeOutput, 1024 * 20) {
+                    var pipeInput = new PipedInputStream(pipeOutput, 1024 * 10) {
                         @Override
                         public void close() throws IOException {
                             unregisterPipe(pipeOutput);
                             super.close();
                         }
                     };
+                    registerPipe(pipeOutput);
                     // get raw audio from the pulse audio socket
                     return new PulseAudioStream(sourceFormat, pipeInput, (idle) -> {
                         setIdle(idle);
@@ -103,7 +105,7 @@ public class PulseAudioAudioSource extends PulseaudioSimpleProtocolStream implem
                         }
                     });
                 } catch (IOException e) {
-                    disconnect(); // disconnect force to clear connection in case of socket not cleanly shutdown
+                    disconnect(); // disconnect to force clear connection in case of socket not cleanly shutdown
                     if (countAttempt == 2) { // we won't retry : log and quit
                         String port = clientSocket != null ? Integer.toString(clientSocket.getPort()) : "unknown";
                         logger.warn(
@@ -133,24 +135,46 @@ public class PulseAudioAudioSource extends PulseaudioSimpleProtocolStream implem
         startPipeWrite();
     }
 
-    private void startPipeWrite() {
-        if (pipeWriteTask == null) {
+    private synchronized void startPipeWrite() {
+        if (this.pipeWriteTask == null) {
             this.pipeWriteTask = executor.submit(() -> {
                 int lengthRead;
                 byte[] buffer = new byte[1024];
+                int readRetries = 3;
                 while (!pipeOutputs.isEmpty()) {
                     var stream = getSourceInputStream();
                     if (stream != null) {
                         try {
                             lengthRead = stream.read(buffer);
+                            readRetries = 3;
                             for (var output : pipeOutputs) {
-                                output.write(buffer, 0, lengthRead);
-                                output.flush();
+                                try {
+                                    output.write(buffer, 0, lengthRead);
+                                    if (pipeOutputs.contains(output)) {
+                                        output.flush();
+                                    }
+                                } catch (IOException e) {
+                                    if (e instanceof InterruptedIOException && pipeOutputs.isEmpty()) {
+                                        // task has been ended while writing
+                                        return;
+                                    }
+                                    logger.warn("IOException while writing to from pulse source pipe: {}",
+                                            getExceptionMessage(e));
+                                } catch (RuntimeException e) {
+                                    logger.warn("RuntimeException while writing to pulse source pipe: {}",
+                                            getExceptionMessage(e));
+                                }
                             }
                         } catch (IOException e) {
-                            logger.warn("IOException while reading from pulse source: {}", e.getMessage());
+                            logger.warn("IOException while reading from pulse source: {}", getExceptionMessage(e));
+                            if (readRetries == 0) {
+                                // force reconnection on persistent IOException
+                                super.disconnect();
+                            } else {
+                                readRetries--;
+                            }
                         } catch (RuntimeException e) {
-                            logger.warn("RuntimeException while reading from pulse source: {}", e.getMessage());
+                            logger.warn("RuntimeException while reading from pulse source: {}", getExceptionMessage(e));
                         }
                     } else {
                         logger.warn("Unable to get source input stream");
@@ -163,6 +187,10 @@ public class PulseAudioAudioSource extends PulseaudioSimpleProtocolStream implem
 
     private synchronized void unregisterPipe(PipedOutputStream pipeOutput) {
         this.pipeOutputs.remove(pipeOutput);
+        try {
+            Thread.sleep(0);
+        } catch (InterruptedException ignored) {
+        }
         stopPipeWriteTask();
         try {
             pipeOutput.close();
@@ -170,7 +198,7 @@ public class PulseAudioAudioSource extends PulseaudioSimpleProtocolStream implem
         }
     }
 
-    private void stopPipeWriteTask() {
+    private synchronized void stopPipeWriteTask() {
         var pipeWriteTask = this.pipeWriteTask;
         if (pipeOutputs.isEmpty() && pipeWriteTask != null) {
             pipeWriteTask.cancel(true);
@@ -178,6 +206,15 @@ public class PulseAudioAudioSource extends PulseaudioSimpleProtocolStream implem
         }
     }
 
+    private @Nullable String getExceptionMessage(Exception e) {
+        String message = e.getMessage();
+        var cause = e.getCause();
+        if (message == null && cause != null) {
+            message = cause.getMessage();
+        }
+        return message;
+    }
+
     private @Nullable InputStream getSourceInputStream() {
         try {
             connectIfNeeded();