]> git.basschouten.com Git - openhab-addons.git/commitdiff
[pulseaudio] Apply real disconnection when needed (#13338)
authorGwendal Roulleau <dalgwen@users.noreply.github.com>
Sun, 25 Sep 2022 10:59:30 +0000 (12:59 +0200)
committerGitHub <noreply@github.com>
Sun, 25 Sep 2022 10:59:30 +0000 (12:59 +0200)
* [pulseaudio] Removing isIdle test

The isIdle boolean was not properly handled.
When disconnection is called, isIdle is not relevant : we should always honnor the disconnection request.
In fact, isIdle prevented disconnection when it is necessary (example : when a IOException occurs when sending audio to sink)

+Little bug fix on volume parsing: some volume request doesn't respond with a space after the comma separating left/right channel.

* [pulseaudio] Enhancement to the idle detection for disconnection

Using a counter to count client instead of a isIdle variable, which was not thread safe.
The PulseaudioSimpleProtocolStream parent class is now the sole responsible for closing source or sink stream.

* [pulseaudio] Small performance enhancement

Avoid a costly synchronized operation for a method called very often.

Signed-off-by: Gwendal Roulleau <gwendal.roulleau@gmail.com>
bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/PulseAudioAudioSink.java
bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/PulseAudioAudioSource.java
bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/PulseaudioSimpleProtocolStream.java
bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/cli/Parser.java
bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/handler/PulseaudioHandler.java

index 8e42bb1c261e7ec13c0e43b3866a9cbd0519c6b1..dad493e5e85fdf31a4abc7723024d948ded0a682 100644 (file)
@@ -66,6 +66,7 @@ public class PulseAudioAudioSink extends PulseaudioSimpleProtocolStream implemen
         if (audioStream == null) {
             return;
         }
+        addClientCount();
         try (ConvertedInputStream normalizedPCMStream = new ConvertedInputStream(audioStream)) {
             for (int countAttempt = 1; countAttempt <= 2; countAttempt++) { // two attempts allowed
                 try {
@@ -73,7 +74,6 @@ public class PulseAudioAudioSink extends PulseaudioSimpleProtocolStream implemen
                     final Socket clientSocketLocal = clientSocket;
                     if (clientSocketLocal != null) {
                         // send raw audio to the socket and to pulse audio
-                        setIdle(false);
                         Instant start = Instant.now();
                         normalizedPCMStream.transferTo(clientSocketLocal.getOutputStream());
                         if (normalizedPCMStream.getDuration() != -1) { // ensure, if the sound has a duration
@@ -108,9 +108,8 @@ public class PulseAudioAudioSink extends PulseaudioSimpleProtocolStream implemen
             throw new UnsupportedAudioFormatException("Cannot send sound to the pulseaudio sink",
                     audioStream.getFormat(), e);
         } finally {
-            scheduleDisconnect();
+            minusClientCount();
         }
-        setIdle(true);
     }
 
     @Override
index 5369f600114c938749e84d9cd64af12e776560e0..f7acb4b0af28cbaaeb7f3707c55b16ba72b6b560 100644 (file)
@@ -23,7 +23,6 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.function.Consumer;
 
 import org.eclipse.jdt.annotation.NonNullByDefault;
 import org.eclipse.jdt.annotation.Nullable;
@@ -84,7 +83,6 @@ public class PulseAudioAudioSource extends PulseaudioSimpleProtocolStream implem
                     if (!audioFormat.isCompatible(sourceFormat)) {
                         throw new AudioException("Incompatible audio format requested");
                     }
-                    setIdle(true);
                     var pipeOutput = new PipedOutputStream();
                     var pipeInput = new PipedInputStream(pipeOutput, 1024 * 10) {
                         @Override
@@ -95,14 +93,9 @@ public class PulseAudioAudioSource extends PulseaudioSimpleProtocolStream implem
                     };
                     registerPipe(pipeOutput);
                     // get raw audio from the pulse audio socket
-                    return new PulseAudioStream(sourceFormat, pipeInput, (idle) -> {
-                        setIdle(idle);
-                        if (idle) {
-                            scheduleDisconnect();
-                        } else {
-                            // ensure pipe is writing
-                            startPipeWrite();
-                        }
+                    return new PulseAudioStream(sourceFormat, pipeInput, () -> {
+                        // ensure pipe is writing
+                        startPipeWrite();
                     });
                 } catch (IOException e) {
                     disconnect(); // disconnect to force clear connection in case of socket not cleanly shutdown
@@ -113,31 +106,40 @@ public class PulseAudioAudioSource extends PulseaudioSimpleProtocolStream implem
                         logger.warn(
                                 "Error while trying to get audio from pulseaudio audio source. Cannot connect to {}:{}, error: {}",
                                 pulseaudioHandler.getHost(), port, e.getMessage());
-                        setIdle(true);
                         throw e;
                     }
                 } catch (InterruptedException ie) {
                     logger.info("Interrupted during source audio connection: {}", ie.getMessage());
-                    setIdle(true);
                     throw new AudioException(ie);
                 }
                 countAttempt++;
             }
         } catch (IOException e) {
             throw new AudioException(e);
-        } finally {
-            scheduleDisconnect();
         }
-        setIdle(true);
         throw new AudioException("Unable to create input stream");
     }
 
     private synchronized void registerPipe(PipedOutputStream pipeOutput) {
-        this.pipeOutputs.add(pipeOutput);
+        boolean isAdded = this.pipeOutputs.add(pipeOutput);
+        if (isAdded) {
+            addClientCount();
+        }
         startPipeWrite();
     }
 
-    private synchronized void startPipeWrite() {
+    /**
+     * As startPipeWrite is called for every chunk read,
+     * this wrapper method make the test before effectively
+     * locking the object (which is a costly operation)
+     */
+    private void startPipeWrite() {
+        if (this.pipeWriteTask == null) {
+            startPipeWriteSynchronized();
+        }
+    }
+
+    private synchronized void startPipeWriteSynchronized() {
         if (this.pipeWriteTask == null) {
             this.pipeWriteTask = executor.submit(() -> {
                 int lengthRead;
@@ -191,7 +193,10 @@ public class PulseAudioAudioSource extends PulseaudioSimpleProtocolStream implem
     }
 
     private synchronized void unregisterPipe(PipedOutputStream pipeOutput) {
-        this.pipeOutputs.remove(pipeOutput);
+        boolean isRemoved = this.pipeOutputs.remove(pipeOutput);
+        if (isRemoved) {
+            minusClientCount();
+        }
         try {
             Thread.sleep(0);
         } catch (InterruptedException ignored) {
@@ -243,13 +248,13 @@ public class PulseAudioAudioSource extends PulseaudioSimpleProtocolStream implem
         private final Logger logger = LoggerFactory.getLogger(PulseAudioAudioSource.class);
         private final AudioFormat format;
         private final InputStream input;
-        private final Consumer<Boolean> setIdle;
+        private final Runnable activity;
         private boolean closed = false;
 
-        public PulseAudioStream(AudioFormat format, InputStream input, Consumer<Boolean> setIdle) {
+        public PulseAudioStream(AudioFormat format, InputStream input, Runnable activity) {
             this.input = input;
             this.format = format;
-            this.setIdle = setIdle;
+            this.activity = activity;
         }
 
         @Override
@@ -282,14 +287,13 @@ public class PulseAudioAudioSource extends PulseaudioSimpleProtocolStream implem
             if (closed) {
                 throw new IOException("Stream is closed");
             }
-            setIdle.accept(false);
+            activity.run();
             return input.read(b, off, len);
         }
 
         @Override
         public void close() throws IOException {
             closed = true;
-            setIdle.accept(true);
             input.close();
         }
     };
index 98c8bfb3d78dcbd68f0d6d00ede98b8db4b4c35a..8889a8ea300f82171dcf48830ee9ea609261f322 100644 (file)
@@ -18,6 +18,7 @@ import java.util.Locale;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.eclipse.jdt.annotation.NonNullByDefault;
 import org.eclipse.jdt.annotation.Nullable;
@@ -43,7 +44,8 @@ public abstract class PulseaudioSimpleProtocolStream {
 
     protected @Nullable Socket clientSocket;
 
-    private boolean isIdle = true;
+    private ReentrantLock countClientLock = new ReentrantLock();
+    private Integer countClient = 0;
 
     private @Nullable ScheduledFuture<?> scheduledDisconnection;
 
@@ -54,6 +56,7 @@ public abstract class PulseaudioSimpleProtocolStream {
 
     /**
      * Connect to pulseaudio with the simple protocol
+     * Will schedule an attempt for disconnection after timeout
      *
      * @throws IOException
      * @throws InterruptedException when interrupted during the loading module wait
@@ -61,12 +64,13 @@ public abstract class PulseaudioSimpleProtocolStream {
     public void connectIfNeeded() throws IOException, InterruptedException {
         Socket clientSocketLocal = clientSocket;
         if (clientSocketLocal == null || !clientSocketLocal.isConnected() || clientSocketLocal.isClosed()) {
-            logger.debug("Simple TCP Stream connecting");
+            logger.debug("Simple TCP Stream connecting for {}", getLabel(null));
             String host = pulseaudioHandler.getHost();
             int port = pulseaudioHandler.getSimpleTcpPortAndLoadModuleIfNecessary();
             var clientSocketFinal = new Socket(host, port);
             clientSocketFinal.setSoTimeout(pulseaudioHandler.getBasicProtocolSOTimeout());
             clientSocket = clientSocketFinal;
+            scheduleDisconnectIfNoClient();
         }
     }
 
@@ -75,8 +79,8 @@ public abstract class PulseaudioSimpleProtocolStream {
      */
     public void disconnect() {
         final Socket clientSocketLocal = clientSocket;
-        if (clientSocketLocal != null && isIdle) {
-            logger.debug("Simple TCP Stream disconnecting");
+        if (clientSocketLocal != null) {
+            logger.debug("Simple TCP Stream disconnecting for {}", getLabel(null));
             try {
                 clientSocketLocal.close();
             } catch (IOException ignored) {
@@ -86,15 +90,23 @@ public abstract class PulseaudioSimpleProtocolStream {
         }
     }
 
-    public void scheduleDisconnect() {
-        var scheduledDisconnectionFinal = scheduledDisconnection;
-        if (scheduledDisconnectionFinal != null) {
-            scheduledDisconnectionFinal.cancel(true);
-        }
-        int idleTimeout = pulseaudioHandler.getIdleTimeout();
-        if (idleTimeout > -1) {
-            logger.debug("Scheduling disconnect");
-            scheduledDisconnection = scheduler.schedule(this::disconnect, idleTimeout, TimeUnit.MILLISECONDS);
+    private void scheduleDisconnectIfNoClient() {
+        countClientLock.lock();
+        try {
+            if (countClient <= 0) {
+                var scheduledDisconnectionFinal = scheduledDisconnection;
+                if (scheduledDisconnectionFinal != null) {
+                    logger.debug("Aborting next disconnect");
+                    scheduledDisconnectionFinal.cancel(true);
+                }
+                int idleTimeout = pulseaudioHandler.getIdleTimeout();
+                if (idleTimeout > -1) {
+                    logger.debug("Scheduling next disconnect");
+                    scheduledDisconnection = scheduler.schedule(this::disconnect, idleTimeout, TimeUnit.MILLISECONDS);
+                }
+            }
+        } finally {
+            countClientLock.unlock();
         }
     }
 
@@ -115,7 +127,35 @@ public abstract class PulseaudioSimpleProtocolStream {
         return label != null ? label : pulseaudioHandler.getThing().getUID().getId();
     }
 
-    public void setIdle(boolean idle) {
-        isIdle = idle;
+    protected void addClientCount() {
+        countClientLock.lock();
+        try {
+            countClient += 1;
+            logger.debug("Adding new client for pulseaudio sink/source {}. Current count: {}", getLabel(null),
+                    countClient);
+            if (countClient <= 0) { // safe against misuse
+                countClient = 1;
+            }
+            var scheduledDisconnectionFinal = scheduledDisconnection;
+            if (scheduledDisconnectionFinal != null) {
+                logger.debug("Aborting next disconnect");
+                scheduledDisconnectionFinal.cancel(true);
+            }
+        } finally {
+            countClientLock.unlock();
+        }
+    }
+
+    protected void minusClientCount() {
+        countClientLock.lock();
+        countClient -= 1;
+        logger.debug("Removing client for pulseaudio sink/source {}. Current count: {}", getLabel(null), countClient);
+        if (countClient < 0) { // safe against misuse
+            countClient = 0;
+        }
+        countClientLock.unlock();
+        if (countClient <= 0) {
+            scheduleDisconnectIfNoClient();
+        }
     }
 }
index b9bcd09074e3e311cbacc54110c944032a52d812..e5bdf08cf278c5cdea9ccd6b13bc85cab4dd3673 100644 (file)
@@ -352,7 +352,7 @@ public class Parser {
     private static int parseVolume(String vol) {
         int volumeTotal = 0;
         int nChannels = 0;
-        for (String channel : vol.split(", ")) {
+        for (String channel : vol.split(",")) {
             Matcher matcher = VOLUME_PATTERN.matcher(channel.trim());
             if (matcher.find()) {
                 volumeTotal += Integer.valueOf(matcher.group(3));
index a4f48ef0d258928426bafda2ca62616db83a1f0c..502f2301981365a8a243fc7f43d000a25976adc4 100644 (file)
@@ -140,8 +140,6 @@ public class PulseaudioHandler extends BaseThingHandler {
                 } catch (InterruptedException i) {
                     logger.info("Interrupted during sink audio connection: {}", i.getMessage());
                     return;
-                } finally {
-                    audioSink.scheduleDisconnect();
                 }
             }
         });
@@ -194,8 +192,6 @@ public class PulseaudioHandler extends BaseThingHandler {
                 } catch (InterruptedException i) {
                     logger.info("Interrupted during source audio connection: {}", i.getMessage());
                     return;
-                } finally {
-                    audioSource.scheduleDisconnect();
                 }
             }
         });