/*
 * Decompiled with CFR 0.152.
 */
package com.expediagroup.rhapsody.kafka.acknowledgement;

import com.expediagroup.rhapsody.api.Acknowledgeable;
import com.expediagroup.rhapsody.kafka.acknowledgement.ReceiverAcknowledgementStrategy;
import com.expediagroup.rhapsody.kafka.factory.AcknowledgeableConsumerRecordFactory;
import java.util.Map;
import java.util.function.Function;
import java.util.function.UnaryOperator;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.reactivestreams.Publisher;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.kafka.receiver.ReceiverOffset;
import reactor.kafka.receiver.ReceiverRecord;

abstract class AbstractReceiverAcknowledgementStrategy
implements ReceiverAcknowledgementStrategy {
    AbstractReceiverAcknowledgementStrategy() {
    }

    @Override
    public final <K, V> Function<? super Publisher<ReceiverRecord<K, V>>, ? extends Publisher<Acknowledgeable<ConsumerRecord<K, V>>>> createRecordTransformer(Map<String, ?> properties) {
        AcknowledgeableConsumerRecordFactory acknowledgeableFactory = AcknowledgeableConsumerRecordFactory.create(properties);
        long maxInFlight = ReceiverAcknowledgementStrategy.loadMaxInFlightPerTopicPartition(properties).orElse(Long.MAX_VALUE);
        return source -> Flux.defer(() -> this.transform((Publisher)source, (AcknowledgeableConsumerRecordFactory)acknowledgeableFactory, maxInFlight));
    }

    protected final <K, V> Publisher<Acknowledgeable<ConsumerRecord<K, V>>> transform(Publisher<? extends ReceiverRecord<K, V>> source, AcknowledgeableConsumerRecordFactory<K, V> acknowledgeableFactory, long maxInFlight) {
        EmitterProcessor manualProcessor = EmitterProcessor.create();
        FluxSink sink = manualProcessor.sink();
        return Flux.from(source).map(receiverRecord -> acknowledgeableFactory.create(receiverRecord, () -> ((ReceiverOffset)receiverRecord.receiverOffset()).acknowledge(), arg_0 -> ((FluxSink)sink).error(arg_0))).mergeWith((Publisher)manualProcessor).transform(this.createOperator(maxInFlight));
    }

    protected abstract <K, V> UnaryOperator<Publisher<Acknowledgeable<ConsumerRecord<K, V>>>> createOperator(long var1);
}

