package io.atleon.polling.reactive;

import io.atleon.polling.Pollable;
import io.atleon.polling.Polled;
import java.time.Duration;
import java.util.Collection;
import java.util.Objects;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:io/atleon/polling/reactive/PollerImp.class */
/**
 * {@link Poller} implementation that wires a {@link Pollable} to a {@link PollingEventLoop}
 * and exposes the polled batches as a {@link Flux}.
 *
 * <p>Each instance owns a dedicated single-threaded {@link Scheduler} whose thread is created
 * by {@link EventThreadFactory}. The scheduler is handed to the event loop at construction
 * time and is disposed when {@link #close()} terminates.
 *
 * @param <P> first type parameter of the {@link Pollable} / {@link Polled} pair
 * @param <O> second type parameter of the {@link Pollable} / {@link Polled} pair
 */
public class PollerImp<P, O> implements Poller<P, O> {
    private final Pollable<P, O> pollable;
    private final PollingEventLoop<P, O> eventLoop;
    // Unicast sink: Reactor allows at most one subscriber on the resulting Flux.
    private final Sinks.Many<Collection<Polled<P, O>>> sink = Sinks.many().unicast().onBackpressureBuffer();
    private final Scheduler scheduler = Schedulers.newSingle(new EventThreadFactory());

    /**
     * Creates the threads backing a poller's scheduler. Threads are named
     * {@code reactive-polling-<n>}, where {@code n} comes from a JVM-wide counter, and report
     * uncaught failures through {@link PollerImp#defaultUncaughtException(Thread, Throwable)}.
     */
    static final class EventThreadFactory implements ThreadFactory {
        static final String PREFIX = "reactive-polling";
        static final AtomicLong COUNTER_REFERENCE = new AtomicLong();

        /** Marker thread type for threads created by this factory. */
        static final class EmitterThread extends Thread {
            EmitterThread(Runnable runnable, String str) {
                super(runnable, str);
            }
        }

        EventThreadFactory() {
        }

        /**
         * Creates a new, not-yet-started {@link EmitterThread} for the given task.
         *
         * @param runnable the task the thread will run
         * @return the configured thread
         */
        @Override // java.util.concurrent.ThreadFactory
        public Thread newThread(Runnable runnable) {
            String threadName = PREFIX + "-" + COUNTER_REFERENCE.incrementAndGet();
            EmitterThread emitterThread = new EmitterThread(runnable, threadName);
            emitterThread.setUncaughtExceptionHandler(PollerImp::defaultUncaughtException);
            return emitterThread;
        }
    }

    /**
     * Builds a poller around the given {@link Pollable} and initializes its scheduler.
     *
     * @param pollable the source handed to the event loop and returned by {@link #getPollable()}
     * @param duration timing value passed through to the {@link PollingEventLoop}
     *                 (presumably the polling interval; confirm against PollingEventLoop)
     */
    /* JADX INFO: Access modifiers changed from: protected */
    public PollerImp(Pollable<P, O> pollable, Duration duration) {
        this.pollable = pollable;
        this.eventLoop = new PollingEventLoop<>(this.scheduler, pollable, duration, this.sink);
        this.scheduler.init();
    }

    /**
     * @return the {@link Pollable} this poller was created with
     */
    @Override // io.atleon.polling.reactive.Poller
    public Pollable<P, O> getPollable() {
        return this.pollable;
    }

    /**
     * Exposes the sink as a {@link Flux} and forwards every downstream request amount to the
     * event loop's {@code onRequest}.
     *
     * @return flux of polled batches; backed by a unicast sink
     */
    @Override // io.atleon.polling.reactive.Poller
    public Flux<Collection<Polled<P, O>>> receive() {
        // Fully typed chain: no raw Flux, no unchecked conversion on return.
        return this.sink.asFlux().doOnRequest(this.eventLoop::onRequest);
    }

    /**
     * Stops the event loop and, once that completes, errors or is cancelled, disposes the
     * scheduler owned by this poller.
     *
     * @return a {@link Mono} representing the stop of the event loop
     */
    @Override // io.atleon.polling.reactive.Poller
    public Mono<Void> close() {
        return this.eventLoop.stop().doFinally(signalType -> this.scheduler.dispose());
    }

    /**
     * Last-resort handler for exceptions escaping a polling thread. Reports the failure,
     * including the throwable and its stack trace, on {@code System.err}.
     *
     * @param thread the thread that failed
     * @param th     the uncaught throwable
     */
    static void defaultUncaughtException(Thread thread, Throwable th) {
        // getThreadGroup() can return null for a terminated thread; never fail inside the handler.
        ThreadGroup group = thread.getThreadGroup();
        String groupName = group != null ? group.getName() : "<terminated>";
        System.err.println("Polling worker in group " + groupName
            + " failed with an uncaught exception: " + th);
        if (th != null) {
            // No logging facade is in scope for this class, so emit the stack trace directly
            // rather than silently losing the cause.
            th.printStackTrace(System.err);
        }
    }
}
