import java.util.ArrayList;
import java.util.Collections;
-import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
return isStarted;
}
- private void internalStop() {
- if (subscriptionScheduler != null && !subscriptionScheduler.isCancelled()) {
+ private void stopSubscriptionScheduler() {
+ final ScheduledFuture<?> subscriptionScheduler = this.subscriptionScheduler;
+ if (subscriptionScheduler != null) {
subscriptionScheduler.cancel(true);
- subscriptionScheduler = null;
+ this.subscriptionScheduler = null;
}
- if (pollingScheduler != null && !pollingScheduler.isCancelled()) {
+ }
+
+ private void internalStop() {
+ stopSubscriptionScheduler();
+
+ ScheduledFuture<?> pollingScheduler = this.pollingScheduler;
+ if (pollingScheduler != null) {
pollingScheduler.cancel(true);
- pollingScheduler = null;
+ this.pollingScheduler = null;
unsubscribe();
logger.debug("internal stop EventListener");
}
return subscribed;
}
- private void subscribe(final List<String> evetNames) {
- final Iterator<String> eventNameIter = evetNames.iterator();
- subscriptionScheduler = scheduler.scheduleWithFixedDelay(new Runnable() {
-
- @Override
- public void run() {
- while (eventNameIter.hasNext()) {
- subscribe(eventNameIter.next());
- }
- subscriptionScheduler.cancel(true);
- }
+ private void subscribe(final List<String> eventNames) {
+ subscriptionScheduler = scheduler.scheduleWithFixedDelay(() -> {
+ eventNames.forEach(this::subscribe);
+ stopSubscriptionScheduler();
}, 0, SUBSCRIBE_DELAY, TimeUnit.MILLISECONDS);
}