package poussecafe.pulsar;

import java.util.Objects;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClientException;
import poussecafe.exception.PousseCafeException;
import poussecafe.jackson.JacksonMessageAdapter;
import poussecafe.messaging.MessageReceiver;
import poussecafe.processing.MessageBroker;
import poussecafe.processing.ReceivedMessage;
import poussecafe.runtime.OriginalAndMarshaledMessage;

/* loaded from: input_file:poussecafe/pulsar/PulsarMessageReceiver.class */
public class PulsarMessageReceiver extends MessageReceiver {
    private ConsumerFactory consumerFactory;
    private Consumer<String> consumer;
    private Thread receptionThread;
    private JacksonMessageAdapter messageAdapter;

    /* loaded from: input_file:poussecafe/pulsar/PulsarMessageReceiver$Builder.class */
    public static class Builder {
        private MessageBroker messageBroker;
        private ConsumerFactory consumerFactory;

        public Builder messageBroker(MessageBroker messageBroker) {
            this.messageBroker = messageBroker;
            return this;
        }

        public Builder consumerFactory(ConsumerFactory consumerFactory) {
            this.consumerFactory = consumerFactory;
            return this;
        }

        public PulsarMessageReceiver build() {
            Objects.requireNonNull(this.messageBroker);
            PulsarMessageReceiver pulsarMessageReceiver = new PulsarMessageReceiver(this.messageBroker);
            pulsarMessageReceiver.consumerFactory = this.consumerFactory;
            return pulsarMessageReceiver;
        }
    }

    private PulsarMessageReceiver(MessageBroker messageBroker) {
        super(messageBroker);
        this.messageAdapter = new JacksonMessageAdapter();
    }

    protected void actuallyStartReceiving() {
        this.consumer = this.consumerFactory.buildConsumer();
        startReceptionThread();
    }

    private void startReceptionThread() {
        this.receptionThread = new Thread(receptionRunnable());
        this.receptionThread.setDaemon(true);
        this.receptionThread.start();
    }

    private Runnable receptionRunnable() {
        return () -> {
            while (true) {
                Message<String> message = null;
                try {
                    message = this.consumer.receive();
                    String str = (String) message.getValue();
                    onMessage(new ReceivedMessage.Builder().payload(new OriginalAndMarshaledMessage.Builder().marshaled(str).original(this.messageAdapter.adaptSerializedMessage(str)).build()).acker(ackRunnable(message)).interrupter(this::stopReceiving).build());
                } catch (PulsarClientException e) {
                    this.logger.error("Error while consuming message, closing...", e);
                    return;
                } catch (Exception e2) {
                    this.logger.error("Error while handling message ({}), continuing consumption and acking anyway...", e2.getMessage());
                    this.logger.debug("Handling error stacktrace", e2);
                    if (message != null) {
                        ack(message);
                    }
                }
            }
        };
    }

    private Runnable ackRunnable(Message<String> message) {
        return () -> {
            ack(message);
        };
    }

    private void ack(Message<String> message) {
        try {
            this.consumer.acknowledge(message);
        } catch (PulsarClientException e) {
            throw new PousseCafeException("Unable to ack message", e);
        }
    }

    private void closeIfConnected() {
        if (this.consumer.isConnected()) {
            try {
                this.consumer.close();
            } catch (PulsarClientException e) {
                this.logger.warn("Error while closing consumer", e);
            }
        }
    }

    protected void actuallyStopReceiving() {
        closeIfConnected();
        this.receptionThread.interrupt();
    }

    public void join() {
        try {
            this.receptionThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new PousseCafeException("Cloud not join reception thread", e);
        }
    }
}
