package org.apache.flink.streaming.connectors.pulsar.internal;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.flink.streaming.connectors.pulsar.util.MessageIdUtils;
import org.apache.flink.streaming.util.serialization.PulsarDeserializationSchema;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/connectors/pulsar/internal/ReaderThread.class */
public class ReaderThread<T> extends Thread {
    private static final Logger log = LoggerFactory.getLogger(ReaderThread.class);
    protected final PulsarFetcher<T> owner;
    protected final PulsarTopicState<T> state;
    protected final ClientConfigurationData clientConf;
    protected final Map<String, Object> readerConf;
    protected final int pollTimeoutMs;
    protected final ExceptionProxy exceptionProxy;
    protected final TopicRange topicRange;
    protected MessageId startMessageId;
    protected boolean excludeMessageId;
    private boolean failOnDataLoss;
    private boolean useEarliestWhenDataLoss;
    protected volatile boolean running;
    protected volatile boolean closed;
    protected final PulsarDeserializationSchema<T> deserializer;
    protected volatile Reader<T> reader;

    public ReaderThread(PulsarFetcher<T> pulsarFetcher, PulsarTopicState pulsarTopicState, ClientConfigurationData clientConfigurationData, Map<String, Object> map, PulsarDeserializationSchema<T> pulsarDeserializationSchema, int i, ExceptionProxy exceptionProxy) {
        this.excludeMessageId = false;
        this.failOnDataLoss = true;
        this.useEarliestWhenDataLoss = false;
        this.running = true;
        this.closed = false;
        this.reader = null;
        this.owner = pulsarFetcher;
        this.state = pulsarTopicState;
        this.clientConf = clientConfigurationData;
        this.readerConf = map;
        this.deserializer = pulsarDeserializationSchema;
        this.pollTimeoutMs = i;
        this.exceptionProxy = exceptionProxy;
        this.topicRange = pulsarTopicState.getTopicRange();
        this.startMessageId = pulsarTopicState.getOffset();
    }

    public ReaderThread(PulsarFetcher<T> pulsarFetcher, PulsarTopicState pulsarTopicState, ClientConfigurationData clientConfigurationData, Map<String, Object> map, PulsarDeserializationSchema<T> pulsarDeserializationSchema, int i, ExceptionProxy exceptionProxy, boolean z, boolean z2, boolean z3) {
        this(pulsarFetcher, pulsarTopicState, clientConfigurationData, map, pulsarDeserializationSchema, i, exceptionProxy);
        this.failOnDataLoss = z;
        this.useEarliestWhenDataLoss = z2;
        this.excludeMessageId = z3;
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        log.info("Starting to fetch from {} at {}, failOnDataLoss {}", new Object[]{this.topicRange, this.startMessageId, Boolean.valueOf(this.failOnDataLoss)});
        try {
            try {
                handleTooLargeCursor();
                createActualReader();
                log.info("Starting to read {} with reader thread {}", this.topicRange, getName());
                while (this.running) {
                    Message<T> readNext = this.reader.readNext(this.pollTimeoutMs, TimeUnit.MILLISECONDS);
                    if (readNext != null) {
                        emitRecord(readNext);
                    }
                }
                if (this.reader != null) {
                    try {
                        close();
                    } catch (Throwable th) {
                        log.error("Error while closing Pulsar reader " + th.toString());
                    }
                }
            } catch (Throwable th2) {
                this.exceptionProxy.reportError(th2);
                if (this.reader != null) {
                    try {
                        close();
                    } catch (Throwable th3) {
                        log.error("Error while closing Pulsar reader " + th3.toString());
                    }
                }
            }
        } catch (Throwable th4) {
            if (this.reader != null) {
                try {
                    close();
                } catch (Throwable th5) {
                    log.error("Error while closing Pulsar reader " + th5.toString());
                }
            }
            throw th4;
        }
    }

    protected void createActualReader() throws PulsarClientException {
        ReaderBuilder<T> loadConf = CachedPulsarClient.getOrCreate(this.clientConf).newReader(this.deserializer.getSchema()).topic(this.topicRange.getTopic()).startMessageId(this.startMessageId).loadConf(this.readerConf);
        log.info("Create a reader at topic {} starting from message {} (inclusive) : config = {}", new Object[]{this.topicRange, this.startMessageId, this.readerConf});
        if (!this.excludeMessageId) {
            loadConf.startMessageIdInclusive();
        }
        if (!this.topicRange.isFullRange()) {
            loadConf.keyHashRange(this.topicRange.getPulsarRange());
        }
        this.reader = loadConf.create();
    }

    protected void handleTooLargeCursor() {
        if (this.startMessageId.equals(MessageId.earliest) || this.startMessageId.equals(MessageId.latest) || ((MessageIdImpl) this.startMessageId).getEntryId() == -1) {
            return;
        }
        MessageId lastMessageId = this.owner.getMetaDataReader().getLastMessageId(this.topicRange.getTopic());
        if (MessageIdUtils.prev(this.startMessageId).compareTo(lastMessageId) <= 0) {
            return;
        }
        if (this.failOnDataLoss) {
            log.error("the start message id is beyond the last commit message id, with topic:{}", this.topicRange);
            throw new RuntimeException("start message id beyond the last commit");
        }
        if (this.useEarliestWhenDataLoss) {
            log.info("reset message to earliest");
            this.startMessageId = MessageId.earliest;
        } else {
            log.info("reset message to valid offset {}", lastMessageId);
            this.startMessageId = lastMessageId;
        }
    }

    protected void emitRecord(Message<T> message) throws IOException {
        MessageId messageId = message.getMessageId();
        T deserialize2 = this.deserializer.deserialize2(message);
        if (this.deserializer.isEndOfStream(deserialize2)) {
            this.running = false;
        } else {
            this.owner.emitRecordsWithTimestamps(deserialize2, this.state, messageId, message.getEventTime());
        }
    }

    public void cancel() throws IOException {
        this.running = false;
        if (this.reader != null) {
            try {
                close();
            } catch (IOException e) {
                log.error("failed to close reader. ", e);
            }
        }
        interrupt();
    }

    public void close() throws IOException {
        if (this.closed) {
            return;
        }
        this.closed = true;
        this.reader.close();
        log.info("Reader closed");
    }

    public boolean isRunning() {
        return this.running;
    }

    private void reportDataLoss(String str) {
        this.running = false;
        this.exceptionProxy.reportError(new IllegalStateException(str + PulsarOptions.INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE));
    }

    public static boolean messageIdRoughEquals(MessageId messageId, MessageId messageId2) {
        if (messageId == null || messageId2 == null) {
            return false;
        }
        if ((messageId instanceof BatchMessageIdImpl) && (messageId2 instanceof BatchMessageIdImpl)) {
            return messageId.equals(messageId2);
        }
        if ((messageId instanceof MessageIdImpl) && (messageId2 instanceof BatchMessageIdImpl)) {
            BatchMessageIdImpl batchMessageIdImpl = (BatchMessageIdImpl) messageId2;
            return messageId.equals(new MessageIdImpl(batchMessageIdImpl.getLedgerId(), batchMessageIdImpl.getEntryId(), batchMessageIdImpl.getPartitionIndex()));
        }
        if ((messageId2 instanceof MessageIdImpl) && (messageId instanceof BatchMessageIdImpl)) {
            BatchMessageIdImpl batchMessageIdImpl2 = (BatchMessageIdImpl) messageId;
            return messageId2.equals(new MessageIdImpl(batchMessageIdImpl2.getLedgerId(), batchMessageIdImpl2.getEntryId(), batchMessageIdImpl2.getPartitionIndex()));
        }
        if ((messageId instanceof MessageIdImpl) && (messageId2 instanceof MessageIdImpl)) {
            return messageId.equals(messageId2);
        }
        throw new IllegalStateException(String.format("comparing messageIds of type %s, %s", messageId.getClass().toString(), messageId2.getClass().toString()));
    }
}
