]> git.basschouten.com Git - openhab-addons.git/blob
3292aeb487bd3352d7c0e3fdb42c3a61a48881ab
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2024 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.pulseaudio.internal;
14
15 import java.io.IOException;
16 import java.io.InputStream;
17 import java.net.Socket;
18 import java.util.Set;
19 import java.util.concurrent.Future;
20 import java.util.concurrent.ScheduledExecutorService;
21
22 import org.eclipse.jdt.annotation.NonNullByDefault;
23 import org.eclipse.jdt.annotation.Nullable;
24 import org.openhab.binding.pulseaudio.internal.handler.PulseaudioHandler;
25 import org.openhab.core.audio.AudioException;
26 import org.openhab.core.audio.AudioFormat;
27 import org.openhab.core.audio.AudioSource;
28 import org.openhab.core.audio.AudioStream;
29 import org.openhab.core.audio.PipedAudioStream;
30 import org.openhab.core.common.ThreadPoolManager;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33
34 /**
35  * The audio source for openhab, implemented by a connection to a pulseaudio source using Simple TCP protocol
36  *
37  * @author Miguel Álvarez - Initial contribution
38  *
39  */
40 @NonNullByDefault
41 public class PulseAudioAudioSource extends PulseaudioSimpleProtocolStream implements AudioSource {
42
43     private final Logger logger = LoggerFactory.getLogger(PulseAudioAudioSource.class);
44     private final PipedAudioStream.Group streamGroup;
45     private final ScheduledExecutorService executor;
46     private final AudioFormat streamFormat;
47
48     private @Nullable Future<?> pipeWriteTask;
49
50     public PulseAudioAudioSource(PulseaudioHandler pulseaudioHandler, ScheduledExecutorService scheduler) {
51         super(pulseaudioHandler, scheduler);
52         streamFormat = pulseaudioHandler.getSourceAudioFormat();
53         executor = ThreadPoolManager
54                 .getScheduledPool("OH-binding-" + pulseaudioHandler.getThing().getUID() + "-source");
55         streamGroup = PipedAudioStream.newGroup(streamFormat);
56     }
57
58     @Override
59     public Set<AudioFormat> getSupportedFormats() {
60         return Set.of(streamFormat);
61     }
62
63     @Override
64     public AudioStream getInputStream(AudioFormat audioFormat) throws AudioException {
65         try {
66             for (int countAttempt = 1; countAttempt <= 2; countAttempt++) { // two attempts allowed
67                 try {
68                     connectIfNeeded();
69                     final Socket clientSocketLocal = clientSocket;
70                     if (clientSocketLocal == null) {
71                         break;
72                     }
73                     if (!audioFormat.isCompatible(streamFormat)) {
74                         throw new AudioException("Incompatible audio format requested");
75                     }
76                     var audioStream = streamGroup.getAudioStreamInGroup();
77                     audioStream.onClose(() -> {
78                         minusClientCount();
79                         stopPipeWriteTask();
80                     });
81                     addClientCount();
82                     startPipeWrite();
83                     // get raw audio from the pulse audio socket
84                     return audioStream;
85                 } catch (IOException e) {
86                     disconnect(); // disconnect to force clear connection in case of socket not cleanly shutdown
87                     if (countAttempt == 2) { // we won't retry : log and quit
88                         final Socket clientSocketLocal = clientSocket;
89                         String port = clientSocketLocal != null ? Integer.toString(clientSocketLocal.getPort())
90                                 : "unknown";
91                         logger.warn(
92                                 "Error while trying to get audio from pulseaudio audio source. Cannot connect to {}:{}, error: {}",
93                                 pulseaudioHandler.getHost(), port, e.getMessage());
94                         throw e;
95                     }
96                 } catch (InterruptedException ie) {
97                     logger.info("Interrupted during source audio connection: {}", ie.getMessage());
98                     throw new AudioException(ie);
99                 }
100                 countAttempt++;
101             }
102         } catch (IOException e) {
103             throw new AudioException(e);
104         }
105         throw new AudioException("Unable to create input stream");
106     }
107
108     /**
109      * As startPipeWrite is called for every chunk read,
110      * this wrapper method make the test before effectively
111      * locking the object (which is a costly operation)
112      */
113     private void startPipeWrite() {
114         if (this.pipeWriteTask == null) {
115             startPipeWriteSynchronized();
116         }
117     }
118
119     private synchronized void startPipeWriteSynchronized() {
120         if (this.pipeWriteTask == null) {
121             this.pipeWriteTask = executor.submit(() -> {
122                 int lengthRead;
123                 byte[] buffer = new byte[1200];
124                 int readRetries = 3;
125                 while (!streamGroup.isEmpty()) {
126                     var stream = getSourceInputStream();
127                     if (stream != null) {
128                         try {
129                             lengthRead = stream.read(buffer);
130                             readRetries = 3;
131                             streamGroup.write(buffer, 0, lengthRead);
132                             streamGroup.flush();
133                         } catch (IOException e) {
134                             logger.warn("IOException while reading from pulse source: {}", getExceptionMessage(e));
135                             if (readRetries == 0) {
136                                 // force reconnection on persistent IOException
137                                 super.disconnect();
138                             } else {
139                                 readRetries--;
140                             }
141                         } catch (RuntimeException e) {
142                             logger.warn("RuntimeException while reading from pulse source: {}", getExceptionMessage(e));
143                         }
144                     } else {
145                         logger.warn("Unable to get source input stream");
146                     }
147                 }
148                 this.pipeWriteTask = null;
149             });
150         }
151     }
152
153     private synchronized void stopPipeWriteTask() {
154         var pipeWriteTask = this.pipeWriteTask;
155         if (streamGroup.isEmpty() && pipeWriteTask != null) {
156             pipeWriteTask.cancel(true);
157             this.pipeWriteTask = null;
158         }
159     }
160
161     private @Nullable String getExceptionMessage(Exception e) {
162         String message = e.getMessage();
163         var cause = e.getCause();
164         if (message == null && cause != null) {
165             message = cause.getMessage();
166         }
167         return message;
168     }
169
170     private @Nullable InputStream getSourceInputStream() {
171         try {
172             connectIfNeeded();
173         } catch (IOException | InterruptedException ignored) {
174         }
175         try {
176             var clientSocketFinal = clientSocket;
177             return (clientSocketFinal != null) ? clientSocketFinal.getInputStream() : null;
178         } catch (IOException ignored) {
179             return null;
180         }
181     }
182
183     @Override
184     public void disconnect() {
185         stopPipeWriteTask();
186         super.disconnect();
187     }
188 }