package org.reactivecommons.async.kafka.communications;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.reactivecommons.async.commons.converters.MessageConverter;
import org.reactivecommons.async.kafka.KafkaMessage;
import org.reactivecommons.async.kafka.communications.topology.TopologyCreator;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderRecord;
import reactor.kafka.sender.SenderResult;

/* loaded from: input_file:org/reactivecommons/async/kafka/communications/ReactiveMessageSender.class */
/**
 * Publishes messages through a {@link KafkaSender} and exposes the outcome of every publish as a
 * {@link Mono}.
 *
 * <p>At construction time {@value #SENDER_COUNT} source fluxes are handed to the Kafka sender; the
 * {@link FluxSink} of each one is kept in {@link #fluxSinks}. {@link #send(Object)} converts the
 * message into a {@link SenderRecord}, parks the caller's {@link MonoSink} in
 * {@link #confirmations} under a unique correlation id and pushes the record into one of the
 * sinks. When the sender reports a {@link SenderResult}, {@link #confirm(SenderResult)} looks the
 * parked sink up by that correlation id and completes it (or fails it).
 *
 * <p>Emission and confirmation are each dispatched to their own fixed thread pool. The pools are
 * never shut down by this class.
 */
public class ReactiveMessageSender {
    /** Number of source fluxes (and therefore sinks) subscribed on the Kafka sender. */
    private static final int SENDER_COUNT = 4;
    /** Number of threads in each of the two worker pools. */
    private static final int THREAD_POOL_SIZE = 13;

    /** Sinks of the callers still waiting for a send result, keyed by correlation id. */
    private final ConcurrentHashMap<String, MonoSink<Void>> confirmations = new ConcurrentHashMap<>();
    /** Sinks feeding the source fluxes passed to the Kafka sender. */
    private final CopyOnWriteArrayList<FluxSink<SenderRecord<String, byte[], String>>> fluxSinks = new CopyOnWriteArrayList<>();
    /** Shared by both thread factories to number the worker threads. */
    private final AtomicLong counter = new AtomicLong();
    /** Source of the unique correlation ids attached to outgoing records. */
    private final AtomicLong correlationSequence = new AtomicLong();
    private final ExecutorService executorServiceConfirm = Executors.newFixedThreadPool(THREAD_POOL_SIZE,
            runnable -> new Thread(runnable, "KMessageSender1-" + this.counter.getAndIncrement()));
    private final ExecutorService executorServiceEmit = Executors.newFixedThreadPool(THREAD_POOL_SIZE,
            runnable -> new Thread(runnable, "KMessageSender2-" + this.counter.getAndIncrement()));
    private final MessageConverter messageConverter;
    private final TopologyCreator topologyCreator;

    /**
     * Creates the sender and subscribes {@value #SENDER_COUNT} source fluxes on {@code kafkaSender}.
     *
     * @param kafkaSender      sender that receives the records and reports their results
     * @param messageConverter converts the objects given to {@link #send(Object)} into messages
     * @param topologyCreator  used to check the target topic before a record is built
     */
    public ReactiveMessageSender(KafkaSender<String, byte[]> kafkaSender, MessageConverter messageConverter, TopologyCreator topologyCreator) {
        this.messageConverter = messageConverter;
        this.topologyCreator = topologyCreator;
        for (int i = 0; i < SENDER_COUNT; i++) {
            // Each flux registers its sink in fluxSinks when it is subscribed.
            Flux<SenderRecord<String, byte[], String>> source = Flux.create(this.fluxSinks::add);
            kafkaSender.send(source).doOnNext(this::confirm).subscribe();
        }
    }

    /**
     * Sends {@code v} to Kafka.
     *
     * @param v   object handed to the {@link MessageConverter}; the converter must produce a
     *            {@link KafkaMessage}
     * @param <V> type of the object to send
     * @return a {@link Mono} that completes when the matching {@link SenderResult} carries no
     *         exception, and errors with that exception otherwise. It also errors if the record
     *         cannot be built or cannot be handed to a sink.
     */
    public <V> Mono<Void> send(V v) {
        return Mono.create(monoSink -> {
            // A generated id (not the record key) correlates request and result, so concurrent
            // sends sharing a key, or records without a key, cannot clobber each other's sink.
            String correlationId = Long.toString(this.correlationSequence.getAndIncrement());
            SenderRecord<String, byte[], String> senderRecord = createRecord(v, correlationId);
            this.confirmations.put(correlationId, monoSink);
            this.executorServiceEmit.submit(() -> emit(correlationId, senderRecord));
        });
    }

    /**
     * Pushes {@code senderRecord} into one of the registered sinks, chosen from the current time.
     * A failure (for example no sink registered at the chosen index) is forwarded to the waiting
     * caller instead of being lost inside the executor's discarded future.
     */
    private void emit(String correlationId, SenderRecord<String, byte[], String> senderRecord) {
        try {
            int sinkIndex = (int) (System.currentTimeMillis() % SENDER_COUNT);
            this.fluxSinks.get(sinkIndex).next(senderRecord);
        } catch (RuntimeException e) {
            MonoSink<Void> pending = this.confirmations.remove(correlationId);
            if (pending != null) {
                pending.error(e);
            }
        }
    }

    /**
     * Resolves the caller waiting for {@code senderResult}, on the confirmation pool. Results whose
     * correlation id has no parked sink are ignored.
     */
    private void confirm(SenderResult<String> senderResult) {
        this.executorServiceConfirm.submit(() -> {
            MonoSink<Void> pending = this.confirmations.remove(senderResult.correlationMetadata());
            if (pending == null) {
                return;
            }
            Exception failure = senderResult.exception();
            if (failure != null) {
                pending.error(failure);
            } else {
                pending.success();
            }
        });
    }

    /** Converts {@code v} and wraps the resulting producer record with {@code correlationId}. */
    private <V> SenderRecord<String, byte[], String> createRecord(V v, String correlationId) {
        KafkaMessage kafkaMessage = (KafkaMessage) this.messageConverter.toMessage(v);
        return SenderRecord.create(createProducerRecord(kafkaMessage), correlationId);
    }

    /**
     * Builds the producer record for {@code kafkaMessage}: topic, key and body come from the
     * message, no partition is set, and every message header is added with its value's
     * {@code toString()} encoded as UTF-8. The topic is checked with the {@link TopologyCreator}
     * first.
     */
    private ProducerRecord<String, byte[]> createProducerRecord(KafkaMessage kafkaMessage) {
        String topic = kafkaMessage.getProperties().getTopic();
        this.topologyCreator.checkTopic(topic);
        ProducerRecord<String, byte[]> producerRecord =
                new ProducerRecord<>(topic, kafkaMessage.getProperties().getKey(), kafkaMessage.getBody());
        kafkaMessage.getProperties().getHeaders().forEach((name, value) ->
                producerRecord.headers().add(new RecordHeader(name, value.toString().getBytes(StandardCharsets.UTF_8))));
        return producerRecord;
    }
}
