package poussecafe.pulsar;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import poussecafe.exception.PousseCafeException;
import poussecafe.jackson.JacksonMessageAdapter;
import poussecafe.messaging.Message;
import poussecafe.messaging.MessageSender;

/* loaded from: input_file:poussecafe/pulsar/PulsarMessageSender.class */
public abstract class PulsarMessageSender extends MessageSender {
    protected PulsarMessagingConfiguration configuration;
    protected PulsarClient client;
    protected Producer<String> defaultTopicProducer;
    private Map<String, Producer<String>> producers;
    private Logger logger;

    /* loaded from: input_file:poussecafe/pulsar/PulsarMessageSender$Builder.class */
    public interface Builder {
        Builder configuration(PulsarMessagingConfiguration pulsarMessagingConfiguration);

        Builder client(PulsarClient pulsarClient);

        PulsarMessageSender build();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public PulsarMessageSender() {
        super(new JacksonMessageAdapter());
        this.producers = new HashMap();
        this.logger = LoggerFactory.getLogger(getClass());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public ProducerBuilder<String> createProducerBuilder(String str) {
        return this.client.newProducer(Schema.STRING).topic(str);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Producer<String> createProducer(String str) {
        try {
            return createProducerBuilder(str).create();
        } catch (PulsarClientException e) {
            throw new PousseCafeException("Unable to send message to Pulsar broker", e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Producer<String> producer(Message message) {
        Optional<String> chooseTopicForMessage = this.configuration.publicationTopicChooser().chooseTopicForMessage(message);
        return chooseTopicForMessage.isPresent() ? getOrCreateProducer(chooseTopicForMessage.get()) : this.defaultTopicProducer;
    }

    private Producer<String> getOrCreateProducer(String str) {
        Producer<String> producer = this.producers.get(str);
        if (producer == null) {
            producer = createProducer(str);
            this.producers.put(str, producer);
        }
        return producer;
    }

    public void close() {
        this.producers.values().forEach(this::closeAndCatch);
    }

    private void closeAndCatch(Producer<String> producer) {
        try {
            producer.close();
        } catch (PulsarClientException e) {
            this.logger.error("Unable to close producer", e);
        }
    }
}
