package spark.streaming.dstream;

import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import kafka.consumer.Consumer$;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.consumer.ZookeeperConsumerConnector;
import kafka.serializer.StringDecoder;
import scala.ScalaObject;
import scala.collection.Map;
import scala.reflect.Manifest$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import spark.storage.StorageLevel;
import spark.streaming.dstream.NetworkReceiver;

/* compiled from: KafkaInputDStream.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005ma!B\u0001\u0003\u0001\u0011A!!D&bM.\f'+Z2fSZ,'O\u0003\u0002\u0004\t\u00059Am\u001d;sK\u0006l'BA\u0003\u0007\u0003%\u0019HO]3b[&twMC\u0001\b\u0003\u0015\u0019\b/\u0019:l'\r\u0001\u0011b\u0005\t\u0004\u0015-iQ\"\u0001\u0002\n\u00051\u0011!a\u0004(fi^|'o\u001b*fG\u0016Lg/\u001a:\u0011\u00059\tR\"A\b\u000b\u0003A\tQa]2bY\u0006L!AE\b\u0003\u0007\u0005s\u0017\u0010\u0005\u0002\u000f)%\u0011Qc\u0004\u0002\f'\u000e\fG.Y(cU\u0016\u001cG\u000f\u0003\u0005\u0018\u0001\t\u0005\t\u0015!\u0003\u001a\u0003!Q8.U;peVl7\u0001\u0001\t\u00035uq!AD\u000e\n\u0005qy\u0011A\u0002)sK\u0012,g-\u0003\u0002\u001f?\t11\u000b\u001e:j]\u001eT!\u0001H\b\t\u0011\u0005\u0002!\u0011!Q\u0001\ne\tqa\u001a:pkBLE\r\u0003\u0005$\u0001\t\u0005\t\u0015!\u0003%\u0003\u0019!x\u000e]5dgB!Q\u0005K\r+\u001b\u00051#BA\u0014\u0010\u0003)\u0019w\u000e\u001c7fGRLwN\\\u0005\u0003S\u0019\u00121!T1q!\tq1&\u0003\u0002-\u001f\t\u0019\u0011J\u001c;\t\u00119\u0002!\u0011!Q\u0001\n=\na\"\u001b8ji&\fGn\u00144gg\u0016$8\u000f\u0005\u0003&QA\u001a\u0004C\u0001\u00062\u0013\t\u0011$AA\tLC\u001a\\\u0017\rU1si&$\u0018n\u001c8LKf\u0004\"A\u0004\u001b\n\u0005Uz!\u0001\u0002'p]\u001eD\u0001b\u000e\u0001\u0003\u0002\u0003\u0006I\u0001O\u0001\rgR|'/Y4f\u0019\u00164X\r\u001c\t\u0003sqj\u0011A\u000f\u0006\u0003w\u0019\tqa\u001d;pe\u0006<W-\u0003\u0002>u\ta1\u000b^8sC\u001e,G*\u001a<fY\")q\b\u0001C\u0001\u0001\u00061A(\u001b8jiz\"b!\u0011\"D\t\u00163\u0005C\u0001\u0006\u0001\u0011\u00159b\b1\u0001\u001a\u0011\u0015\tc\b1\u0001\u001a\u0011\u0015\u0019c\b1\u0001%\u0011\u0015qc\b1\u00010\u0011\u00159d\b1\u00019\u0011\u001dA\u0005A1A\u0005\u0002%\u000b!BW&`)&kUiT+U+\u0005Q\u0003BB&\u0001A\u0003%!&A\u0006[\u0017~#\u0016*T#P+R\u0003\u0003\u0002C'\u0001\u0011\u000b\u0007I\u0011\u0003(\u0002\u001d\tdwnY6HK:,'/\u0019;peV\tq\n\u0005\u0002Q#6\t\u0001!\u0003\u0002S\u0017\tq!\t\\8dW\u001e+g.\u001a:bi>\u0014\b\u0002\u0003+\u0001\u0011\u0003\u0005\u000b\u0015B(\u0002\u001f\tdwnY6HK:,'/\u0019;pe\u0002BqA\u0016\u0001A\u0002\u0013\u0005q+A\td_:\u001cX/\\3s\u0007>tg.Z2u_J,\u0012\u0001\u0017\t\u00033zk\u0011A\u0017\u0006\u00037r\u000b\u0001bY8ogVlWM\u001d\u0006\u0002;\u0006)1.\u00194lC&\u0011qL\u0017\u0002\u001b5>|7.Z3qKJ\u001cuN\\:v[\u0016\u00148i\u001c8oK\u000e$xN\u001d\u0005\bC\u0002\u0001\r\u0011\"\u0001c\u0003U\u0019wN\\:v[\u0016\u00148i\u001c8oK\u000e$xN]0%KF$\"a\u00194\u0011\u00059!\u0017BA3\u0010\u0005\u0011)f.\u001b;\t\u000f\u001d\u0004\u0017\u0011!a\u00011\u0006\u0019\u0001\u0010J\u0019\t\r%\u0004\u0001\u0015)\u0003Y\u0003I\u0019wN\\:v[\u0016\u00148i\u001c8oK\u000e$xN\u001d\u0011\t\u000b-\u0004A\u0011\u00017\u0002\r=t7\u000b^8q)\u0005\u0019\u0007\"\u00028\u0001\t\u0003a\u0017aB8o'R\f'\u000f\u001e\u0005\u0006a\u0002!I!]\u0001\u000bg\u0016$xJ\u001a4tKR\u001cHCA2s\u0011\u0015\u0019x\u000e1\u00010\u0003\u001dygMZ:fiN4A!\u001e\u0001\u0005m\nqQ*Z:tC\u001e,\u0007*\u00198eY\u0016\u00148\u0003\u0002;x\u007fN\u0001\"\u0001_?\u000e\u0003eT!A_>\u0002\t1\fgn\u001a\u0006\u0002y\u0006!!.\u0019<b\u0013\tq\u0018P\u0001\u0004PE*,7\r\u001e\t\u0004q\u0006\u0005\u0011bAA\u0002s\nA!+\u001e8oC\ndW\r\u0003\u0006\u0002\bQ\u0014\t\u0011)A\u0005\u0003\u0013\taa\u001d;sK\u0006l\u0007\u0003B-\u0002\feI1!!\u0004[\u0005-Y\u0015MZ6b'R\u0014X-Y7\t\r}\"H\u0011AA\t)\u0011\t\u0019\"!\u0006\u0011\u0005A#\b\u0002CA\u0004\u0003\u001f\u0001\r!!\u0003\t\r\u0005eA\u000f\"\u0001m\u0003\r\u0011XO\u001c")
/* loaded from: input_file:spark/streaming/dstream/KafkaReceiver.class */
public class KafkaReceiver extends NetworkReceiver<Object> implements ScalaObject {
    public final String spark$streaming$dstream$KafkaReceiver$$zkQuorum;
    public final String spark$streaming$dstream$KafkaReceiver$$groupId;
    private final Map<String, Object> topics;
    public final Map<KafkaPartitionKey, Object> spark$streaming$dstream$KafkaReceiver$$initialOffsets;
    private final StorageLevel storageLevel;
    private final int ZK_TIMEOUT;
    private NetworkReceiver<Object>.BlockGenerator blockGenerator;
    private ZookeeperConsumerConnector consumerConnector;

    /* compiled from: KafkaInputDStream.scala */
    /* loaded from: input_file:spark/streaming/dstream/KafkaReceiver$MessageHandler.class */
    public class MessageHandler implements Runnable, ScalaObject {
        private final KafkaStream<String> stream;
        public final KafkaReceiver $outer;

        @Override // java.lang.Runnable
        public void run() {
            spark$streaming$dstream$KafkaReceiver$MessageHandler$$$outer().logInfo(new KafkaReceiver$MessageHandler$$anonfun$run$1(this));
            this.stream.takeWhile(new KafkaReceiver$MessageHandler$$anonfun$run$2(this));
        }

        public KafkaReceiver spark$streaming$dstream$KafkaReceiver$MessageHandler$$$outer() {
            return this.$outer;
        }

        public MessageHandler(KafkaReceiver kafkaReceiver, KafkaStream<String> kafkaStream) {
            this.stream = kafkaStream;
            if (kafkaReceiver == null) {
                throw new NullPointerException();
            }
            this.$outer = kafkaReceiver;
        }
    }

    public int ZK_TIMEOUT() {
        return this.ZK_TIMEOUT;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v10 */
    /* JADX WARN: Type inference failed for: r0v5 */
    /* JADX WARN: Type inference failed for: r0v6, types: [java.lang.Throwable] */
    public NetworkReceiver<Object>.BlockGenerator blockGenerator() {
        if ((this.bitmap$0 & 8) == 0) {
            ?? r0 = this;
            synchronized (r0) {
                if ((this.bitmap$0 & 8) == 0) {
                    this.blockGenerator = new NetworkReceiver.BlockGenerator(this, this.storageLevel);
                    this.bitmap$0 = this.bitmap$0 | 8;
                }
                r0 = this;
                this.storageLevel = null;
            }
        }
        return this.blockGenerator;
    }

    public ZookeeperConsumerConnector consumerConnector() {
        return this.consumerConnector;
    }

    public void consumerConnector_$eq(ZookeeperConsumerConnector zookeeperConsumerConnector) {
        this.consumerConnector = zookeeperConsumerConnector;
    }

    @Override // spark.streaming.dstream.NetworkReceiver
    public void onStop() {
        blockGenerator().stop();
    }

    @Override // spark.streaming.dstream.NetworkReceiver
    public void onStart() {
        blockGenerator().start();
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(BoxesRunTime.unboxToInt(this.topics.values().reduce(new KafkaReceiver$$anonfun$1(this))));
        logInfo(new KafkaReceiver$$anonfun$onStart$1(this));
        logInfo(new KafkaReceiver$$anonfun$onStart$2(this));
        Properties properties = new Properties();
        properties.put("zk.connect", this.spark$streaming$dstream$KafkaReceiver$$zkQuorum);
        properties.put("zk.connectiontimeout.ms", BoxesRunTime.boxToInteger(ZK_TIMEOUT()).toString());
        properties.put("groupid", this.spark$streaming$dstream$KafkaReceiver$$groupId);
        logInfo(new KafkaReceiver$$anonfun$onStart$3(this));
        consumerConnector_$eq((ZookeeperConsumerConnector) Consumer$.MODULE$.create(new ConsumerConfig(properties)));
        logInfo(new KafkaReceiver$$anonfun$onStart$4(this));
        setOffsets(this.spark$streaming$dstream$KafkaReceiver$$initialOffsets);
        consumerConnector().createMessageStreams(this.topics, new StringDecoder()).values().foreach(new KafkaReceiver$$anonfun$onStart$5(this, newFixedThreadPool));
    }

    private void setOffsets(Map<KafkaPartitionKey, Object> map) {
        map.foreach(new KafkaReceiver$$anonfun$setOffsets$1(this));
    }

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public KafkaReceiver(String str, String str2, Map<String, Object> map, Map<KafkaPartitionKey, Object> map2, StorageLevel storageLevel) {
        super(Manifest$.MODULE$.Any());
        this.spark$streaming$dstream$KafkaReceiver$$zkQuorum = str;
        this.spark$streaming$dstream$KafkaReceiver$$groupId = str2;
        this.topics = map;
        this.spark$streaming$dstream$KafkaReceiver$$initialOffsets = map2;
        this.storageLevel = storageLevel;
        this.ZK_TIMEOUT = 10000;
        this.consumerConnector = null;
    }
}
