/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.source;

import java.security.Provider;
import java.security.Security;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.functions.source.PulsarRecord;
import org.apache.pulsar.functions.source.PulsarSourceConfig;
import org.apache.pulsar.functions.source.TopicSchema;
import org.apache.pulsar.functions.utils.CryptoUtils;
import org.apache.pulsar.io.core.PushSource;
import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.pulsar.shade.com.google.common.base.Preconditions;
import org.apache.pulsar.shade.org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.Reflections;
import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Push source that subscribes to the topics described by a {@link PulsarSourceConfig} and forwards
 * every message delivered to its listener callback as a {@link PulsarRecord}.
 *
 * <p>Consumers are created in {@link #open} and released in {@link #close}.
 *
 * @param <T> payload type of the consumed messages
 */
public class PulsarSource<T>
extends PushSource<T>
implements MessageListener<T> {
    private static final Logger log = LoggerFactory.getLogger(PulsarSource.class);
    // Client used to create the consumers and the topic schema resolver.
    private final PulsarClient pulsarClient;
    // Subscription, retry and per-topic settings for this source.
    private final PulsarSourceConfig pulsarSourceConfig;
    // Properties handed to every consumer builder via properties(...).
    private final Map<String, String> properties;
    // Class loader used to load the input type class and the crypto key reader.
    private final ClassLoader functionClassLoader;
    // Topics of the subscribed consumers; assigned at the end of open(), unset before that.
    private List<String> inputTopics;
    // Consumers subscribed by open(), in subscription order.
    private List<Consumer<T>> inputConsumers = new LinkedList<Consumer<T>>();
    // Resolves the schema used for each input topic.
    private final TopicSchema topicSchema;

    /**
     * Creates a source bound to the given client and configuration. No consumer is created here;
     * subscriptions are established by {@link #open}.
     *
     * @param pulsarClient        client used to create consumers and to resolve topic schemas
     * @param pulsarConfig        source configuration (subscription, topics, retry settings)
     * @param properties          properties passed to every consumer builder
     * @param functionClassLoader class loader used to load the input type and crypto key reader classes
     */
    public PulsarSource(PulsarClient pulsarClient, PulsarSourceConfig pulsarConfig, Map<String, String> properties, ClassLoader functionClassLoader) {
        // Collaborators supplied by the caller.
        this.pulsarClient = pulsarClient;
        this.pulsarSourceConfig = pulsarConfig;
        this.properties = properties;
        this.functionClassLoader = functionClassLoader;

        // Derived state: the schema resolver shares the same client.
        this.topicSchema = new TopicSchema(pulsarClient);
    }

    /**
     * Subscribes one consumer per configured topic (or topic pattern), registering this source as
     * the message listener, and then records the topics of all subscribed consumers.
     *
     * <p>Builder calls are applied in a fixed order: the per-topic consumer properties are loaded
     * first, so the explicit settings that follow take precedence over them.
     *
     * @param config        not read by this implementation
     * @param sourceContext not read by this implementation
     * @throws Exception if the consumer configuration cannot be built or a subscription fails
     */
    @Override
    public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
        log.info("Opening pulsar source with config: {}", pulsarSourceConfig);

        for (Map.Entry<String, ConsumerConfig<T>> entry : setupConsumerConfigs().entrySet()) {
            final String topic = entry.getKey();
            final ConsumerConfig<T> topicConf = entry.getValue();
            final Schema<T> schema = topicConf.getSchema();
            log.info("Creating consumers for topic : {}, schema : {}, schemaInfo: {}", topic, schema, schema.getSchemaInfo());

            ConsumerBuilder<T> builder = pulsarClient.newConsumer(schema)
                    .subscriptionName(pulsarSourceConfig.getSubscriptionName())
                    .subscriptionInitialPosition(pulsarSourceConfig.getSubscriptionPosition())
                    .subscriptionType(pulsarSourceConfig.getSubscriptionType());

            // Generic per-topic properties go in first; everything below overrides them.
            final Map<String, String> consumerProperties = topicConf.getConsumerProperties();
            if (consumerProperties != null && !consumerProperties.isEmpty()) {
                builder.loadConf(new HashMap<String, Object>(consumerProperties));
            }
            builder.messageListener(this);

            if (topicConf.isRegexPattern()) {
                builder = builder.topicsPattern(topic);
            } else {
                builder = builder.topics(Collections.singletonList(topic));
            }

            final Integer receiverQueueSize = topicConf.getReceiverQueueSize();
            if (receiverQueueSize != null) {
                builder = builder.receiverQueueSize(receiverQueueSize);
            }
            final CryptoKeyReader keyReader = topicConf.getCryptoKeyReader();
            if (keyReader != null) {
                builder = builder.cryptoKeyReader(keyReader);
            }
            final ConsumerCryptoFailureAction cryptoFailureAction = topicConf.getConsumerCryptoFailureAction();
            if (cryptoFailureAction != null) {
                builder = builder.cryptoFailureAction(cryptoFailureAction);
            }

            builder = builder.properties(this.properties);

            final Long nackDelayMs = pulsarSourceConfig.getNegativeAckRedeliveryDelayMs();
            if (nackDelayMs != null && nackDelayMs > 0L) {
                builder.negativeAckRedeliveryDelay(nackDelayMs, TimeUnit.MILLISECONDS);
            }
            final Long ackTimeoutMs = pulsarSourceConfig.getTimeoutMs();
            if (ackTimeoutMs != null) {
                builder = builder.ackTimeout(ackTimeoutMs, TimeUnit.MILLISECONDS);
            }
            final Integer maxRetries = pulsarSourceConfig.getMaxMessageRetries();
            if (maxRetries != null && maxRetries >= 0) {
                builder = builder.deadLetterPolicy(deadLetterPolicyFor(maxRetries));
            }

            // Subscriptions are established one at a time; join() blocks until this one completes.
            inputConsumers.add(builder.subscribeAsync().join());
        }

        // A multi-topics consumer contributes each of its partitioned topics; any other consumer
        // contributes its single topic.
        this.inputTopics = inputConsumers.stream().flatMap(c -> {
            if (c instanceof MultiTopicsConsumerImpl) {
                return ((MultiTopicsConsumerImpl<?>) c).getPartitionedTopics().stream();
            }
            return Collections.singletonList(c.getTopic()).stream();
        }).collect(Collectors.toList());
    }

    /**
     * Builds the dead-letter policy for the given redelivery limit, adding the configured
     * dead-letter topic when one is set and non-empty.
     */
    private DeadLetterPolicy deadLetterPolicyFor(int maxRetries) {
        DeadLetterPolicy.DeadLetterPolicyBuilder policy = DeadLetterPolicy.builder();
        policy.maxRedeliverCount(maxRetries);
        final String deadLetterTopic = pulsarSourceConfig.getDeadLetterTopic();
        if (deadLetterTopic != null && !deadLetterTopic.isEmpty()) {
            policy.deadLetterTopic(deadLetterTopic);
        }
        return policy.build();
    }

    /**
     * Listener callback: wraps the delivered message in a {@link PulsarRecord} carrying its ack and
     * fail callbacks, then pushes the record to the source queue via {@code consume}.
     *
     * @param consumer consumer that delivered the message; used by the ack/fail callbacks
     * @param message  the delivered message
     */
    @Override
    public void received(Consumer<T> consumer, Message<T> message) {
        // Only MessageImpl exposes the schema; other implementations yield a null schema.
        Schema<T> schema = message instanceof MessageImpl ? ((MessageImpl) message).getSchema() : null;

        PulsarRecord<T> record = PulsarRecord.<T>builder()
                .message(message)
                .schema(schema)
                .topicName(message.getTopicName())
                .ackFunction(() -> ackRecord(consumer, message))
                .failFunction(() -> failRecord(consumer, message))
                .build();
        this.consume(record);
    }

    /**
     * Acknowledges the message: cumulatively under EFFECTIVELY_ONCE, individually otherwise.
     * The processing guarantee is read at acknowledgement time.
     */
    private void ackRecord(Consumer<T> consumer, Message<T> message) {
        boolean effectivelyOnce =
                pulsarSourceConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE;
        if (effectivelyOnce) {
            consumer.acknowledgeCumulativeAsync(message);
        } else {
            consumer.acknowledgeAsync(message);
        }
    }

    /**
     * Handles a processing failure: negatively acknowledges the message, except under
     * EFFECTIVELY_ONCE where a {@link RuntimeException} is thrown instead.
     */
    private void failRecord(Consumer<T> consumer, Message<T> message) {
        if (pulsarSourceConfig.getProcessingGuarantees() != FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
            consumer.negativeAcknowledge(message);
            return;
        }
        throw new RuntimeException("Failed to process message: " + message.getMessageId());
    }

    /**
     * Closes every consumer created by {@code open}.
     *
     * <p>Closing is best-effort: a {@link PulsarClientException} from one consumer is logged and
     * the remaining consumers are still closed, so a single failure cannot leak the others.
     *
     * @throws Exception declared by the overridden method; client close failures are logged, not rethrown
     */
    @Override
    public void close() throws Exception {
        if (this.inputConsumers == null) {
            return;
        }
        for (Consumer<T> consumer : this.inputConsumers) {
            try {
                consumer.close();
            } catch (PulsarClientException e) {
                // Previously swallowed silently; surface it so failed shutdowns are diagnosable.
                log.warn("Failed to close consumer for topic {}", consumer.getTopic(), e);
            }
        }
    }

    /**
     * Builds the per-topic consumer settings from the source configuration.
     *
     * <p>For each configured topic the schema is resolved from the serde class name when one is
     * set and non-empty, otherwise from the topic's consumer config. When a crypto config is
     * present, the BouncyCastle provider is registered if missing and the crypto key reader and
     * failure action are attached.
     *
     * @return consumer settings keyed by topic name (or topic pattern), sorted by key
     * @throws ClassNotFoundException if the configured input type class cannot be loaded
     */
    @VisibleForTesting
    @SuppressWarnings("unchecked") // the resolved schema is untyped; T is fixed by the configured type class
    Map<String, ConsumerConfig<T>> setupConsumerConfigs() throws ClassNotFoundException {
        Map<String, ConsumerConfig<T>> result = new TreeMap<String, ConsumerConfig<T>>();
        Class<?> inputType = Reflections.loadClass(pulsarSourceConfig.getTypeClassName(), functionClassLoader);
        Preconditions.checkArgument(!Void.class.equals(inputType), "Input type of Pulsar Function cannot be Void");

        pulsarSourceConfig.getTopicSchema().forEach((topic, topicConf) -> {
            ConsumerConfig.ConsumerConfigBuilder<T> builder = ConsumerConfig.<T>builder()
                    .isRegexPattern(topicConf.isRegexPattern())
                    .receiverQueueSize(topicConf.getReceiverQueueSize())
                    .consumerProperties(topicConf.getConsumerProperties());

            Schema<T> schema;
            String serdeClassName = topicConf.getSerdeClassName();
            if (serdeClassName != null && !serdeClassName.isEmpty()) {
                schema = (Schema<T>) topicSchema.getSchema(topic, inputType, serdeClassName, true);
            } else {
                schema = (Schema<T>) topicSchema.getSchema(topic, inputType, topicConf, true);
            }
            builder.schema(schema);

            if (topicConf.getCryptoConfig() != null) {
                // Register BouncyCastle once, before any key reader is instantiated.
                if (Security.getProvider("BC") == null) {
                    Security.addProvider(new BouncyCastleProvider());
                }
                builder.consumerCryptoFailureAction(topicConf.getCryptoConfig().getConsumerCryptoFailureAction());
                builder.cryptoKeyReader(CryptoUtils.getCryptoKeyReaderInstance(
                        topicConf.getCryptoConfig().getCryptoKeyReaderClassName(),
                        topicConf.getCryptoConfig().getCryptoKeyReaderConfig(),
                        functionClassLoader));
            }
            result.put(topic, builder.build());
        });
        return result;
    }

    /**
     * Returns the topics of the subscribed consumers.
     *
     * @return the list assigned at the end of {@code open}; {@code null} if {@code open} has not completed
     */
    public List<String> getInputTopics() {
        return inputTopics;
    }

    /**
     * Returns the consumers subscribed by {@code open}.
     *
     * @return the internal consumer list itself (not a copy); empty until {@code open} adds to it
     */
    public List<Consumer<T>> getInputConsumers() {
        return inputConsumers;
    }

    /**
     * Mutable holder for the settings used to create the consumer of one input topic.
     *
     * @param <T> payload type of the schema
     */
    private static class ConsumerConfig<T> {
        private Schema<T> schema;
        private boolean isRegexPattern;
        private Integer receiverQueueSize;
        private Map<String, String> consumerProperties;
        private CryptoKeyReader cryptoKeyReader;
        private ConsumerCryptoFailureAction consumerCryptoFailureAction;

        ConsumerConfig(Schema<T> schema, boolean isRegexPattern, Integer receiverQueueSize, Map<String, String> consumerProperties, CryptoKeyReader cryptoKeyReader, ConsumerCryptoFailureAction consumerCryptoFailureAction) {
            this.schema = schema;
            this.isRegexPattern = isRegexPattern;
            this.receiverQueueSize = receiverQueueSize;
            this.consumerProperties = consumerProperties;
            this.cryptoKeyReader = cryptoKeyReader;
            this.consumerCryptoFailureAction = consumerCryptoFailureAction;
        }

        /** Returns a fresh builder with every field unset. */
        public static <T> ConsumerConfigBuilder<T> builder() {
            return new ConsumerConfigBuilder<T>();
        }

        // ---- accessors ----

        public Schema<T> getSchema() {
            return schema;
        }

        public boolean isRegexPattern() {
            return isRegexPattern;
        }

        public Integer getReceiverQueueSize() {
            return receiverQueueSize;
        }

        public Map<String, String> getConsumerProperties() {
            return consumerProperties;
        }

        public CryptoKeyReader getCryptoKeyReader() {
            return cryptoKeyReader;
        }

        public ConsumerCryptoFailureAction getConsumerCryptoFailureAction() {
            return consumerCryptoFailureAction;
        }

        // ---- mutators ----

        public void setSchema(Schema<T> schema) {
            this.schema = schema;
        }

        public void setRegexPattern(boolean isRegexPattern) {
            this.isRegexPattern = isRegexPattern;
        }

        public void setReceiverQueueSize(Integer receiverQueueSize) {
            this.receiverQueueSize = receiverQueueSize;
        }

        public void setConsumerProperties(Map<String, String> consumerProperties) {
            this.consumerProperties = consumerProperties;
        }

        public void setCryptoKeyReader(CryptoKeyReader cryptoKeyReader) {
            this.cryptoKeyReader = cryptoKeyReader;
        }

        public void setConsumerCryptoFailureAction(ConsumerCryptoFailureAction consumerCryptoFailureAction) {
            this.consumerCryptoFailureAction = consumerCryptoFailureAction;
        }

        /**
         * Field-by-field equality over all six settings; two {@code null} values compare equal.
         */
        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof ConsumerConfig)) {
                return false;
            }
            ConsumerConfig<?> that = (ConsumerConfig<?>) o;
            return that.canEqual(this)
                    && isRegexPattern() == that.isRegexPattern()
                    && sameValue(getReceiverQueueSize(), that.getReceiverQueueSize())
                    && sameValue(getSchema(), that.getSchema())
                    && sameValue(getConsumerProperties(), that.getConsumerProperties())
                    && sameValue(getCryptoKeyReader(), that.getCryptoKeyReader())
                    && sameValue(getConsumerCryptoFailureAction(), that.getConsumerCryptoFailureAction());
        }

        /** Hook that lets a subclass refuse equality with instances of this class. */
        protected boolean canEqual(Object other) {
            return other instanceof ConsumerConfig;
        }

        /**
         * Hash over the same fields, in the same order, as {@link #equals}: multiplier 59,
         * 79/97 for the boolean flag, 43 for a {@code null} field.
         */
        @Override
        public int hashCode() {
            int hash = 59 + (isRegexPattern() ? 79 : 97);
            Object[] remaining = {
                getReceiverQueueSize(),
                getSchema(),
                getConsumerProperties(),
                getCryptoKeyReader(),
                getConsumerCryptoFailureAction()
            };
            for (Object field : remaining) {
                hash = hash * 59 + (field == null ? 43 : field.hashCode());
            }
            return hash;
        }

        @Override
        public String toString() {
            return "PulsarSource.ConsumerConfig(schema=" + getSchema()
                    + ", isRegexPattern=" + isRegexPattern()
                    + ", receiverQueueSize=" + getReceiverQueueSize()
                    + ", consumerProperties=" + getConsumerProperties()
                    + ", cryptoKeyReader=" + getCryptoKeyReader()
                    + ", consumerCryptoFailureAction=" + getConsumerCryptoFailureAction() + ")";
        }

        /** Null-safe equality: two nulls are equal, otherwise delegates to {@code a.equals(b)}. */
        private static boolean sameValue(Object a, Object b) {
            return a == null ? b == null : a.equals(b);
        }

        /**
         * Fluent builder for {@link ConsumerConfig}; unset fields keep their Java defaults
         * ({@code null} / {@code false}).
         */
        public static class ConsumerConfigBuilder<T> {
            private Schema<T> schema;
            private boolean isRegexPattern;
            private Integer receiverQueueSize;
            private Map<String, String> consumerProperties;
            private CryptoKeyReader cryptoKeyReader;
            private ConsumerCryptoFailureAction consumerCryptoFailureAction;

            ConsumerConfigBuilder() {
            }

            public ConsumerConfigBuilder<T> schema(Schema<T> schema) {
                this.schema = schema;
                return this;
            }

            public ConsumerConfigBuilder<T> isRegexPattern(boolean isRegexPattern) {
                this.isRegexPattern = isRegexPattern;
                return this;
            }

            public ConsumerConfigBuilder<T> receiverQueueSize(Integer receiverQueueSize) {
                this.receiverQueueSize = receiverQueueSize;
                return this;
            }

            public ConsumerConfigBuilder<T> consumerProperties(Map<String, String> consumerProperties) {
                this.consumerProperties = consumerProperties;
                return this;
            }

            public ConsumerConfigBuilder<T> cryptoKeyReader(CryptoKeyReader cryptoKeyReader) {
                this.cryptoKeyReader = cryptoKeyReader;
                return this;
            }

            public ConsumerConfigBuilder<T> consumerCryptoFailureAction(ConsumerCryptoFailureAction consumerCryptoFailureAction) {
                this.consumerCryptoFailureAction = consumerCryptoFailureAction;
                return this;
            }

            /** Creates the config from the values collected so far. */
            public ConsumerConfig<T> build() {
                return new ConsumerConfig<T>(schema, isRegexPattern, receiverQueueSize, consumerProperties, cryptoKeyReader, consumerCryptoFailureAction);
            }

            @Override
            public String toString() {
                return "PulsarSource.ConsumerConfig.ConsumerConfigBuilder(schema=" + schema
                        + ", isRegexPattern=" + isRegexPattern
                        + ", receiverQueueSize=" + receiverQueueSize
                        + ", consumerProperties=" + consumerProperties
                        + ", cryptoKeyReader=" + cryptoKeyReader
                        + ", consumerCryptoFailureAction=" + consumerCryptoFailureAction + ")";
            }
        }
    }
}

