/*
 * Decompiled with CFR 0.152.
 */
package com.expediagroup.rhapsody.kafka.factory;

import com.expediagroup.rhapsody.api.Acknowledgeable;
import com.expediagroup.rhapsody.kafka.factory.KafkaConfigFactory;
import com.expediagroup.rhapsody.kafka.sending.AcknowledgeableSenderResult;
import com.expediagroup.rhapsody.util.ConfigLoading;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;
import reactor.kafka.sender.SenderResult;

public class KafkaSenderFactory<K, V> {
    public static final String MAX_IN_FLIGHT_CONFIG = "max.in.flight";
    public static final String HEADERS_ENABLED_CONFIG = "headers.enabled";
    public static final String STOP_ON_ERROR_CONFIG = "stop.on.error";
    public static final String RESUBSCRIBE_ON_ERROR_CONFIG = "resubscribe.on.error";
    private static final boolean DEFAULT_HEADERS_ENABLED = false;
    private static final boolean DEFAULT_STOP_ON_ERROR = false;
    private static final boolean DEFAULT_RESUBSCRIBE_ON_ERROR = true;
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSenderFactory.class);
    private static final Map<String, AtomicLong> REGISTRATION_COUNTS_BY_CLIENT_ID = new ConcurrentHashMap<String, AtomicLong>();
    private final KafkaSender<K, V> kafkaSender;
    private final boolean headersEnabled;
    private final boolean resubscribeOnError;

    public KafkaSenderFactory(KafkaConfigFactory configFactory) {
        Map properties = (Map)configFactory.create();
        this.kafkaSender = KafkaSender.create(this.createSenderOptions(properties));
        this.headersEnabled = (Boolean)ConfigLoading.load((Map)properties, (String)HEADERS_ENABLED_CONFIG, Boolean::valueOf, (Object)false);
        this.resubscribeOnError = (Boolean)ConfigLoading.load((Map)properties, (String)RESUBSCRIBE_ON_ERROR_CONFIG, Boolean::valueOf, (Object)true);
    }

    public Flux<Acknowledgeable<SenderResult<V>>> sendAcknowledgeable(Publisher<Acknowledgeable<ProducerRecord<K, V>>> acknowledgeables) {
        return Flux.from(acknowledgeables).map(this::createAcknowledgeableValuedSenderRecord).transform(this::sendRecords).map(AcknowledgeableSenderResult::fromSenderResult);
    }

    public Flux<SenderResult<V>> send(Publisher<ProducerRecord<K, V>> producerRecords) {
        return Flux.from(producerRecords).map(this::createValuedSenderRecord).transform(this::sendRecords);
    }

    private SenderOptions<K, V> createSenderOptions(Map<String, Object> properties) {
        SenderOptions senderOptions = SenderOptions.create(properties);
        String uniqueClientId = KafkaSenderFactory.registerNewClient(Objects.toString(senderOptions.producerProperty("client.id")));
        senderOptions.producerProperty("client.id", (Object)uniqueClientId);
        senderOptions.scheduler(Schedulers.newSingle((String)(KafkaSenderFactory.class.getSimpleName() + "-" + uniqueClientId)));
        senderOptions.maxInFlight(((Integer)ConfigLoading.load(properties, (String)MAX_IN_FLIGHT_CONFIG, Integer::valueOf, (Object)senderOptions.maxInFlight())).intValue());
        senderOptions.stopOnError(((Boolean)ConfigLoading.load(properties, (String)STOP_ON_ERROR_CONFIG, Boolean::valueOf, (Object)false)).booleanValue());
        return senderOptions;
    }

    private <R> Flux<SenderResult<R>> sendRecords(Flux<SenderRecord<K, V, R>> senderRecords) {
        return senderRecords.transform(arg_0 -> this.kafkaSender.send(arg_0)).doOnError(error -> LOGGER.warn("An Error was encountered while trying to send to Kafka. resubscribeOnError={}", (Object)this.resubscribeOnError, error)).retry(error -> this.resubscribeOnError);
    }

    private SenderRecord<K, V, Acknowledgeable<V>> createAcknowledgeableValuedSenderRecord(Acknowledgeable<ProducerRecord<K, V>> acknowledgeable) {
        return this.createSenderRecord((ProducerRecord)acknowledgeable.get(), acknowledgeable.map(ProducerRecord::value));
    }

    private SenderRecord<K, V, V> createValuedSenderRecord(ProducerRecord<K, V> producerRecord) {
        return this.createSenderRecord(producerRecord, producerRecord.value());
    }

    private <T> SenderRecord<K, V, T> createSenderRecord(ProducerRecord<K, V> record, T correlationMetadata) {
        return this.headersEnabled || !record.headers().iterator().hasNext() ? SenderRecord.create(record, correlationMetadata) : SenderRecord.create((ProducerRecord)new ProducerRecord(record.topic(), record.partition(), record.timestamp(), record.key(), record.value()), correlationMetadata);
    }

    private static String registerNewClient(String clientId) {
        return clientId + "-" + REGISTRATION_COUNTS_BY_CLIENT_ID.computeIfAbsent(clientId, key -> new AtomicLong()).incrementAndGet();
    }
}

