/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.source;

import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.functions.runtime.shaded.com.google.common.base.Preconditions;
import org.apache.pulsar.functions.source.MultiConsumerPulsarSourceConfig;
import org.apache.pulsar.functions.source.PulsarSourceConsumerConfig;
import org.apache.pulsar.functions.source.PushPulsarSource;
import org.apache.pulsar.io.core.SourceContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Push-based Pulsar source that subscribes one {@link Consumer} per configured input
 * topic and registers itself as the {@link MessageListener} of each of them.
 *
 * <p>Every message delivered to {@link #received(Consumer, Message)} is converted with
 * {@code buildRecord} and handed to {@code consume}, both inherited from
 * {@link PushPulsarSource}.
 *
 * @param <T> payload type of the messages read from the input topics
 */
public class MultiConsumerPulsarSource<T>
extends PushPulsarSource<T>
implements MessageListener<T> {
    private static final Logger log = LoggerFactory.getLogger(MultiConsumerPulsarSource.class);
    private final MultiConsumerPulsarSourceConfig pulsarSourceConfig;
    private final ClassLoader functionClassLoader;
    /** Consumers created by {@link #open(Map, SourceContext)}, in creation order. */
    private final List<Consumer<T>> inputConsumers = new LinkedList<>();

    /**
     * Creates the source; no consumer is subscribed until {@link #open(Map, SourceContext)}.
     *
     * @param pulsarClient client forwarded to the superclass
     * @param pulsarSourceConfig source configuration, including the per-topic consumer settings
     * @param properties properties forwarded to the superclass
     * @param functionClassLoader class loader used to resolve the configured input type
     */
    public MultiConsumerPulsarSource(PulsarClient pulsarClient, MultiConsumerPulsarSourceConfig pulsarSourceConfig, Map<String, String> properties, ClassLoader functionClassLoader) {
        super(pulsarClient, pulsarSourceConfig, properties, functionClassLoader);
        this.pulsarSourceConfig = pulsarSourceConfig;
        this.functionClassLoader = functionClassLoader;
    }

    /**
     * Subscribes one consumer per configured topic, with this instance as its listener.
     *
     * <p>Each subscription is awaited with {@code join()} before the next one is started,
     * and each consumer is recorded as soon as it exists, so consumers created before a
     * failing subscription remain reachable by {@link #close()}.
     *
     * @param config not read by this implementation
     * @param sourceContext not read by this implementation
     * @throws Exception if the consumer configurations cannot be built or a subscription fails
     */
    @Override
    public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
        log.info("Opening pulsar source with config: {}", this.pulsarSourceConfig);
        Map<String, PulsarSourceConsumerConfig<T>> configs = this.setupConsumerConfigs();
        for (Map.Entry<String, PulsarSourceConsumerConfig<T>> entry : configs.entrySet()) {
            String topic = entry.getKey();
            PulsarSourceConsumerConfig<T> conf = entry.getValue();
            log.info("Creating consumers for topic : {}, schema : {}, schemaInfo: {}", topic, conf.getSchema(), conf.getSchema().getSchemaInfo());
            ConsumerBuilder<T> builder = this.createConsumeBuilder(topic, conf);
            builder.messageListener(this);
            Consumer<T> consumer = builder.subscribeAsync().join();
            this.inputConsumers.add(consumer);
        }
    }

    /**
     * Listener callback: wraps the message in a record and pushes it downstream.
     *
     * @param consumer consumer that received the message
     * @param message the received message
     */
    @Override
    public void received(Consumer<T> consumer, Message<T> message) {
        this.consume(this.buildRecord(consumer, message));
    }

    /**
     * Closes every consumer created by {@link #open(Map, SourceContext)}.
     *
     * <p>Closing is best-effort: a {@link PulsarClientException} from one consumer is logged
     * and does not prevent the remaining consumers from being closed.
     */
    @Override
    public void close() throws Exception {
        for (Consumer<T> consumer : this.inputConsumers) {
            try {
                consumer.close();
            }
            catch (PulsarClientException e) {
                // Keep going so one failing consumer cannot leak the others, but leave a trace.
                log.warn("Failed to close consumer for topic {}", consumer.getTopic(), e);
            }
        }
    }

    /**
     * Builds the per-topic consumer configuration from the source configuration.
     *
     * @return consumer configurations keyed by topic, sorted by topic name
     * @throws ClassNotFoundException declared for the lookup of the configured input type
     * @throws IllegalArgumentException if the configured input type is {@link Void}
     */
    private Map<String, PulsarSourceConsumerConfig<T>> setupConsumerConfigs() throws ClassNotFoundException {
        Map<String, PulsarSourceConsumerConfig<T>> configs = new TreeMap<>();
        Class<?> typeArg = Reflections.loadClass(this.pulsarSourceConfig.getTypeClassName(), this.functionClassLoader);
        Preconditions.checkArgument(!Void.class.equals(typeArg), "Input type of Pulsar Function cannot be Void");
        this.pulsarSourceConfig.getTopicSchema().forEach((topic, conf) -> configs.put(topic, this.buildPulsarSourceConsumerConfig(topic, conf, typeArg)));
        return configs;
    }

    /**
     * Returns the live list of consumers created so far (not a copy).
     *
     * @return the consumers subscribed by {@link #open(Map, SourceContext)}
     */
    @Override
    public List<Consumer<T>> getInputConsumers() {
        return this.inputConsumers;
    }
}

