/*
 * Decompiled with CFR 0.152.
 */
package com.expediagroup.rhapsody.kafka.factory;

import com.expediagroup.rhapsody.api.Acknowledgeable;
import com.expediagroup.rhapsody.core.transformer.AutoAcknowledgementConfig;
import com.expediagroup.rhapsody.kafka.acknowledgement.ReceiverAcknowledgementStrategy;
import com.expediagroup.rhapsody.kafka.factory.KafkaConfigFactory;
import com.expediagroup.rhapsody.kafka.factory.KafkaFluxFactory;
import java.util.Collection;
import java.util.function.Function;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

/**
 * Variant of {@link KafkaFluxFactory} whose {@code *Value} methods emit only the
 * values of the received {@link ConsumerRecord}s. Records whose value is
 * {@code null} are filtered out before the value is unwrapped.
 *
 * @param <V> type of the record values emitted by this factory
 */
public class KafkaValueFluxFactory<V> extends KafkaFluxFactory<Object, V> {

    /**
     * Creates a factory that passes the given configuration factory to the superclass.
     *
     * @param configFactory configuration factory forwarded to {@link KafkaFluxFactory}
     */
    public KafkaValueFluxFactory(KafkaConfigFactory configFactory) {
        super(configFactory);
    }

    /**
     * Delegates to {@code receiveAutoGroup} with the same arguments and maps every
     * emitted group to a flux of its non-null record values.
     *
     * @param topics topics forwarded to {@code receiveAutoGroup}
     * @param autoAcknowledgementConfig configuration forwarded to {@code receiveAutoGroup}
     * @param pregroup transformation forwarded to {@code receiveAutoGroup}
     * @return a flux of groups, each reduced to its non-null values
     */
    public Flux<Flux<V>> receiveAutoGroupValue(
            Collection<String> topics,
            AutoAcknowledgementConfig autoAcknowledgementConfig,
            Function<? super Flux<ConsumerRecord<Object, V>>, ? extends Publisher<ConsumerRecord<Object, V>>> pregroup) {
        return receiveAutoGroup(topics, autoAcknowledgementConfig, pregroup)
                .map(this::extractNonNullValues);
    }

    /**
     * Delegates to {@code receiveAuto} with the same arguments and reduces the
     * resulting records to their non-null values.
     *
     * @param topics topics forwarded to {@code receiveAuto}
     * @param autoAcknowledgementConfig configuration forwarded to {@code receiveAuto}
     * @return a flux of non-null record values
     */
    public Flux<V> receiveAutoValue(Collection<String> topics, AutoAcknowledgementConfig autoAcknowledgementConfig) {
        return receiveAuto(topics, autoAcknowledgementConfig)
                .transform(this::extractNonNullValues);
    }

    /**
     * Delegates to {@code receiveGroup} with the same arguments and maps every
     * emitted group to a flux of acknowledgeable non-null values.
     *
     * @param topics topics forwarded to {@code receiveGroup}
     * @param receiverAcknowledgementStrategy strategy forwarded to {@code receiveGroup}
     * @param pregroup transformation forwarded to {@code receiveGroup}
     * @return a flux of groups, each reduced to acknowledgeable non-null values
     */
    public Flux<Flux<Acknowledgeable<V>>> receiveGroupValue(
            Collection<String> topics,
            ReceiverAcknowledgementStrategy receiverAcknowledgementStrategy,
            Function<? super Flux<Acknowledgeable<ConsumerRecord<Object, V>>>, ? extends Publisher<Acknowledgeable<ConsumerRecord<Object, V>>>> pregroup) {
        return receiveGroup(topics, receiverAcknowledgementStrategy, pregroup)
                .map(this::extractAcknowledgeableNonNullValues);
    }

    /**
     * Delegates to the single-argument {@code receive} and reduces the resulting
     * acknowledgeable records to acknowledgeable non-null values.
     *
     * @param topics topics forwarded to {@code receive}
     * @return a flux of acknowledgeable non-null record values
     */
    public Flux<Acknowledgeable<V>> receiveValue(Collection<String> topics) {
        return receive(topics)
                .transform(this::extractAcknowledgeableNonNullValues);
    }

    /**
     * Delegates to the two-argument {@code receive} and reduces the resulting
     * acknowledgeable records to acknowledgeable non-null values.
     *
     * @param topics topics forwarded to {@code receive}
     * @param receiverAcknowledgementStrategy strategy forwarded to {@code receive}
     * @return a flux of acknowledgeable non-null record values
     */
    public Flux<Acknowledgeable<V>> receiveValue(Collection<String> topics, ReceiverAcknowledgementStrategy receiverAcknowledgementStrategy) {
        return receive(topics, receiverAcknowledgementStrategy)
                .transform(this::extractAcknowledgeableNonNullValues);
    }

    /**
     * Drops records whose value is {@code null} and unwraps the remaining records
     * to their values.
     */
    private Flux<V> extractNonNullValues(Flux<? extends ConsumerRecord<Object, V>> consumerRecords) {
        return consumerRecords
                .filter(KafkaValueFluxFactory::hasValue)
                .map(ConsumerRecord::value);
    }

    /**
     * Same reduction as {@link #extractNonNullValues(Flux)}, applied inside the
     * {@link Acknowledgeable} wrapper. {@code Acknowledgeable::acknowledge} is
     * handed to {@code Acknowledgeable.filtering} as its second argument;
     * presumably that is invoked for the wrappers that get filtered out, so
     * dropped records are still acknowledged (confirm against Acknowledgeable).
     */
    private Flux<Acknowledgeable<V>> extractAcknowledgeableNonNullValues(Flux<? extends Acknowledgeable<ConsumerRecord<Object, V>>> acknowledgeables) {
        return acknowledgeables
                .filter(Acknowledgeable.filtering(record -> hasValue(record), Acknowledgeable::acknowledge))
                .map(Acknowledgeable.mapping(ConsumerRecord::value));
    }

    /** Tells whether the given record carries a non-null value. */
    private static boolean hasValue(ConsumerRecord<?, ?> consumerRecord) {
        return consumerRecord.value() != null;
    }
}

