package org.apache.flink.connector.pulsar.source;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.TreeSet;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.pulsar.source.util.PulsarAdminUtils;
import org.apache.flink.util.Preconditions;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.shade.com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Fluent builder for {@link PulsarSource}.
 *
 * <p>Defaults applied by a fresh builder: start at the earliest offset, never stop
 * ({@link Boundedness#CONTINUOUS_UNBOUNDED}), and consume through a non-durable, exclusive
 * subscription whose name is {@code "flink-"} followed by a random UUID.
 *
 * <p>A subscriber (via {@code setTopics}, {@code setTopicPattern} or {@code setSubscriber}) and a
 * deserializer must be supplied before {@link #build()} is called.
 *
 * @param <OUT> type of the records produced by the source being built
 */
@PublicEvolving
public class PulsarSourceBuilder<OUT> {
    private static final Logger LOG = LoggerFactory.getLogger(PulsarSourceBuilder.class);

    private PulsarSubscriber subscriber;
    private MessageDeserializer<OUT> messageDeserializer;
    private SplitSchedulingStrategy splitSchedulingStrategy;
    private StartOffsetInitializer startOffsetInitializer = StartOffsetInitializer.earliest();
    private StopCondition stopCondition = StopCondition.never();
    private Boundedness boundedness = Boundedness.CONTINUOUS_UNBOUNDED;
    private final Configuration configuration = new Configuration();
    private final ClientConfigurationData clientConfigurationData = new ClientConfigurationData();
    private final ConsumerConfigurationData<byte[]> consumerConfigurationData = new ConsumerConfigurationData<>();

    /**
     * Creates a builder whose consumer configuration is pre-populated with a non-durable,
     * exclusive subscription named {@code "flink-<random UUID>"}.
     */
    public PulsarSourceBuilder() {
        this.consumerConfigurationData.setSubscriptionMode(SubscriptionMode.NonDurable);
        this.consumerConfigurationData.setSubscriptionType(SubscriptionType.Exclusive);
        this.consumerConfigurationData.setSubscriptionName("flink-" + UUID.randomUUID());
    }

    /**
     * Subscribes to a fixed list of topics.
     *
     * <p>The topic names are recorded (sorted, de-duplicated) on the consumer configuration and a
     * topic-list subscriber is installed.
     *
     * @param splitDivisionStrategy strategy handed to the topic-list subscriber
     * @param strArr topic names to consume from
     * @return this builder
     */
    public PulsarSourceBuilder<OUT> setTopics(SplitDivisionStrategy splitDivisionStrategy, String... strArr) {
        TreeSet<String> topicNames = new TreeSet<>(Arrays.asList(strArr));
        this.consumerConfigurationData.setTopicNames(topicNames);
        return setSubscriber(PulsarSubscriber.getTopicListSubscriber(splitDivisionStrategy, strArr));
    }

    /**
     * Subscribes through a topic-pattern subscriber.
     *
     * <p>All arguments are forwarded unchanged to
     * {@code PulsarSubscriber.getTopicPatternSubscriber}.
     *
     * @param str first argument of the pattern subscriber factory
     *     (NOTE(review): presumably the namespace to search - confirm against PulsarSubscriber)
     * @param splitDivisionStrategy strategy handed to the pattern subscriber
     * @param strArr remaining arguments of the pattern subscriber factory
     *     (NOTE(review): presumably the topic patterns - confirm against PulsarSubscriber)
     * @return this builder
     */
    public PulsarSourceBuilder<OUT> setTopicPattern(String str, SplitDivisionStrategy splitDivisionStrategy, String... strArr) {
        return setSubscriber(PulsarSubscriber.getTopicPatternSubscriber(str, splitDivisionStrategy, strArr));
    }

    /**
     * Installs the subscriber used by the source, replacing any previously configured one.
     *
     * @param pulsarSubscriber subscriber to use; must not be {@code null}
     * @return this builder
     * @throws IllegalStateException if {@code pulsarSubscriber} is {@code null}
     */
    public PulsarSourceBuilder<OUT> setSubscriber(PulsarSubscriber pulsarSubscriber) {
        // The check guards the argument, so the message describes that condition.
        Preconditions.checkState(pulsarSubscriber != null, "subscriber must not be null");
        this.subscriber = pulsarSubscriber;
        return this;
    }

    /**
     * Subscribes to a fixed list of topics using {@code NoSplitDivisionStrategy.NO_SPLIT}.
     *
     * @param strArr topic names to consume from
     * @return this builder
     */
    public PulsarSourceBuilder<OUT> setTopics(String... strArr) {
        return setTopics(NoSplitDivisionStrategy.NO_SPLIT, strArr);
    }

    /**
     * Subscribes through a topic-pattern subscriber using
     * {@code NoSplitDivisionStrategy.NO_SPLIT}.
     *
     * @param str forwarded to {@link #setTopicPattern(String, SplitDivisionStrategy, String...)}
     * @param strArr forwarded to {@link #setTopicPattern(String, SplitDivisionStrategy, String...)}
     * @return this builder
     */
    public PulsarSourceBuilder<OUT> setTopicPattern(String str, String... strArr) {
        return setTopicPattern(str, NoSplitDivisionStrategy.NO_SPLIT, strArr);
    }

    /**
     * Sets the split scheduling strategy. When left unset (or set to {@code null}),
     * {@link #build()} falls back to a {@code HashSplitSchedulingStrategy}.
     *
     * @param splitSchedulingStrategy strategy to use
     * @return this builder
     */
    public PulsarSourceBuilder<OUT> setSplitSchedulingStrategy(SplitSchedulingStrategy splitSchedulingStrategy) {
        this.splitSchedulingStrategy = splitSchedulingStrategy;
        return this;
    }

    /**
     * Sets where consumption starts.
     *
     * @param startOffsetInitializer initializer for the start offset
     * @return this builder
     */
    public PulsarSourceBuilder<OUT> startAt(StartOffsetInitializer startOffsetInitializer) {
        this.startOffsetInitializer = startOffsetInitializer;
        return this;
    }

    /**
     * Sets the stop condition and marks the source as {@link Boundedness#BOUNDED}.
     *
     * @param stopCondition condition at which consumption stops
     * @return this builder
     */
    public PulsarSourceBuilder<OUT> stopAt(StopCondition stopCondition) {
        this.boundedness = Boundedness.BOUNDED;
        this.stopCondition = stopCondition;
        return this;
    }

    /**
     * Sets the deserializer and re-types this builder to the deserializer's output type.
     *
     * <p>The same builder instance is returned; only its static type parameter changes.
     *
     * @param messageDeserializer deserializer for incoming messages
     * @param <T> record type produced by the deserializer
     * @return this builder, viewed as a builder of {@code T}
     */
    @SuppressWarnings("unchecked") // Safe under erasure: the builder stores nothing else typed by OUT.
    public <T> PulsarSourceBuilder<T> setDeserializer(MessageDeserializer<T> messageDeserializer) {
        this.messageDeserializer = (MessageDeserializer<OUT>) messageDeserializer;
        return (PulsarSourceBuilder<T>) this;
    }

    /**
     * Lets the caller mutate the builder's Flink {@link Configuration} in place.
     *
     * @param consumer callback receiving the configuration
     * @return this builder
     */
    public PulsarSourceBuilder<OUT> configure(Consumer<Configuration> consumer) {
        consumer.accept(this.configuration);
        return this;
    }

    /**
     * Lets the caller mutate the Pulsar client configuration in place.
     *
     * @param consumer callback receiving the client configuration
     * @return this builder
     */
    public PulsarSourceBuilder<OUT> configurePulsarClient(Consumer<ClientConfigurationData> consumer) {
        consumer.accept(this.clientConfigurationData);
        return this;
    }

    /**
     * Lets the caller mutate the Pulsar consumer configuration in place.
     *
     * @param consumer callback receiving the consumer configuration
     * @return this builder
     */
    public PulsarSourceBuilder<OUT> configurePulsarConsumer(Consumer<ConsumerConfigurationData> consumer) {
        consumer.accept(this.consumerConfigurationData);
        return this;
    }

    /**
     * Validates the collected settings and creates the {@link PulsarSource}.
     *
     * @return the configured source
     * @throws NullPointerException if no subscriber, no deserializer or no admin URL was provided
     * @throws IllegalStateException if the Pulsar client or admin cannot be initialized
     */
    public PulsarSource<OUT> build() {
        sanityCheck();
        if (this.splitSchedulingStrategy == null) {
            this.splitSchedulingStrategy = new HashSplitSchedulingStrategy();
        }
        return new PulsarSource<>(
                this.subscriber,
                this.startOffsetInitializer,
                this.stopCondition,
                this.boundedness,
                this.messageDeserializer,
                this.configuration,
                this.clientConfigurationData,
                this.consumerConfigurationData,
                this.splitSchedulingStrategy);
    }

    /**
     * Sets {@code configOption} to {@code t} when the configuration yields no value for it, or
     * when it does and {@code z} requests an override (a warning is logged in that case).
     *
     * @param configOption option to set
     * @param t value to apply
     * @param z whether an existing value may be overridden
     * @param <T> value type of the option
     * @return {@code true} only if an existing value was overridden
     */
    private <T> boolean maybeOverride(ConfigOption<T> configOption, T t, boolean z) {
        T current = this.configuration.get(configOption);
        if (current == null) {
            this.configuration.set(configOption, t);
            return false;
        }
        if (!z) {
            return false;
        }
        LOG.warn("Configuration {} is provided but will be overridden from {} to {}", configOption, current, t);
        this.configuration.set(configOption, t);
        return true;
    }

    /**
     * Verifies that the mandatory settings are present and that a Pulsar client and admin can be
     * created from the current configuration.
     *
     * @throws NullPointerException if the subscriber, deserializer or admin URL is missing
     * @throws IllegalStateException if the Pulsar client or admin cannot be initialized
     */
    private void sanityCheck() {
        Preconditions.checkNotNull(this.subscriber, "No subscribe mode is specified, should be one of topics or topic pattern.");
        Preconditions.checkNotNull(this.messageDeserializer, "Message deserializer is required but not provided.");
        if (maybeOverride(PulsarSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS, -1L, this.boundedness == Boundedness.BOUNDED)) {
            LOG.warn("{} property is overridden to -1 because the source is bounded.", PulsarSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS);
        }
        String adminUrl = this.configuration.getString(PulsarSourceOptions.ADMIN_URL);
        Preconditions.checkNotNull(adminUrl, PulsarSourceOptions.ADMIN_URL.key() + " not set.");
        try {
            // The client is built only to validate the configuration; close it immediately so
            // its connections and threads are not leaked.
            new ClientBuilderImpl(this.clientConfigurationData).build().close();
        } catch (PulsarClientException e) {
            throw new IllegalStateException("Cannot initialize pulsar client", e);
        }
        try {
            // NOTE(review): the admin handle returned here is discarded without being closed; its
            // type is declared in PulsarAdminUtils - confirm it is closeable and release it too.
            PulsarAdminUtils.newAdminFromConf(adminUrl, this.clientConfigurationData);
        } catch (PulsarClientException e) {
            throw new IllegalStateException("Cannot initialize pulsar admin", e);
        }
    }
}
