package poussecafe.pulsar;

import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import poussecafe.messaging.Message;
import poussecafe.pulsar.PulsarMessageSender;
import poussecafe.runtime.OriginalAndMarshaledMessage;

/* loaded from: input_file:poussecafe/pulsar/AsyncPulsarMessageSender.class */
/**
 * Pulsar message sender that publishes with {@code sendAsync} and collects the
 * resulting acknowledgements on a dedicated daemon thread.
 *
 * <p>Every send enqueues the message together with its pending
 * {@link CompletableFuture}; the ack reception thread takes entries from that
 * queue in order, waits for each future and logs the outcome. {@link #close()}
 * enqueues a sentinel so that the thread stops after all entries enqueued
 * before the sentinel have been processed.
 */
public class AsyncPulsarMessageSender extends PulsarMessageSender {
    /** Sentinel entry telling the ack reception thread to stop (compared by identity). */
    private static final MessageAndCompletable STOP = new MessageAndCompletable();

    private final Logger logger = LoggerFactory.getLogger(getClass());

    /** Sent messages whose acknowledgement has not been awaited yet, in enqueue order. */
    private final BlockingQueue<MessageAndCompletable> pendingAcks = new LinkedBlockingQueue<>();

    /** Thread running {@link #waitAcks()}; started by the constructor. */
    private final Thread ackReceptionThread = new Thread(this::waitAcks);

    /**
     * Builder for {@link AsyncPulsarMessageSender}. Both the configuration and
     * the client must be set before calling {@link #build()}.
     */
    public static class Builder implements PulsarMessageSender.Builder {
        // NOTE: the sender (and therefore its ack reception thread) is created
        // as soon as the builder is instantiated, and build() hands out this
        // same instance on every call.
        private final AsyncPulsarMessageSender sender = new AsyncPulsarMessageSender();

        @Override // poussecafe.pulsar.PulsarMessageSender.Builder
        public Builder configuration(PulsarMessagingConfiguration pulsarMessagingConfiguration) {
            this.sender.configuration = pulsarMessagingConfiguration;
            return this;
        }

        @Override // poussecafe.pulsar.PulsarMessageSender.Builder
        public Builder client(PulsarClient pulsarClient) {
            this.sender.client = pulsarClient;
            return this;
        }

        /**
         * Validates the collected settings, creates the producer for the
         * configured default publication topic and returns the sender.
         *
         * @return the configured sender
         * @throws NullPointerException if the configuration or the client was not set
         */
        @Override // poussecafe.pulsar.PulsarMessageSender.Builder
        public AsyncPulsarMessageSender build() {
            Objects.requireNonNull(this.sender.configuration, "configuration must be set");
            Objects.requireNonNull(this.sender.client, "client must be set");
            this.sender.defaultTopicProducer = this.sender.createProducer(this.sender.configuration.defaultPublicationTopic());
            return this.sender;
        }
    }

    /** Pairs a sent message with the future that completes with its Pulsar message ID. */
    private static class MessageAndCompletable {
        OriginalAndMarshaledMessage message;
        CompletableFuture<MessageId> completable;

        private MessageAndCompletable() {
        }
    }

    /** Configures the ack reception thread as a daemon, names it and starts it. */
    private AsyncPulsarMessageSender() {
        this.ackReceptionThread.setDaemon(true);
        this.ackReceptionThread.setName("Ack reception thread for consumer " + toString());
        this.ackReceptionThread.start();
    }

    /**
     * Body of the ack reception thread: takes pending entries one by one and
     * waits for the corresponding send to complete.
     *
     * <p>The loop ends when the {@link #STOP} sentinel is taken or when the
     * thread is interrupted (the interrupt flag is restored). A failed send is
     * logged and the loop moves on to the next entry.
     */
    private void waitAcks() {
        int ackedCount = 0;
        while (true) {
            MessageAndCompletable pending = null;
            try {
                pending = this.pendingAcks.take();
                if (pending == STOP) {
                    return;
                }
                // Block until the send completes. This stays inside the try so
                // that both checked exceptions thrown by get() are handled here.
                MessageId messageId = pending.completable.get();
                this.logger.debug("Got ID {} for message #{}", messageId, ackedCount);
                ackedCount++;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            } catch (ExecutionException e) {
                // Only get() can throw this, so pending is non-null at this point.
                this.logger.error("Failed sending message {}", pending.message, e);
            }
        }
    }

    /**
     * Returns the parent's producer builder with {@code blockIfQueueFull}
     * enabled.
     */
    @Override // poussecafe.pulsar.PulsarMessageSender
    protected ProducerBuilder<String> createProducerBuilder(String str) {
        return super.createProducerBuilder(str).blockIfQueueFull(true);
    }

    /**
     * Sends the marshaled form of the message asynchronously and enqueues the
     * pending acknowledgement for the ack reception thread.
     *
     * <p>The method is synchronized, so the send and the enqueue of one call
     * are not interleaved with those of another call on the same sender.
     *
     * @param originalAndMarshaledMessage the message to publish; its marshaled
     *        form is cast to {@code String}
     */
    protected synchronized void sendMarshalledMessage(OriginalAndMarshaledMessage originalAndMarshaledMessage) {
        Message original = originalAndMarshaledMessage.original();
        CompletableFuture<MessageId> sendAsync = producer(original).sendAsync((String) originalAndMarshaledMessage.marshaled());
        MessageAndCompletable messageAndCompletable = new MessageAndCompletable();
        messageAndCompletable.message = originalAndMarshaledMessage;
        messageAndCompletable.completable = sendAsync;
        this.pendingAcks.add(messageAndCompletable);
    }

    /**
     * Asks the ack reception thread to stop once it reaches the sentinel,
     * waits for it to finish, then closes the parent sender.
     *
     * <p>If the calling thread is interrupted while waiting, the error is
     * logged, the interrupt flag is restored and the parent is still closed.
     */
    @Override // poussecafe.pulsar.PulsarMessageSender
    public void close() {
        this.pendingAcks.add(STOP);
        try {
            this.ackReceptionThread.join();
        } catch (InterruptedException e) {
            this.logger.error("Thread was interrupted while joining ack reception thread");
            Thread.currentThread().interrupt();
        }
        super.close();
    }
}
