package poussecafe.pulsar;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import poussecafe.exception.PousseCafeException;
import poussecafe.messaging.Message;
import poussecafe.pulsar.PulsarMessageSender;
import poussecafe.runtime.OriginalAndMarshaledMessage;

/* loaded from: input_file:poussecafe/pulsar/SyncPulsarMessageSender.class */
/**
 * Pulsar message sender that reports delivery failures to the caller.
 *
 * <p>A single message is published with the producer's blocking {@code send}. A list of
 * messages is grouped by target producer, queued with {@code sendAsync}, flushed, and then
 * every pending send is checked so that a failed delivery surfaces as a
 * {@link PousseCafeException} instead of being dropped.
 */
public class SyncPulsarMessageSender extends PulsarMessageSender {

    /**
     * Builder assembling a {@link SyncPulsarMessageSender} from a configuration and a client.
     */
    public static class Builder implements PulsarMessageSender.Builder {
        private final SyncPulsarMessageSender sender = new SyncPulsarMessageSender();

        /**
         * Sets the messaging configuration used by the sender being built.
         *
         * @param pulsarMessagingConfiguration configuration to use
         * @return this builder
         */
        @Override // poussecafe.pulsar.PulsarMessageSender.Builder
        public Builder configuration(PulsarMessagingConfiguration pulsarMessagingConfiguration) {
            this.sender.configuration = pulsarMessagingConfiguration;
            return this;
        }

        /**
         * Sets the Pulsar client used by the sender being built.
         *
         * @param pulsarClient client to use
         * @return this builder
         */
        @Override // poussecafe.pulsar.PulsarMessageSender.Builder
        public Builder client(PulsarClient pulsarClient) {
            this.sender.client = pulsarClient;
            return this;
        }

        /**
         * Validates that configuration and client were provided, creates the producer for the
         * configured default publication topic and returns the sender.
         *
         * @return the configured sender
         * @throws NullPointerException if the configuration or the client was not set
         */
        @Override // poussecafe.pulsar.PulsarMessageSender.Builder
        public SyncPulsarMessageSender build() {
            Objects.requireNonNull(this.sender.configuration, "configuration");
            Objects.requireNonNull(this.sender.client, "client");
            this.sender.defaultTopicProducer = this.sender.createProducer(this.sender.configuration.defaultPublicationTopic());
            return this.sender;
        }
    }

    private SyncPulsarMessageSender() {
    }

    /**
     * Publishes one message with a blocking send on the producer selected for the original
     * message. The method holds this sender's monitor while sending.
     *
     * @param originalAndMarshaledMessage message to publish together with its marshaled form
     * @throws PousseCafeException if the Pulsar client reports a failure
     */
    protected synchronized void sendMarshalledMessage(OriginalAndMarshaledMessage originalAndMarshaledMessage) {
        Message original = originalAndMarshaledMessage.original();
        try {
            producer(original).send((String) originalAndMarshaledMessage.marshaled());
        } catch (PulsarClientException e) {
            throw new PousseCafeException("Unable to send message", e);
        }
    }

    /**
     * Publishes several messages, one bulk per target producer.
     *
     * @param list messages to publish together with their marshaled forms
     * @throws PousseCafeException if flushing a producer or any individual send fails
     */
    public void sendMarshalledMessages(List<OriginalAndMarshaledMessage> list) {
        for (Map.Entry<Producer<String>, List<OriginalAndMarshaledMessage>> bulk : messagesPerProducer(list).entrySet()) {
            sendBulk(bulk);
        }
    }

    /** Groups the messages by the producer selected for their original message. */
    private Map<Producer<String>, List<OriginalAndMarshaledMessage>> messagesPerProducer(List<OriginalAndMarshaledMessage> list) {
        Map<Producer<String>, List<OriginalAndMarshaledMessage>> grouped = new HashMap<>();
        for (OriginalAndMarshaledMessage message : list) {
            grouped.computeIfAbsent(producer(message.original()), key -> new ArrayList<>()).add(message);
        }
        return grouped;
    }

    /** Queues all messages of one producer, flushes it, then verifies each send succeeded. */
    private void sendBulk(Map.Entry<Producer<String>, List<OriginalAndMarshaledMessage>> entry) {
        Producer<String> producer = entry.getKey();
        List<OriginalAndMarshaledMessage> messages = entry.getValue();

        // Keep the futures: they are the only place where a per-message failure is reported.
        List<CompletableFuture<?>> pendingSends = new ArrayList<>(messages.size());
        for (OriginalAndMarshaledMessage message : messages) {
            pendingSends.add(producer.sendAsync((String) message.marshaled()));
        }

        try {
            producer.flush();
        } catch (PulsarClientException e) {
            throw new PousseCafeException("Unable to send messages to Pulsar broker", e);
        }

        // Checked after the flush so a failed send is raised rather than silently ignored.
        for (CompletableFuture<?> pendingSend : pendingSends) {
            try {
                pendingSend.join();
            } catch (CompletionException e) {
                throw new PousseCafeException("Unable to send messages to Pulsar broker", e);
            }
        }
    }
}
