package io.atleon.polling.reactive;

import io.atleon.polling.Pollable;
import io.atleon.polling.Polled;
import java.time.Duration;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;
import reactor.util.annotation.NonNull;

/* loaded from: input_file:io/atleon/polling/reactive/PollingEventLoop.class */
public class PollingEventLoop<P, O> implements Sinks.EmitFailureHandler {
    private static final Logger log = LoggerFactory.getLogger(PollingEventLoop.class);
    private final Scheduler scheduler;
    private final Pollable<P, O> pollable;
    private final Sinks.Many<Collection<Polled<P, O>>> sink;
    private final Duration pollingInterval;
    private final AtomicBoolean active = new AtomicBoolean(true);
    private final PollingEventLoop<P, O>.PollingEvent pollEvent = new PollingEvent();

    /* loaded from: input_file:io/atleon/polling/reactive/PollingEventLoop$PollingEvent.class */
    class PollingEvent implements Runnable {
        private final AtomicBoolean scheduled = new AtomicBoolean(false);

        PollingEvent() {
        }

        public void stop() {
            PollingEventLoop.this.active.compareAndSet(true, false);
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                if (PollingEventLoop.this.active.get()) {
                    Collection<Polled<P, O>> poll = PollingEventLoop.this.pollable.poll();
                    if (PollingEventLoop.this.active.get()) {
                        this.scheduled.set(false);
                        schedule();
                    }
                    if (poll.iterator().hasNext()) {
                        PollingEventLoop.this.sink.emitNext(poll, PollingEventLoop.this);
                    }
                }
            } catch (Exception e) {
                if (PollingEventLoop.this.active.get()) {
                    PollingEventLoop.log.error("Unexpected exception", e);
                    PollingEventLoop.this.sink.emitError(e, PollingEventLoop.this);
                }
            }
        }

        void schedule() {
            schedule(PollingEventLoop.this.pollingInterval.toMillis());
        }

        void scheduleImmediate() {
            schedule(0L);
        }

        void schedule(long j) {
            if (this.scheduled.getAndSet(true)) {
                return;
            }
            PollingEventLoop.this.scheduler.schedule(this, j, TimeUnit.MILLISECONDS);
        }
    }

    public PollingEventLoop(Scheduler scheduler, Pollable<P, O> pollable, Duration duration, Sinks.Many<Collection<Polled<P, O>>> many) {
        this.scheduler = scheduler;
        this.pollable = pollable;
        this.pollingInterval = duration;
        this.sink = many;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void onRequest(long j) {
        this.active.set(true);
        this.pollEvent.scheduleImmediate();
    }

    public Mono<Void> stop() {
        return Mono.defer(() -> {
            this.pollEvent.stop();
            return Mono.empty();
        }).onErrorResume(th -> {
            return Mono.empty();
        });
    }

    public boolean onEmitFailure(@NonNull SignalType signalType, @NonNull Sinks.EmitResult emitResult) {
        return this.active.get() && emitResult == Sinks.EmitResult.FAIL_NON_SERIALIZED;
    }
}
