package org.redkalex.mq.pulsar;

import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.logging.Level;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Messages;
import org.redkale.mq.MessageAgent;
import org.redkale.mq.MessageConsumer;
import org.redkale.mq.MessageProcessor;
import org.redkale.mq.MessageRecord;
import org.redkalex.mq.pulsar.PulsarMessageAgent;

/* loaded from: input_file:org/redkalex/mq/pulsar/PulsarMessageConsumer.class */
public class PulsarMessageConsumer extends MessageConsumer implements Runnable {
    protected Properties config;
    protected Thread thread;
    protected CompletableFuture<Void> startFuture;
    protected CompletableFuture<Void> closeFuture;
    protected Consumer<MessageRecord> consumer;
    protected boolean reconnecting;
    protected final Object resumeLock;

    public PulsarMessageConsumer(MessageAgent messageAgent, String[] strArr, String str, MessageProcessor messageProcessor, String str2, Properties properties) {
        super(messageAgent, strArr, str, messageProcessor);
        this.resumeLock = new Object();
        this.config = new Properties();
    }

    public void retryConnect() {
        if (this.reconnecting) {
            try {
                this.consumer = ((PulsarMessageAgent) this.messageAgent).client.newConsumer(PulsarMessageAgent.MessageRecordSchema.INSTANCE).topic(this.topics).subscribe();
                synchronized (this.resumeLock) {
                    this.resumeLock.notifyAll();
                }
            } catch (Exception e) {
            }
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        try {
            this.consumer = ((PulsarMessageAgent) this.messageAgent).client.newConsumer(PulsarMessageAgent.MessageRecordSchema.INSTANCE).topic(this.topics).subscribe();
            this.startFuture.complete(null);
            if (this.logger.isLoggable(Level.FINE)) {
                this.logger.log(Level.FINE, MessageConsumer.class.getSimpleName() + " [" + Arrays.toString(this.topics) + "] startuped");
            }
            while (!this.closed) {
                try {
                    Messages batchReceive = this.consumer.batchReceive();
                    if (this.reconnecting) {
                        this.reconnecting = false;
                    }
                    if (batchReceive != null && batchReceive.size() >= 1) {
                        this.consumer.acknowledgeAsync(batchReceive).whenComplete((r7, th) -> {
                            if (th != null) {
                                this.logger.log(Level.SEVERE, Arrays.toString(this.topics) + " consumer error: " + batchReceive, th);
                            }
                        });
                        MessageRecord messageRecord = null;
                        try {
                            this.processor.begin(batchReceive.size());
                            Iterator it = batchReceive.iterator();
                            while (it.hasNext()) {
                                messageRecord = (MessageRecord) ((Message) it.next()).getValue();
                                this.processor.process(messageRecord, (Runnable) null);
                            }
                            this.processor.commit();
                        } catch (Throwable th2) {
                            this.logger.log(Level.SEVERE, MessageProcessor.class.getSimpleName() + " process " + messageRecord + " error", th2);
                        }
                    }
                } catch (Exception e) {
                    this.logger.log(Level.WARNING, getClass().getSimpleName() + " poll error", (Throwable) e);
                    this.consumer.close();
                    this.consumer = null;
                    if (!this.closed) {
                        this.reconnecting = true;
                        ((PulsarMessageAgent) this.messageAgent).startReconnect();
                        synchronized (this.resumeLock) {
                            this.resumeLock.wait();
                        }
                    }
                }
            }
            if (this.consumer != null) {
                this.consumer.close();
            }
            if (this.closeFuture != null) {
                this.closeFuture.complete(null);
                if (this.logger.isLoggable(Level.FINE)) {
                    this.logger.log(Level.FINE, MessageConsumer.class.getSimpleName() + " [" + Arrays.toString(this.topics) + "] shutdowned");
                }
            }
        } catch (Throwable th3) {
            if (this.closeFuture != null && !this.closeFuture.isDone()) {
                this.closeFuture.complete(null);
                if (this.logger.isLoggable(Level.FINE)) {
                    this.logger.log(Level.FINE, MessageConsumer.class.getSimpleName() + " [" + Arrays.toString(this.topics) + "] shutdowned");
                }
            }
            this.logger.log(Level.SEVERE, MessageConsumer.class.getSimpleName() + "(" + Arrays.toString(this.topics) + ") occur error", th3);
        }
    }

    public synchronized CompletableFuture<Void> startup() {
        if (this.startFuture != null) {
            return this.startFuture;
        }
        this.thread = new Thread(this);
        this.thread.setName("MQ-" + this.consumerid + "-Thread");
        this.startFuture = new CompletableFuture<>();
        if (this.logger.isLoggable(Level.FINE)) {
            this.logger.log(Level.FINE, MessageConsumer.class.getSimpleName() + " [" + Arrays.toString(this.topics) + "] startuping");
        }
        this.thread.start();
        return this.startFuture;
    }

    public synchronized CompletableFuture<Void> shutdown() {
        if (this.consumer == null) {
            return CompletableFuture.completedFuture(null);
        }
        if (this.closeFuture != null) {
            return this.closeFuture;
        }
        this.closeFuture = new CompletableFuture<>();
        if (this.logger.isLoggable(Level.FINE)) {
            this.logger.log(Level.FINE, MessageConsumer.class.getSimpleName() + " [" + Arrays.toString(this.topics) + "] shutdownling");
        }
        this.closed = true;
        synchronized (this.resumeLock) {
            this.resumeLock.notifyAll();
        }
        return this.closeFuture;
    }
}
