package reactor.aeron;

import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ThreadFactory;
import java.util.function.Consumer;
import javax.management.ObjectName;
import javax.management.StandardMBean;
import org.agrona.concurrent.IdleStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.MonoSink;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:reactor/aeron/AeronEventLoop.class */
public final class AeronEventLoop implements OnDisposable {
    private static final Logger logger = LoggerFactory.getLogger(AeronEventLoop.class);
    private final IdleStrategy idleStrategy;
    private final String name;
    private final int workerId;
    private final int groupId;
    private final Queue<CommandTask> commands = new ConcurrentLinkedQueue();
    private final List<MessagePublication> publications = new ArrayList();
    private final List<MessageSubscription> subscriptions = new ArrayList();
    private final List<DefaultAeronInbound> inbounds = new ArrayList();
    private final MonoProcessor<Void> dispose = MonoProcessor.create();
    private final MonoProcessor<Void> onDispose = MonoProcessor.create();
    private final Mono<Worker> workerMono = Mono.fromCallable(this::createWorker).cache();
    private volatile Thread thread;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:reactor/aeron/AeronEventLoop$CommandTask.class */
    public static class CommandTask<T> implements Runnable {
        private final MonoSink<T> sink;
        private final Consumer<MonoSink<T>> consumer;

        private CommandTask(MonoSink<T> monoSink, Consumer<MonoSink<T>> consumer) {
            this.sink = monoSink;
            this.consumer = consumer;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                this.consumer.accept(this.sink);
            } catch (Exception e) {
                AeronEventLoop.logger.error("Exception occurred on CommandTask: ", e);
                this.sink.error(e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:reactor/aeron/AeronEventLoop$Worker.class */
    public class Worker implements Runnable {
        private final WorkerFlightRecorder flightRecorder;

        public Worker(WorkerFlightRecorder workerFlightRecorder) {
            this.flightRecorder = workerFlightRecorder;
        }

        @Override // java.lang.Runnable
        public void run() {
            this.flightRecorder.start();
            while (!AeronEventLoop.this.dispose.isDisposed()) {
                this.flightRecorder.countTick();
                processCommands();
                int processOutbound = processOutbound();
                this.flightRecorder.countOutbound(processOutbound);
                int processInbound = processInbound();
                this.flightRecorder.countInbound(processInbound);
                int i = processOutbound + processInbound;
                if (i < 1) {
                    this.flightRecorder.countIdle();
                } else {
                    this.flightRecorder.countWork(i);
                }
                this.flightRecorder.tryReport();
                AeronEventLoop.this.idleStrategy.idle(i);
            }
            try {
                processCommands();
                AeronEventLoop.this.disposeInbounds();
                disposeSubscriptions();
                disposePublications();
                AeronEventLoop.this.onDispose.onComplete();
            } catch (Throwable th) {
                AeronEventLoop.this.onDispose.onComplete();
                throw th;
            }
        }

        private int processInbound() {
            int i = 0;
            int size = AeronEventLoop.this.inbounds.size();
            for (int i2 = 0; i2 < size; i2++) {
                try {
                    i += ((DefaultAeronInbound) AeronEventLoop.this.inbounds.get(i2)).poll();
                } catch (Exception e) {
                    AeronEventLoop.logger.error("Unexpected exception occurred on inbound.poll(): ", e);
                }
            }
            return i;
        }

        private int processOutbound() {
            int i = 0;
            int size = AeronEventLoop.this.publications.size();
            for (int i2 = 0; i2 < size; i2++) {
                try {
                    i += ((MessagePublication) AeronEventLoop.this.publications.get(i2)).proceed();
                } catch (Exception e) {
                    AeronEventLoop.logger.error("Unexpected exception occurred on publication.proceed(): ", e);
                }
            }
            return i;
        }

        private void processCommands() {
            while (true) {
                CommandTask commandTask = (CommandTask) AeronEventLoop.this.commands.poll();
                if (commandTask == null) {
                    return;
                } else {
                    commandTask.run();
                }
            }
        }

        private void disposePublications() {
            Iterator it = AeronEventLoop.this.publications.iterator();
            while (it.hasNext()) {
                MessagePublication messagePublication = (MessagePublication) it.next();
                it.remove();
                try {
                    messagePublication.close();
                } catch (Exception e) {
                }
            }
        }

        private void disposeSubscriptions() {
            Iterator it = AeronEventLoop.this.subscriptions.iterator();
            while (it.hasNext()) {
                MessageSubscription messageSubscription = (MessageSubscription) it.next();
                it.remove();
                try {
                    messageSubscription.close();
                } catch (Exception e) {
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public AeronEventLoop(String str, int i, int i2, IdleStrategy idleStrategy) {
        this.name = str;
        this.workerId = i;
        this.groupId = i2;
        this.idleStrategy = idleStrategy;
    }

    private static ThreadFactory defaultThreadFactory(String str) {
        return runnable -> {
            Thread thread = new Thread(runnable);
            thread.setName(str);
            thread.setUncaughtExceptionHandler((thread2, th) -> {
                logger.error("Uncaught exception occurred: ", th);
            });
            return thread;
        };
    }

    private Worker createWorker() throws Exception {
        String format = String.format("%s-%x-%d", this.name, Integer.valueOf(this.groupId), Integer.valueOf(this.workerId));
        ThreadFactory defaultThreadFactory = defaultThreadFactory(format);
        WorkerFlightRecorder workerFlightRecorder = new WorkerFlightRecorder();
        ManagementFactory.getPlatformMBeanServer().registerMBean(new StandardMBean(workerFlightRecorder, WorkerMBean.class), new ObjectName("reactor.aeron:name=" + format));
        Worker worker = new Worker(workerFlightRecorder);
        this.thread = defaultThreadFactory.newThread(worker);
        this.thread.start();
        return worker;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean inEventLoop() {
        return this.thread == Thread.currentThread();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Mono<MessagePublication> registerPublication(MessagePublication messagePublication) {
        return worker().flatMap(worker -> {
            return command(monoSink -> {
                if (cancelIfDisposed(monoSink)) {
                    return;
                }
                this.publications.add(messagePublication);
                logger.debug("Registered {}", messagePublication);
                monoSink.success(messagePublication);
            });
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Mono<MessageSubscription> registerSubscription(MessageSubscription messageSubscription) {
        return worker().flatMap(worker -> {
            return command(monoSink -> {
                if (cancelIfDisposed(monoSink)) {
                    return;
                }
                this.subscriptions.add(messageSubscription);
                logger.debug("Registered {}", messageSubscription);
                monoSink.success(messageSubscription);
            });
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Mono<DefaultAeronInbound> registerInbound(DefaultAeronInbound defaultAeronInbound) {
        return worker().flatMap(worker -> {
            return command(monoSink -> {
                if (cancelIfDisposed(monoSink)) {
                    return;
                }
                this.inbounds.add(defaultAeronInbound);
                logger.debug("Registered {}", defaultAeronInbound);
                monoSink.success(defaultAeronInbound);
            });
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Mono<Void> disposePublication(MessagePublication messagePublication) {
        return worker().flatMap(worker -> {
            return command(monoSink -> {
                this.publications.remove(messagePublication);
                messagePublication.getClass();
                Mono fromRunnable = Mono.fromRunnable(messagePublication::close);
                monoSink.getClass();
                Consumer consumer = monoSink::error;
                monoSink.getClass();
                fromRunnable.subscribe((Consumer) null, consumer, monoSink::success);
            });
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Mono<Void> disposeSubscription(MessageSubscription messageSubscription) {
        return worker().flatMap(worker -> {
            return command(monoSink -> {
                this.subscriptions.remove(messageSubscription);
                messageSubscription.getClass();
                Mono fromRunnable = Mono.fromRunnable(messageSubscription::close);
                monoSink.getClass();
                Consumer consumer = monoSink::error;
                monoSink.getClass();
                fromRunnable.subscribe((Consumer) null, consumer, monoSink::success);
            });
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Mono<Void> disposeInbound(DefaultAeronInbound defaultAeronInbound) {
        return worker().flatMap(worker -> {
            return command(monoSink -> {
                this.inbounds.remove(defaultAeronInbound);
                defaultAeronInbound.getClass();
                Mono fromRunnable = Mono.fromRunnable(defaultAeronInbound::close);
                monoSink.getClass();
                Consumer consumer = monoSink::error;
                monoSink.getClass();
                fromRunnable.subscribe((Consumer) null, consumer, monoSink::success);
            });
        });
    }

    public void dispose() {
        this.dispose.onComplete();
        if (this.thread == null) {
            this.onDispose.onComplete();
        }
    }

    @Override // reactor.aeron.OnDisposable
    public Mono<Void> onDispose() {
        return this.onDispose;
    }

    public boolean isDisposed() {
        return this.onDispose.isDisposed();
    }

    private Mono<Worker> worker() {
        return this.workerMono.takeUntilOther(listenUnavailable());
    }

    private <T> Mono<T> command(Consumer<MonoSink<T>> consumer) {
        return Mono.create(monoSink -> {
            this.commands.add(new CommandTask(monoSink, consumer));
        });
    }

    private <T> Mono<T> listenUnavailable() {
        return this.dispose.map(r2 -> {
            return r2;
        }).switchIfEmpty(Mono.error(AeronExceptions::failWithEventLoopUnavailable));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void disposeInbounds() {
        Iterator<DefaultAeronInbound> it = this.inbounds.iterator();
        while (it.hasNext()) {
            DefaultAeronInbound next = it.next();
            it.remove();
            try {
                next.close();
            } catch (Exception e) {
            }
        }
    }

    private boolean cancelIfDisposed(MonoSink<?> monoSink) {
        boolean isDisposed = this.dispose.isDisposed();
        if (isDisposed) {
            monoSink.error(AeronExceptions.failWithCancel("CommandTask has cancelled"));
        }
        return isDisposed;
    }
}
