package com.expediagroup.rhapsody.kafka.factory;

import com.expediagroup.rhapsody.api.Acknowledgeable;
import com.expediagroup.rhapsody.core.transformer.AutoAcknowledgementConfig;
import com.expediagroup.rhapsody.kafka.acknowledgement.ReceiverAcknowledgementStrategy;
import java.util.Collection;
import java.util.function.Function;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

/* loaded from: input_file:com/expediagroup/rhapsody/kafka/factory/KafkaValueFluxFactory.class */
/**
 * Value-oriented flavour of {@link KafkaFluxFactory} with the key type fixed to {@code Object}.
 *
 * <p>Every {@code receive*Value} method delegates to the inherited {@code receive*} counterpart
 * and then reduces each {@link ConsumerRecord} to its value, discarding records whose value is
 * {@code null}.
 *
 * @param <V> type of the record values emitted by this factory
 */
public class KafkaValueFluxFactory<V> extends KafkaFluxFactory<Object, V> {

    /**
     * Creates a factory that hands the given configuration factory to the superclass.
     *
     * @param kafkaConfigFactory configuration factory forwarded to {@link KafkaFluxFactory}
     */
    public KafkaValueFluxFactory(KafkaConfigFactory kafkaConfigFactory) {
        super(kafkaConfigFactory);
    }

    /**
     * Delegates to {@code receiveAutoGroup} and maps every emitted group to a flux of its
     * non-null record values.
     *
     * @param collection forwarded unchanged to {@code receiveAutoGroup}
     *                   (presumably the topic names to consume; confirm against the superclass)
     * @param autoAcknowledgementConfig forwarded unchanged to {@code receiveAutoGroup}
     * @param function forwarded unchanged to {@code receiveAutoGroup}
     * @return one inner flux of non-null values per group emitted by {@code receiveAutoGroup}
     */
    public Flux<Flux<V>> receiveAutoGroupValue(
            Collection<String> collection,
            AutoAcknowledgementConfig autoAcknowledgementConfig,
            Function<? super Flux<ConsumerRecord<Object, V>>, ? extends Publisher<ConsumerRecord<Object, V>>> function) {
        return receiveAutoGroup(collection, autoAcknowledgementConfig, function)
                .map(this::extractNonNullValues);
    }

    /**
     * Delegates to {@code receiveAuto} and emits only the non-null record values.
     *
     * @param collection forwarded unchanged to {@code receiveAuto}
     *                   (presumably the topic names to consume; confirm against the superclass)
     * @param autoAcknowledgementConfig forwarded unchanged to {@code receiveAuto}
     * @return flux of non-null record values
     */
    public Flux<V> receiveAutoValue(
            Collection<String> collection,
            AutoAcknowledgementConfig autoAcknowledgementConfig) {
        return receiveAuto(collection, autoAcknowledgementConfig)
                .transform(this::extractNonNullValues);
    }

    /**
     * Delegates to {@code receiveGroup} and maps every emitted group to a flux of
     * acknowledgeable, non-null record values.
     *
     * @param collection forwarded unchanged to {@code receiveGroup}
     *                   (presumably the topic names to consume; confirm against the superclass)
     * @param receiverAcknowledgementStrategy forwarded unchanged to {@code receiveGroup}
     * @param function forwarded unchanged to {@code receiveGroup}
     * @return one inner flux of acknowledgeable values per group emitted by {@code receiveGroup}
     */
    public Flux<Flux<Acknowledgeable<V>>> receiveGroupValue(
            Collection<String> collection,
            ReceiverAcknowledgementStrategy receiverAcknowledgementStrategy,
            Function<? super Flux<Acknowledgeable<ConsumerRecord<Object, V>>>, ? extends Publisher<Acknowledgeable<ConsumerRecord<Object, V>>>> function) {
        return receiveGroup(collection, receiverAcknowledgementStrategy, function)
                .map(this::extractAcknowledgeableNonNullValues);
    }

    /**
     * Delegates to the single-argument {@code receive} and emits acknowledgeable, non-null
     * record values.
     *
     * @param collection forwarded unchanged to {@code receive}
     *                   (presumably the topic names to consume; confirm against the superclass)
     * @return flux of acknowledgeable, non-null record values
     */
    public Flux<Acknowledgeable<V>> receiveValue(Collection<String> collection) {
        return receive(collection)
                .transform(this::extractAcknowledgeableNonNullValues);
    }

    /**
     * Delegates to the two-argument {@code receive} and emits acknowledgeable, non-null
     * record values.
     *
     * @param collection forwarded unchanged to {@code receive}
     *                   (presumably the topic names to consume; confirm against the superclass)
     * @param receiverAcknowledgementStrategy forwarded unchanged to {@code receive}
     * @return flux of acknowledgeable, non-null record values
     */
    public Flux<Acknowledgeable<V>> receiveValue(
            Collection<String> collection,
            ReceiverAcknowledgementStrategy receiverAcknowledgementStrategy) {
        return receive(collection, receiverAcknowledgementStrategy)
                .transform(this::extractAcknowledgeableNonNullValues);
    }

    /**
     * Drops records whose value is {@code null} and unwraps the remaining records to their values.
     *
     * @param records source of consumer records
     * @return flux carrying only the non-null values of {@code records}
     */
    private Flux<V> extractNonNullValues(Flux<? extends ConsumerRecord<Object, V>> records) {
        return records
                .filter(record -> record.value() != null)
                .map(ConsumerRecord::value);
    }

    /**
     * Acknowledgeable counterpart of {@link #extractNonNullValues(Flux)}: keeps only wrapped
     * records with a non-null value and maps each survivor to its wrapped value.
     *
     * @param acknowledgeables source of acknowledgeable consumer records
     * @return flux of acknowledgeable, non-null record values
     */
    private Flux<Acknowledgeable<V>> extractAcknowledgeableNonNullValues(
            Flux<? extends Acknowledgeable<ConsumerRecord<Object, V>>> acknowledgeables) {
        // The second argument to Acknowledgeable.filtering acknowledges the element it is given;
        // presumably it is invoked for elements rejected by the predicate, so that null-valued
        // records are still acknowledged rather than left pending. Confirm against
        // Acknowledgeable.filtering.
        return acknowledgeables
                .filter(Acknowledgeable.filtering(record -> record.value() != null, Acknowledgeable::acknowledge))
                .map(Acknowledgeable.mapping(ConsumerRecord::value));
    }
}
