package org.apache.flink.connector.pulsar.source.offset;

import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;
import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.pulsar.source.AbstractPartition;
import org.apache.flink.connector.pulsar.source.StartOffsetInitializer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.MessageIdImpl;

/**
 * A {@link StartOffsetInitializer} that starts each partition from an explicitly specified
 * {@link MessageId}, falling back to a single default id for partitions that have no entry of
 * their own.
 */
@Internal
public class SpecifiedStartOffsetInitializer implements StartOffsetInitializer {
    private static final long serialVersionUID = 1649702397250402877L;

    /** Per-partition start positions; a read-only view over the map handed to the constructor. */
    private final Map<AbstractPartition, MessageId> initialOffsets;

    /** Start position used for any partition that is not a key of {@link #initialOffsets}. */
    private final MessageId defaultOffset;

    /** Whether the message at the start position itself is expected to be consumed. */
    private final boolean inclusive;

    /**
     * Creates an initializer from explicit per-partition offsets.
     *
     * @param map per-partition start offsets; it is wrapped in an unmodifiable view, not copied,
     *     so later changes made to the caller's map remain visible here
     * @param messageId offset used for partitions missing from {@code map}
     * @param z {@code true} if the message at the start offset is itself expected to be read,
     *     {@code false} if reading is expected to begin at the following entry
     */
    public SpecifiedStartOffsetInitializer(Map<AbstractPartition, MessageId> map, MessageId messageId, boolean z) {
        this.initialOffsets = Collections.unmodifiableMap(map);
        this.defaultOffset = messageId;
        this.inclusive = z;
    }

    /**
     * Pushes the inclusive flag and the start message id for the given partition into the
     * creation configuration.
     *
     * @param abstractPartition partition whose start position is being configured
     * @param creationConfiguration configuration object that receives the settings
     */
    @Override
    public void initializeBeforeCreation(AbstractPartition abstractPartition, StartOffsetInitializer.CreationConfiguration creationConfiguration) {
        creationConfiguration.getConsumerConfigurationData().setResetIncludeHead(this.inclusive);
        creationConfiguration.setInitialMessageId(this.initialOffsets.getOrDefault(abstractPartition, this.defaultOffset));
    }

    /**
     * Checks that the start offset configured for a partition is consistent with what the two
     * suppliers report.
     *
     * <p>The {@code MessageId.earliest} and {@code MessageId.latest} markers are accepted without
     * consulting either supplier.
     *
     * @param abstractPartition partition whose configured offset is verified
     * @param supplier yields the id the configured offset is compared against; judging by the
     *     error messages below this is the last message id of the topic, empty when the topic
     *     holds no data
     * @param supplier2 yields the message whose id is matched against the expected position;
     *     presumably the first message actually read from the partition (confirm against caller)
     * @return a description of the problem, or an empty {@code Optional} when no problem is found
     * @throws ClassCastException in the non-inclusive case when a message is supplied and the
     *     configured offset is not a {@link MessageIdImpl}
     */
    @Override
    public Optional<String> verifyOffset(AbstractPartition abstractPartition, Supplier<Optional<MessageId>> supplier, Supplier<Optional<Message<byte[]>>> supplier2) {
        // Keep the interface type here: only the exclusive branch below needs MessageIdImpl.
        final MessageId startOffset = this.initialOffsets.getOrDefault(abstractPartition, this.defaultOffset);
        if (startOffset.equals(MessageId.earliest) || startOffset.equals(MessageId.latest)) {
            return Optional.empty();
        }

        final Optional<MessageId> lastMessageId = supplier.get();
        if (!lastMessageId.isPresent()) {
            return Optional.of(String.format("Cannot initialize to offset %s because topic is empty", startOffset));
        }
        if (startOffset.compareTo(lastMessageId.get()) > 0) {
            return Optional.of(String.format("The initial offset %s is beyond the last commit message id", startOffset));
        }

        final Optional<Message<byte[]>> firstMessage = supplier2.get();
        if (this.inclusive) {
            if (!firstMessage.isPresent()) {
                return Optional.of(String.format("No data found at offset %s", startOffset));
            }
            return describeMismatch(firstMessage.get().getMessageId(), startOffset);
        }

        if (!firstMessage.isPresent()) {
            // Exclusive start with no message supplied: nothing to compare against.
            return Optional.empty();
        }
        // Exclusive start: the supplied message is expected to sit one entry after the offset.
        final MessageIdImpl specified = (MessageIdImpl) startOffset;
        final MessageIdImpl expectedNext =
                new MessageIdImpl(specified.getLedgerId(), specified.getEntryId() + 1, specified.getPartitionIndex());
        return describeMismatch(firstMessage.get().getMessageId(), expectedNext);
    }

    /** Returns an error description when {@code actual} does not equal {@code expected}, else empty. */
    private static Optional<String> describeMismatch(MessageId actual, MessageId expected) {
        if (actual.equals(expected)) {
            return Optional.empty();
        }
        return Optional.of(String.format("Unexpected offset %s, but expected %s", actual, expected));
    }
}
