import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
private int debugId = 0;
private Thread monitorThread;
private final Lock threadLock = new ReentrantLock();
- private final Lock queueUpdatedLock = new ReentrantLock();
- private final Condition queueUpdated = queueUpdatedLock.newCondition();
private AtomicBoolean sessionActive = new AtomicBoolean(false);
// Data structures
private final Map<LxUuid, LxControl> controls = new HashMap<>();
private final Map<ChannelUID, LxControl> channels = new HashMap<>();
- private final ConcurrentLinkedQueue<LxStateUpdate> stateUpdateQueue = new ConcurrentLinkedQueue<>();
+ private final BlockingQueue<LxStateUpdate> stateUpdateQueue = new LinkedBlockingQueue<>();
private LxDynamicStateDescriptionProvider dynamicStateDescriptionProvider;
private final Logger logger = LoggerFactory.getLogger(LxServerHandler.class);
*/
void queueStateUpdate(LxUuid uuid, Object value) {
stateUpdateQueue.add(new LxStateUpdate(uuid, value));
- queueUpdatedLock.lock();
- try {
- queueUpdated.signalAll();
- } finally {
- queueUpdatedLock.unlock();
- }
}
/**
private void processStateUpdates() throws InterruptedException {
while (sessionActive.get()) {
logger.debug("[{}] Sleeping for {} seconds.", debugId, bindingConfig.keepAlivePeriod - elapsed);
- queueUpdatedLock.lock();
- try {
- if (!queueUpdated.await(bindingConfig.keepAlivePeriod - elapsed, TimeUnit.SECONDS)) {
- sendKeepAlive();
- continue;
- }
- } finally {
- queueUpdatedLock.unlock();
- }
+ LxStateUpdate update = stateUpdateQueue.poll(bindingConfig.keepAlivePeriod - elapsed, TimeUnit.SECONDS);
elapsed = Duration.between(lastKeepAlive, Instant.now()).getSeconds();
- if (elapsed >= bindingConfig.keepAlivePeriod) {
+ if (update == null || elapsed >= bindingConfig.keepAlivePeriod) {
sendKeepAlive();
+ elapsed = 0;
}
- LxStateUpdate update;
- while ((update = stateUpdateQueue.poll()) != null && sessionActive.get()) {
+ if (update != null) {
updateStateValue(update);
}
}