package edu.iu.dsc.tws.examples.connectors;

import edu.iu.dsc.tws.api.JobConfig;
import edu.iu.dsc.tws.api.Twister2Job;
import edu.iu.dsc.tws.api.checkpointing.Snapshot;
import edu.iu.dsc.tws.api.comms.packing.types.primitive.LongPacker;
import edu.iu.dsc.tws.api.compute.IMessage;
import edu.iu.dsc.tws.api.compute.TaskContext;
import edu.iu.dsc.tws.api.compute.graph.OperationMode;
import edu.iu.dsc.tws.api.compute.nodes.ICompute;
import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.api.resource.IPersistentVolume;
import edu.iu.dsc.tws.api.resource.IVolatileVolume;
import edu.iu.dsc.tws.api.resource.IWorker;
import edu.iu.dsc.tws.api.resource.IWorkerController;
import edu.iu.dsc.tws.checkpointing.task.CheckpointableTask;
import edu.iu.dsc.tws.connectors.kafka.KafkaSource;
import edu.iu.dsc.tws.examples.ml.svm.constant.Constants;
import edu.iu.dsc.tws.rsched.core.ResourceAllocator;
import edu.iu.dsc.tws.rsched.job.Twister2Submitter;
import edu.iu.dsc.tws.task.ComputeEnvironment;
import edu.iu.dsc.tws.task.impl.ComputeGraphBuilder;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Properties;
import java.util.Set;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.kafka.clients.consumer.ConsumerRecord;

/* loaded from: input_file:edu/iu/dsc/tws/examples/connectors/KafkaExample.class */
public class KafkaExample implements IWorker {
    private static final Logger LOG = Logger.getLogger(KafkaSource.class.getName());
    private static final String CLI_TOPICS = "topics";
    private static final String CLI_SERVER = "server";

    /* loaded from: input_file:edu/iu/dsc/tws/examples/connectors/KafkaExample$KSink.class */
    public static class KSink implements ICompute, CheckpointableTask {
        private Long sum = 0L;
        private TaskContext context;

        public boolean execute(IMessage iMessage) {
            this.sum = Long.valueOf(this.sum.longValue() + Long.parseLong(iMessage.getContent().toString()));
            KafkaExample.LOG.info("Current sum in " + this.context.taskIndex() + " : " + this.sum);
            return true;
        }

        public void prepare(Config config, TaskContext taskContext) {
            this.context = taskContext;
        }

        public void restoreSnapshot(Snapshot snapshot) {
            this.sum = (Long) snapshot.getOrDefault("sum", 0L);
            KafkaExample.LOG.info("Restoring sum in " + this.context.taskIndex() + " : " + this.sum);
        }

        public void takeSnapshot(Snapshot snapshot) {
            snapshot.setValue("sum", this.sum);
        }

        public void initSnapshot(Snapshot snapshot) {
            snapshot.setPacker("sum", LongPacker.getInstance());
        }
    }

    /* loaded from: input_file:edu/iu/dsc/tws/examples/connectors/KafkaExample$KSource.class */
    public static class KSource extends KafkaSource {
        public Properties getConsumerProperties() {
            Properties properties = new Properties();
            properties.put("bootstrap.servers", this.cfg.getStringValue(KafkaExample.CLI_SERVER));
            properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            return properties;
        }

        public Set<String> getTopics() {
            return (Set) this.cfg.get(KafkaExample.CLI_TOPICS, Set.class);
        }

        public void writeRecord(ConsumerRecord consumerRecord) {
            this.context.write("edge", consumerRecord.value());
        }

        public Duration getPollingTimeout() {
            return Duration.ZERO;
        }
    }

    public static void main(String[] strArr) throws ParseException {
        Options options = new Options();
        options.addOption(CLI_SERVER, true, "Kafka bootstrap server in the format host:port");
        options.addOption(CLI_TOPICS, true, "Set of topics in the format topic1,topic2");
        CommandLine parse = new DefaultParser().parse(options, strArr);
        HashMap hashMap = new HashMap();
        hashMap.put(CLI_SERVER, "localhost:9092");
        hashMap.put(CLI_TOPICS, Collections.singleton("test2"));
        if (parse.hasOption(CLI_SERVER)) {
            hashMap.put("bootstrap.servers", parse.getOptionValue(CLI_SERVER));
        }
        if (parse.hasOption(CLI_TOPICS)) {
            hashMap.put(CLI_TOPICS, (Set) Arrays.stream(parse.getOptionValue(CLI_TOPICS).split(Constants.SimpleGraphConfig.DELIMITER)).map((v0) -> {
                return v0.trim();
            }).collect(Collectors.toSet()));
        }
        Config loadConfig = ResourceAllocator.loadConfig(new HashMap());
        JobConfig jobConfig = new JobConfig();
        jobConfig.putAll(hashMap);
        Twister2Submitter.submitJob(Twister2Job.newBuilder().setJobName(KafkaExample.class.getName()).setWorkerClass(KafkaExample.class).addComputeResource(1.0d, 1024, 1).setConfig(jobConfig).build(), loadConfig);
    }

    public void execute(Config config, int i, IWorkerController iWorkerController, IPersistentVolume iPersistentVolume, IVolatileVolume iVolatileVolume) {
        ComputeEnvironment init = ComputeEnvironment.init(config, i, iWorkerController, iPersistentVolume, iVolatileVolume);
        ComputeGraphBuilder newBuilder = ComputeGraphBuilder.newBuilder(config);
        newBuilder.setMode(OperationMode.STREAMING);
        newBuilder.addSource("ksource", new KSource(), 2);
        newBuilder.addCompute("sink", new KSink(), 2).direct("ksource").viaEdge("edge");
        init.buildAndExecute(newBuilder);
    }
}
