package com.expediagroup.rhapsody.kafka.factory;

import com.expediagroup.rhapsody.api.Acknowledgeable;
import com.expediagroup.rhapsody.kafka.sending.AcknowledgeableSenderResult;
import com.expediagroup.rhapsody.util.ConfigLoading;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;
import reactor.kafka.sender.SenderResult;

/* loaded from: input_file:com/expediagroup/rhapsody/kafka/factory/KafkaSenderFactory.class */
public class KafkaSenderFactory<K, V> {
    public static final String MAX_IN_FLIGHT_CONFIG = "max.in.flight";
    public static final String HEADERS_ENABLED_CONFIG = "headers.enabled";
    public static final String STOP_ON_ERROR_CONFIG = "stop.on.error";
    public static final String RESUBSCRIBE_ON_ERROR_CONFIG = "resubscribe.on.error";
    private static final boolean DEFAULT_HEADERS_ENABLED = false;
    private static final boolean DEFAULT_STOP_ON_ERROR = false;
    private static final boolean DEFAULT_RESUBSCRIBE_ON_ERROR = true;
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSenderFactory.class);
    private static final Map<String, AtomicLong> REGISTRATION_COUNTS_BY_CLIENT_ID = new ConcurrentHashMap();
    private final KafkaSender<K, V> kafkaSender;
    private final boolean headersEnabled;
    private final boolean resubscribeOnError;

    public KafkaSenderFactory(KafkaConfigFactory kafkaConfigFactory) {
        Map<String, Object> map = (Map) kafkaConfigFactory.create();
        this.kafkaSender = KafkaSender.create(createSenderOptions(map));
        this.headersEnabled = ((Boolean) ConfigLoading.load(map, HEADERS_ENABLED_CONFIG, Boolean::valueOf, false)).booleanValue();
        this.resubscribeOnError = ((Boolean) ConfigLoading.load(map, RESUBSCRIBE_ON_ERROR_CONFIG, Boolean::valueOf, true)).booleanValue();
    }

    public Function<Publisher<Acknowledgeable<ProducerRecord<K, V>>>, Flux<Acknowledgeable<SenderResult<V>>>> sendAcknowledgeable() {
        return this::sendAcknowledgeable;
    }

    public Flux<Acknowledgeable<SenderResult<V>>> sendAcknowledgeable(Publisher<Acknowledgeable<ProducerRecord<K, V>>> publisher) {
        return Flux.from(publisher).map(this::createAcknowledgeableValuedSenderRecord).transform(this::sendRecords).map(AcknowledgeableSenderResult::fromSenderResult);
    }

    public Function<Publisher<ProducerRecord<K, V>>, Flux<SenderResult<V>>> send() {
        return this::send;
    }

    public Flux<SenderResult<V>> send(Publisher<ProducerRecord<K, V>> publisher) {
        return Flux.from(publisher).map(this::createValuedSenderRecord).transform(this::sendRecords);
    }

    private SenderOptions<K, V> createSenderOptions(Map<String, Object> map) {
        SenderOptions<K, V> create = SenderOptions.create(map);
        String registerNewClient = registerNewClient(Objects.toString(create.producerProperty("client.id")));
        create.producerProperty("client.id", registerNewClient);
        create.scheduler(Schedulers.newSingle(KafkaSenderFactory.class.getSimpleName() + "-" + registerNewClient));
        create.maxInFlight(((Integer) ConfigLoading.load(map, MAX_IN_FLIGHT_CONFIG, Integer::valueOf, Integer.valueOf(create.maxInFlight()))).intValue());
        create.stopOnError(((Boolean) ConfigLoading.load(map, STOP_ON_ERROR_CONFIG, Boolean::valueOf, false)).booleanValue());
        return create;
    }

    private <R> Flux<SenderResult<R>> sendRecords(Flux<SenderRecord<K, V, R>> flux) {
        KafkaSender<K, V> kafkaSender = this.kafkaSender;
        Objects.requireNonNull(kafkaSender);
        return flux.transform((v1) -> {
            return r1.send(v1);
        }).doOnError(th -> {
            LOGGER.warn("An Error was encountered while trying to send to Kafka. resubscribeOnError={}", Boolean.valueOf(this.resubscribeOnError), th);
        }).retry(th2 -> {
            return this.resubscribeOnError;
        });
    }

    private SenderRecord<K, V, Acknowledgeable<V>> createAcknowledgeableValuedSenderRecord(Acknowledgeable<ProducerRecord<K, V>> acknowledgeable) {
        return (SenderRecord<K, V, Acknowledgeable<V>>) createSenderRecord((ProducerRecord) acknowledgeable.get(), acknowledgeable.map((v0) -> {
            return v0.value();
        }));
    }

    private SenderRecord<K, V, V> createValuedSenderRecord(ProducerRecord<K, V> producerRecord) {
        return (SenderRecord<K, V, V>) createSenderRecord(producerRecord, producerRecord.value());
    }

    private <T> SenderRecord<K, V, T> createSenderRecord(ProducerRecord<K, V> producerRecord, T t) {
        return (this.headersEnabled || !producerRecord.headers().iterator().hasNext()) ? SenderRecord.create(producerRecord, t) : SenderRecord.create(new ProducerRecord(producerRecord.topic(), producerRecord.partition(), producerRecord.timestamp(), producerRecord.key(), producerRecord.value()), t);
    }

    private static String registerNewClient(String str) {
        return str + "-" + REGISTRATION_COUNTS_BY_CLIENT_ID.computeIfAbsent(str, str2 -> {
            return new AtomicLong();
        }).incrementAndGet();
    }
}
