package io.atleon.polling.reactive;

import io.atleon.polling.Pollable;
import java.util.function.BiFunction;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:io/atleon/polling/reactive/PollingReceiverImp.class */
public class PollingReceiverImp<P, O> implements PollingReceiver<P, O> {
    private final Pollable<P, O> pollable;
    private final PollerOptions pollerOptions;
    private Poller<P, O> poller;

    /* JADX INFO: Access modifiers changed from: protected */
    public PollingReceiverImp(Pollable<P, O> pollable, PollerOptions pollerOptions) {
        this.pollable = pollable;
        this.pollerOptions = pollerOptions;
    }

    @Override // io.atleon.polling.reactive.PollingReceiver
    public Flux<ReceiverRecord<P, O>> receive() {
        return (Flux<ReceiverRecord<P, O>>) withPoller((scheduler, poller) -> {
            return poller.receive().publishOn(scheduler, 1).flatMapIterable(collection -> {
                return collection;
            }).map(polled -> {
                return new ReceiverRecord(polled, poller.getPollable());
            });
        });
    }

    private <T> Flux<T> withPoller(BiFunction<Scheduler, Poller<P, O>, Flux<T>> biFunction) {
        return Flux.usingWhen(Mono.fromCallable(() -> {
            Poller<P, O> create = Poller.create(this.pollable, this.pollerOptions.getPollingInterval());
            this.poller = create;
            return create;
        }), poller -> {
            return Flux.using(() -> {
                return Schedulers.single(this.pollerOptions.getSchedulerSupplier().get());
            }, scheduler -> {
                return (Publisher) biFunction.apply(scheduler, poller);
            }, (v0) -> {
                v0.dispose();
            });
        }, poller2 -> {
            return poller2.close().doFinally(signalType -> {
                this.poller = null;
            });
        });
    }
}
