/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.kafka.core.reactive;

import java.util.List;
import java.util.Map;
import java.util.function.Function;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.reactivestreams.Publisher;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;
import reactor.kafka.sender.SenderResult;
import reactor.kafka.sender.TransactionManager;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

public class ReactiveKafkaProducerTemplate<K, V>
implements AutoCloseable,
DisposableBean {
    private final KafkaSender<K, V> sender;
    private final RecordMessageConverter messageConverter;

    public ReactiveKafkaProducerTemplate(SenderOptions<K, V> senderOptions) {
        this(senderOptions, new MessagingMessageConverter());
    }

    public ReactiveKafkaProducerTemplate(SenderOptions<K, V> senderOptions, RecordMessageConverter messageConverter) {
        Assert.notNull(senderOptions, (String)"Sender options can not be null");
        Assert.notNull((Object)messageConverter, (String)"Message converter can not be null");
        this.sender = KafkaSender.create(senderOptions);
        this.messageConverter = messageConverter;
    }

    public <T> Flux<SenderResult<T>> sendTransactionally(Publisher<? extends SenderRecord<K, V, T>> records) {
        Flux sendTransactionally = this.sender.sendTransactionally((Publisher)Flux.just(records));
        return sendTransactionally.flatMap(Function.identity());
    }

    public <T> Mono<SenderResult<T>> sendTransactionally(SenderRecord<K, V, T> record) {
        Flux<SenderResult<T>> sendTransactionally = this.sendTransactionally((Publisher<? extends SenderRecord<K, V, T>>)Mono.just(record));
        return sendTransactionally.single();
    }

    public Mono<SenderResult<Void>> send(String topic, V value) {
        return this.send(new ProducerRecord(topic, value));
    }

    public Mono<SenderResult<Void>> send(String topic, K key, V value) {
        return this.send(new ProducerRecord(topic, key, value));
    }

    public Mono<SenderResult<Void>> send(String topic, int partition, K key, V value) {
        return this.send(new ProducerRecord(topic, Integer.valueOf(partition), key, value));
    }

    public Mono<SenderResult<Void>> send(String topic, int partition, long timestamp, K key, V value) {
        return this.send(new ProducerRecord(topic, Integer.valueOf(partition), Long.valueOf(timestamp), key, value));
    }

    public Mono<SenderResult<Void>> send(String topic, Message<?> message) {
        byte[] correlationId;
        ProducerRecord<?, ?> producerRecord = this.messageConverter.fromMessage(message, topic);
        if (!producerRecord.headers().iterator().hasNext() && (correlationId = (byte[])message.getHeaders().get((Object)"kafka_correlationId", byte[].class)) != null) {
            producerRecord.headers().add("kafka_correlationId", correlationId);
        }
        return this.send(producerRecord);
    }

    public Mono<SenderResult<Void>> send(ProducerRecord<K, V> record) {
        return this.send(SenderRecord.create(record, null));
    }

    public <T> Mono<SenderResult<T>> send(SenderRecord<K, V, T> record) {
        return this.send((Publisher<? extends SenderRecord<K, V, T>>)Mono.just(record)).single();
    }

    public <T> Flux<SenderResult<T>> send(Publisher<? extends SenderRecord<K, V, T>> records) {
        return this.sender.send(records);
    }

    public Mono<Void> flush() {
        return this.doOnProducer(producer -> {
            producer.flush();
            return null;
        });
    }

    public Flux<PartitionInfo> partitionsFromProducerFor(String topic) {
        Mono<List> partitionsInfo = this.doOnProducer(producer -> producer.partitionsFor(topic));
        return partitionsInfo.flatMapIterable(Function.identity());
    }

    public Flux<Tuple2<MetricName, ? extends Metric>> metricsFromProducer() {
        return this.doOnProducer(Producer::metrics).flatMapIterable(Map::entrySet).map(m -> Tuples.of(m.getKey(), m.getValue()));
    }

    public <T> Mono<T> doOnProducer(Function<Producer<K, V>, ? extends T> action) {
        return this.sender.doOnProducer(action);
    }

    public TransactionManager transactionManager() {
        return this.sender.transactionManager();
    }

    public void destroy() {
        this.doClose();
    }

    @Override
    public void close() {
        this.doClose();
    }

    private void doClose() {
        this.sender.close();
    }
}

