/*
 * Decompiled with CFR 0.152.
 */
package com.expediagroup.rhapsody.kafka.factory;

import com.expediagroup.rhapsody.api.Acknowledgeable;
import com.expediagroup.rhapsody.api.Headed;
import com.expediagroup.rhapsody.kafka.factory.KafkaConfigFactory;
import com.expediagroup.rhapsody.kafka.factory.KafkaSenderFactory;
import com.expediagroup.rhapsody.kafka.record.RecordHeaderConversion;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.kafka.sender.SenderResult;

public class KafkaValueSenderFactory<V>
extends KafkaSenderFactory<Object, V> {
    public KafkaValueSenderFactory(KafkaConfigFactory configFactory) {
        super(configFactory);
    }

    public Flux<Acknowledgeable<SenderResult<V>>> sendAcknowledgeableValues(Publisher<Acknowledgeable<V>> acknowledgeableValues, Function<V, String> valueToTopic, Function<V, ?> valueToKey) {
        return this.sendAcknowledgeable(Flux.from(acknowledgeableValues).map(acknowledgeable -> this.createAcknowledgeableProducerRecord((Acknowledgeable<V>)acknowledgeable, valueToTopic, valueToKey)));
    }

    public Flux<SenderResult<V>> sendValues(Publisher<V> values, Function<V, String> valueToTopic, Function<V, ?> valueToKey) {
        return this.send(Flux.from(values).map(value -> new ProducerRecord((String)valueToTopic.apply(value), null, valueToKey.apply(value), value, this.extractHeaders(value))));
    }

    protected Acknowledgeable<ProducerRecord<Object, V>> createAcknowledgeableProducerRecord(Acknowledgeable<V> acknowledgeable, Function<V, String> valueToTopic, Function<V, ?> valueToKey) {
        return acknowledgeable.map(value -> new ProducerRecord((String)valueToTopic.apply(value), null, valueToKey.apply(value), value, KafkaValueSenderFactory.createHeaders((Headed)acknowledgeable)));
    }

    protected List<Header> extractHeaders(V value) {
        return Headed.tryCast(value).map(KafkaValueSenderFactory::createHeaders).orElseGet(Collections::emptyList);
    }

    protected static List<Header> createHeaders(Headed headed) {
        return KafkaValueSenderFactory.createHeaders(headed.header().toMap());
    }

    protected static List<Header> createHeaders(Map<String, String> headerMap) {
        return headerMap.entrySet().stream().map(entry -> RecordHeaderConversion.toHeader((String)((String)entry.getKey()), (String)((String)entry.getValue()))).collect(Collectors.toList());
    }
}

