package reactor.aeron;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ThreadFactory;
import java.util.function.Consumer;
import org.agrona.concurrent.IdleStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Exceptions;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.MonoSink;

/* loaded from: input_file:reactor/aeron/AeronEventLoop.class */
/**
 * Single-threaded event loop that drives registered {@link MessagePublication}s and
 * {@link MessageSubscription}s.
 *
 * <p>The worker thread is created lazily, on the first subscription to any command returned by
 * this class. Every mutation of the publication/subscription lists is submitted as a
 * {@link CommandTask} and executed by the worker thread, so those lists are only ever touched
 * from that thread.
 */
public final class AeronEventLoop implements OnDisposable {
    private static final Logger logger = LoggerFactory.getLogger(AeronEventLoop.class);
    private final IdleStrategy idleStrategy;
    /** Thread running the {@link Worker}; {@code null} until the worker has been created. */
    private volatile Thread thread;
    /** Completed to ask the worker to stop; also makes later command submissions fail. */
    private final MonoProcessor<Void> dispose = MonoProcessor.create();
    /** Completed by the worker once its shutdown sequence has run. */
    private final MonoProcessor<Void> onDispose = MonoProcessor.create();
    /** Commands submitted from arbitrary threads, drained by the worker thread. */
    private final Queue<CommandTask> commandTasks = new ConcurrentLinkedQueue<>();
    private final List<MessagePublication> publications = new ArrayList<>();
    private final List<MessageSubscription> subscriptions = new ArrayList<>();
    /** Creates and starts the worker on first subscription, then replays the same instance. */
    private final Mono<Worker> workerMono = Mono.fromCallable(this::createWorker).cache();

    /**
     * A unit of work queued for the worker thread, paired with the sink used to report its
     * outcome to the submitter.
     */
    public static class CommandTask implements Runnable {
        private final MonoSink<Void> sink;
        private final Consumer<MonoSink<Void>> consumer;

        private CommandTask(MonoSink<Void> monoSink, Consumer<MonoSink<Void>> consumer) {
            this.sink = monoSink;
            this.consumer = consumer;
        }

        /**
         * Hands the sink to the consumer. An {@link Exception} thrown by the consumer is logged
         * and forwarded to the sink as an error signal.
         */
        @Override
        public void run() {
            try {
                consumer.accept(sink);
            } catch (Exception e) {
                // The trailing throwable is taken by SLF4J as the exception to log, so the
                // placeholder needs an argument of its own.
                logger.warn("Exception occurred on command task: {}", e.toString(), e);
                sink.error(e);
            }
        }

        /** Signals the sink with the cancellation error from {@link Exceptions#failWithCancel()}. */
        public void cancel() {
            sink.error(Exceptions.failWithCancel());
        }
    }

    /** The loop body executed on the event loop thread. */
    public class Worker implements Runnable {
        private Worker() {
        }

        /**
         * Repeats, until the {@code dispose} signal is observed: drain queued commands, call
         * {@code proceed()} on every publication and {@code poll()} on every subscription, then
         * pass the sum of the returned values to the idle strategy.
         *
         * <p>The shutdown sequence runs when the loop ends, including when it ends because
         * something inside it threw; in that case the throwable still propagates to the thread's
         * uncaught exception handler after the shutdown sequence has run.
         */
        @Override
        public void run() {
            try {
                while (!dispose.isDisposed()) {
                    runCommandTasks();

                    int workCount = 0;
                    // Indexed loops with the size read once per pass, as the lists are only
                    // modified by commands, which are not executed during this part of the pass.
                    final int publicationCount = publications.size();
                    for (int i = 0; i < publicationCount; i++) {
                        workCount += publications.get(i).proceed();
                    }
                    final int subscriptionCount = subscriptions.size();
                    for (int i = 0; i < subscriptionCount; i++) {
                        workCount += subscriptions.get(i).poll();
                    }
                    idleStrategy.idle(workCount);
                }
            } finally {
                // When the loop ended because of a failure, the dispose signal has not been
                // raised yet; raise it so that commands submitted from now on are rejected
                // instead of being queued for a thread that no longer runs.
                if (!dispose.isDisposed()) {
                    dispose.onComplete();
                }
                shutdown();
            }
        }

        /** Runs every command currently in the queue. */
        private void runCommandTasks() {
            CommandTask task;
            while ((task = commandTasks.poll()) != null) {
                task.run();
            }
        }

        /**
         * Cancels pending commands, closes subscriptions and then publications, and finally
         * completes {@code onDispose} regardless of whether any of those steps threw.
         */
        private void shutdown() {
            try {
                disposeCommandTasks();
                disposeSubscriptions();
                disposePublications();
            } finally {
                onDispose.onComplete();
            }
        }

        /** Cancels every command still waiting in the queue. */
        private void disposeCommandTasks() {
            CommandTask task;
            while ((task = commandTasks.poll()) != null) {
                task.cancel();
            }
        }

        /** Closes and removes every registered publication; a failing close is logged and skipped. */
        private void disposePublications() {
            for (Iterator<MessagePublication> it = publications.iterator(); it.hasNext(); ) {
                MessagePublication publication = it.next();
                try {
                    publication.close();
                } catch (Exception e) {
                    logger.warn(
                        "Exception occurred on closing publication: {}, cause: {}",
                        publication,
                        e.toString(),
                        e);
                }
                it.remove();
            }
        }

        /** Closes and removes every registered subscription; a failing close is logged and skipped. */
        private void disposeSubscriptions() {
            for (Iterator<MessageSubscription> it = subscriptions.iterator(); it.hasNext(); ) {
                MessageSubscription subscription = it.next();
                try {
                    subscription.close();
                } catch (Exception e) {
                    logger.warn(
                        "Exception occurred on closing subscription: {}, cause: {}",
                        subscription,
                        e.toString(),
                        e);
                }
                it.remove();
            }
        }
    }

    /**
     * Creates an event loop; no thread is started by the constructor.
     *
     * @param idleStrategy strategy invoked at the end of every loop pass
     */
    public AeronEventLoop(IdleStrategy idleStrategy) {
        this.idleStrategy = idleStrategy;
    }

    /** Creates the worker, starts its thread and remembers the thread for {@link #inEventLoop()}. */
    private Worker createWorker() {
        Worker worker = new Worker();
        Thread workerThread = defaultThreadFactory().newThread(worker);
        this.thread = workerThread;
        workerThread.start();
        return worker;
    }

    /**
     * Tells whether the caller is running on the event loop thread.
     *
     * @return {@code true} if the current thread is the thread started for the worker
     */
    public boolean inEventLoop() {
        return thread == Thread.currentThread();
    }

    /**
     * Queues an arbitrary command for execution on the event loop thread.
     *
     * @param consumer command body; it receives the sink through which the returned mono is
     *     signalled
     * @return mono signalled through the sink handed to {@code consumer}
     */
    public Mono<Void> execute(Consumer<MonoSink<Void>> consumer) {
        return worker().flatMap(worker -> command(consumer));
    }

    /**
     * Queues the registration of a publication.
     *
     * @param messagePublication publication to add to the loop
     * @return mono completing once the publication has been added; it errors if the publication
     *     is {@code null}
     */
    public Mono<Void> register(MessagePublication messagePublication) {
        return worker().flatMap(worker -> command(sink -> register(messagePublication, sink)));
    }

    private void register(MessagePublication messagePublication, MonoSink<Void> monoSink) {
        Objects.requireNonNull(messagePublication, "messagePublication must be not null");
        publications.add(messagePublication);
        monoSink.success();
    }

    /**
     * Queues the registration of a subscription.
     *
     * @param messageSubscription subscription to add to the loop
     * @return mono completing once the subscription has been added; it errors if the
     *     subscription is {@code null}
     */
    public Mono<Void> register(MessageSubscription messageSubscription) {
        return worker().flatMap(worker -> command(sink -> register(messageSubscription, sink)));
    }

    private void register(MessageSubscription messageSubscription, MonoSink<Void> monoSink) {
        Objects.requireNonNull(messageSubscription, "messageSubscription must be not null");
        subscriptions.add(messageSubscription);
        monoSink.success();
    }

    /**
     * Queues the removal and closing of a publication.
     *
     * @param messagePublication publication to remove and close; {@code null} is tolerated
     * @return mono completing once the publication has been removed and closed
     */
    public Mono<Void> dispose(MessagePublication messagePublication) {
        return worker().flatMap(worker -> command(sink -> dispose(messagePublication, sink)));
    }

    private void dispose(MessagePublication messagePublication, MonoSink<Void> monoSink) {
        // Identity comparison: only this exact instance is removed.
        publications.removeIf(candidate -> candidate == messagePublication);
        Optional.ofNullable(messagePublication).ifPresent(MessagePublication::close);
        monoSink.success();
    }

    /**
     * Queues the removal and closing of a subscription.
     *
     * @param messageSubscription subscription to remove and close; {@code null} is tolerated
     * @return mono completing once the subscription has been removed and closed
     */
    public Mono<Void> dispose(MessageSubscription messageSubscription) {
        return worker().flatMap(worker -> command(sink -> dispose(messageSubscription, sink)));
    }

    private void dispose(MessageSubscription messageSubscription, MonoSink<Void> monoSink) {
        // Identity comparison: only this exact instance is removed.
        subscriptions.removeIf(candidate -> candidate == messageSubscription);
        Optional.ofNullable(messageSubscription).ifPresent(MessageSubscription::close);
        monoSink.success();
    }

    /**
     * Requests shutdown by completing the {@code dispose} signal. The call does not wait for the
     * worker; use {@link #onDispose()} to observe the end of the shutdown sequence.
     */
    public void dispose() {
        dispose.onComplete();
    }

    /**
     * Returns the shutdown-completion signal.
     *
     * @return mono completed by the worker after its shutdown sequence has run
     */
    @Override
    public Mono<Void> onDispose() {
        return onDispose;
    }

    /**
     * Reports the state of the shutdown-completion signal.
     *
     * @return the disposed state of the mono returned by {@link #onDispose()}
     */
    public boolean isDisposed() {
        return onDispose.isDisposed();
    }

    /** Factory producing the named event loop thread with a logging uncaught-exception handler. */
    private static ThreadFactory defaultThreadFactory() {
        return runnable -> {
            Thread newThread = new Thread(runnable);
            newThread.setName("aeron-event-loop");
            newThread.setUncaughtExceptionHandler(
                (failedThread, th) -> logger.error("Uncaught exception occurred: {}", th, th));
            return newThread;
        };
    }

    /** Worker mono combined with the dispose listener through {@code takeUntilOther}. */
    private Mono<Worker> worker() {
        return workerMono.takeUntilOther(listenDispose());
    }

    /** Wraps the consumer into a {@link CommandTask} that is queued when the mono is subscribed. */
    private Mono<Void> command(Consumer<MonoSink<Void>> consumer) {
        return Mono.create(sink -> commandTasks.add(new CommandTask(sink, consumer)));
    }

    /**
     * Mono that stays pending while the loop is not disposed and errors with
     * {@link Exceptions#failWithRejected()} once the {@code dispose} signal has completed.
     */
    @SuppressWarnings("unchecked") // dispose never emits a value, so the cast is never executed
    private <T> Mono<T> listenDispose() {
        return dispose
            .map(ignored -> (T) ignored)
            .switchIfEmpty(Mono.error(Exceptions::failWithRejected));
    }
}
