package io.atleon.polling;

import io.atleon.core.Alo;
import io.atleon.core.AloFactory;
import io.atleon.core.AloFlux;
import io.atleon.core.ComposedAlo;
import io.atleon.core.OrderManagingAcknowledgementOperator;
import io.atleon.polling.reactive.PollerOptions;
import io.atleon.polling.reactive.PollingReceiver;
import io.atleon.polling.reactive.PollingSourceConfig;
import io.atleon.polling.reactive.ReceiverRecord;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;

/**
 * Receives records polled from a {@link Pollable} and exposes their payloads as an
 * {@link AloFlux}, wiring each element's acknowledger and nacknowledger back to the
 * {@link Pollable} that produced it.
 *
 * <p>Instances are created through {@link #from(Pollable, PollingSourceConfig)}.
 *
 * @param <P> type parameter forwarded to {@link Pollable}, {@link Polled} and the resulting
 *            {@link AloFlux}
 * @param <O> second type parameter forwarded to {@link Pollable} and {@link Polled}
 */
public class AloPollingReceiver<P, O> {

    private final Pollable<P, O> pollable;
    private final PollingSourceConfig config;
    private final Mono<ReceiveResources<P, O>> resourcesMono;

    private AloPollingReceiver(Pollable<P, O> pollable, PollingSourceConfig pollingSourceConfig) {
        this.pollable = pollable;
        this.config = pollingSourceConfig;
        // The nack strategy is read from the config once, at construction time.
        this.resourcesMono = Mono.just(
            ReceiveResources.create(ComposedAlo.factory(), pollingSourceConfig.getNackStrategy()));
    }

    /**
     * Creates a receiver for the given pollable source.
     *
     * @param pollable            the source to poll
     * @param pollingSourceConfig supplies the nack strategy and the polling interval
     * @param <P>                 first type parameter of the pollable
     * @param <O>                 second type parameter of the pollable
     * @return a new receiver
     */
    public static <P, O> AloPollingReceiver<P, O> from(Pollable<P, O> pollable, PollingSourceConfig pollingSourceConfig) {
        return new AloPollingReceiver<>(pollable, pollingSourceConfig);
    }

    /**
     * Builds a stream of payloads extracted from polled records.
     *
     * <p>The underlying {@link PollingReceiver} (and its poller options) are created inside the
     * reactive chain, i.e. when the resources {@link Mono} emits, not when this method is called.
     *
     * @return an {@link AloFlux} of the payload of each {@link Polled} record
     */
    public AloFlux<P> receivePayloads() {
        return resourcesMono
            .flatMapMany(resources -> resources.receive(PollingReceiver.create(pollable, buildPollerOptions())))
            .as(AloFlux::wrap)
            .map(Polled::getPayload);
    }

    /**
     * Creates poller options from the configured polling interval and a supplier of a new
     * single-threaded scheduler named after this class.
     */
    private PollerOptions buildPollerOptions() {
        return PollerOptions.create(
            config.getPollingInterval(),
            () -> Schedulers.newSingle(AloPollingReceiver.class.getSimpleName()));
    }

    /**
     * Determines what happens when a received element is negatively acknowledged.
     */
    public enum NackStrategy {
        /** The error is emitted into the receive stream; the pollable is not nacked. */
        EMIT(true, false),
        /** The pollable is nacked; the error is not emitted into the receive stream. */
        NACK(false, true),
        /** The pollable is nacked and the error is emitted into the receive stream. */
        NACK_EMIT(true, true);

        private final boolean emit;
        private final boolean nack;

        NackStrategy(boolean emit, boolean nack) {
            this.emit = emit;
            this.nack = nack;
        }

        /**
         * @return {@code true} if the nack error should be emitted into the receive stream
         */
        public boolean isEmit() {
            return emit;
        }

        /**
         * @return {@code true} if the nack should be forwarded to the pollable
         */
        public boolean isNack() {
            return nack;
        }
    }

    /**
     * Holds the collaborators needed to turn raw {@link ReceiverRecord}s into {@link Alo}
     * elements: the factory that builds each {@link Alo} and the strategy applied on nack.
     */
    private static final class ReceiveResources<P, O> {

        private final AloFactory<Polled<P, O>> aloFactory;
        private final NackStrategy nackStrategy;

        private ReceiveResources(AloFactory<Polled<P, O>> aloFactory, NackStrategy nackStrategy) {
            this.aloFactory = aloFactory;
            this.nackStrategy = nackStrategy;
        }

        static <P, O> ReceiveResources<P, O> create(AloFactory<Polled<P, O>> aloFactory, NackStrategy nackStrategy) {
            return new ReceiveResources<>(aloFactory, nackStrategy);
        }

        /**
         * Receives from the given receiver and decorates each record as an {@link Alo}.
         */
        public Flux<Alo<Polled<P, O>>> receive(PollingReceiver<P, O> pollingReceiver) {
            return pollingReceiver.receive().transform(this::createAloRecords);
        }

        /**
         * Maps each record to an {@link Alo} whose acknowledger/nacknowledger delegate to
         * {@link #ack} and {@link #nack}, merges in a sink through which nack errors can be
         * emitted, and applies order-managed acknowledgement grouped by {@code Polled.getGroup()}.
         */
        private Flux<Alo<Polled<P, O>>> createAloRecords(Flux<ReceiverRecord<P, O>> receiverRecords) {
            // One sink per receive stream: it never emits a value, only (optionally) an error.
            Sinks.Empty<Alo<Polled<P, O>>> errorSink = Sinks.empty();
            return receiverRecords
                .map(receiverRecord -> aloFactory.create(
                    receiverRecord.getRecord(),
                    () -> ack(receiverRecord),
                    error -> nack(errorSink, error, receiverRecord)))
                .mergeWith(errorSink.asMono())
                .transform(alos -> new OrderManagingAcknowledgementOperator<>(alos, Polled::getGroup));
        }

        /** Acknowledges the record's offset on the pollable that produced it. */
        private void ack(ReceiverRecord<P, O> receiverRecord) {
            receiverRecord.getPollable().ack(receiverRecord.getRecord().getOffset());
        }

        /**
         * Applies the configured {@link NackStrategy}: optionally nacks the record's offset on
         * its pollable, then optionally emits the error through the given sink.
         */
        private void nack(Sinks.Empty<Alo<Polled<P, O>>> errorSink, Throwable error, ReceiverRecord<P, O> receiverRecord) {
            if (nackStrategy.isNack()) {
                receiverRecord.getPollable().nack(error, receiverRecord.getRecord().getOffset());
            }
            if (nackStrategy.isEmit()) {
                // The emit result is not inspected here; a failed emission is silently dropped.
                errorSink.tryEmitError(error);
            }
        }
    }
}
