package poussecafe.pulsar;

import java.util.Objects;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import poussecafe.exception.PousseCafeException;
import poussecafe.jackson.JacksonMessageAdapter;
import poussecafe.messaging.MessageReceiver;
import poussecafe.runtime.MessageConsumer;

/* loaded from: input_file:poussecafe/pulsar/PulsarMessageReceiver.class */
public class PulsarMessageReceiver extends MessageReceiver {
    private PulsarMessagingConfiguration configuration;
    private Consumer<String> consumer;
    private Thread receptionThread;

    /* loaded from: input_file:poussecafe/pulsar/PulsarMessageReceiver$Builder.class */
    public static class Builder {
        private MessageConsumer messageConsumer;
        private PulsarMessagingConfiguration configuration;

        public Builder messageConsumer(MessageConsumer messageConsumer) {
            this.messageConsumer = messageConsumer;
            return this;
        }

        public Builder configuration(PulsarMessagingConfiguration pulsarMessagingConfiguration) {
            this.configuration = pulsarMessagingConfiguration;
            return this;
        }

        public PulsarMessageReceiver build() {
            Objects.requireNonNull(this.messageConsumer);
            PulsarMessageReceiver pulsarMessageReceiver = new PulsarMessageReceiver(this.messageConsumer);
            pulsarMessageReceiver.configuration = this.configuration;
            return pulsarMessageReceiver;
        }
    }

    private PulsarMessageReceiver(MessageConsumer messageConsumer) {
        super(new JacksonMessageAdapter(), messageConsumer);
    }

    protected void actuallyStartReceiving() {
        try {
            this.consumer = PulsarClient.builder().serviceUrl(this.configuration.brokerUrl()).build().newConsumer(Schema.STRING).topics(this.configuration.topics()).subscriptionType(SubscriptionType.Shared).subscriptionName(this.configuration.subscriptionName()).subscribe();
            startReceptionThread();
        } catch (PulsarClientException e) {
            throw new PousseCafeException("Unable to connect to Pulsar broker", e);
        }
    }

    private void startReceptionThread() {
        this.receptionThread = new Thread(receptionRunnable());
        this.receptionThread.setDaemon(true);
        this.receptionThread.start();
    }

    private Runnable receptionRunnable() {
        return () -> {
            Message message = null;
            while (true) {
                try {
                    try {
                        message = this.consumer.receive();
                        onMessage(message.getValue());
                        if (message != null) {
                            try {
                                this.consumer.acknowledge(message);
                            } catch (PulsarClientException e) {
                                this.logger.error("Unable to acknowledge message");
                            }
                            message = null;
                        }
                    } catch (Exception e2) {
                        this.logger.error("Error while handling message ({}), continuing consumption anyway...", e2.getMessage());
                        this.logger.debug("Handling error stacktrace", e2);
                        if (message != null) {
                            try {
                                this.consumer.acknowledge(message);
                            } catch (PulsarClientException e3) {
                                this.logger.error("Unable to acknowledge message");
                            }
                            message = null;
                        }
                    }
                } catch (Throwable th) {
                    if (message != null) {
                        try {
                            this.consumer.acknowledge(message);
                        } catch (PulsarClientException e4) {
                            this.logger.error("Unable to acknowledge message");
                        }
                    }
                    throw th;
                }
            }
        };
    }

    private void closeIfConnected() {
        if (this.consumer.isConnected()) {
            try {
                this.consumer.close();
            } catch (PulsarClientException e) {
                this.logger.warn("Error while closing consumer", e);
            }
        }
    }

    protected void actuallyStopReceiving() {
        closeIfConnected();
        this.receptionThread.interrupt();
    }
}
