/*
 * Decompiled with CFR 0.152.
 */
package zipkin.collector.kafka10;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import zipkin.Codec;
import zipkin.collector.Collector;
import zipkin.collector.CollectorMetrics;
import zipkin.collector.kafka10.KafkaCollector;
import zipkin.storage.Callback;

final class KafkaCollectorWorker
implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaCollectorWorker.class);
    final Consumer<byte[], byte[]> kafkaConsumer;
    final Collector collector;
    final CollectorMetrics metrics;
    final AtomicReference<List<TopicPartition>> assignedPartitions = new AtomicReference(Collections.emptyList());

    KafkaCollectorWorker(KafkaCollector.Builder builder) {
        this.kafkaConsumer = new KafkaConsumer(builder.properties);
        List<String> topics = Arrays.asList(builder.topic.split(","));
        this.kafkaConsumer.subscribe(topics, new ConsumerRebalanceListener(){

            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                KafkaCollectorWorker.this.assignedPartitions.set(Collections.emptyList());
            }

            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                KafkaCollectorWorker.this.assignedPartitions.set(Collections.unmodifiableList(new ArrayList<TopicPartition>(partitions)));
            }
        });
        this.collector = builder.delegate.build();
        this.metrics = builder.metrics;
    }

    @Override
    public void run() {
        try {
            LOG.info("Kafka consumer starting polling loop.");
            block2: while (true) {
                ConsumerRecords consumerRecords = this.kafkaConsumer.poll(1000L);
                LOG.debug("Kafka polling returned batch of {} messages.", (Object)consumerRecords.count());
                Iterator iterator = consumerRecords.iterator();
                while (true) {
                    if (!iterator.hasNext()) continue block2;
                    ConsumerRecord record = (ConsumerRecord)iterator.next();
                    this.metrics.incrementMessages();
                    byte[] bytes = (byte[])record.value();
                    if (bytes.length == 0) {
                        this.metrics.incrementMessagesDropped();
                        continue;
                    }
                    if (bytes[0] == 91) {
                        this.collector.acceptSpans(bytes, (Codec)Codec.JSON, Callback.NOOP);
                        continue;
                    }
                    if (bytes[0] == 12) {
                        this.collector.acceptSpans(bytes, (Codec)Codec.THRIFT, Callback.NOOP);
                        continue;
                    }
                    this.collector.acceptSpans(Collections.singletonList(bytes), (Codec)Codec.THRIFT, Callback.NOOP);
                }
                break;
            }
        }
        catch (Throwable throwable) {
            LOG.info("Kafka consumer polling loop stopped.");
            LOG.info("Closing Kafka consumer...");
            this.kafkaConsumer.close();
            LOG.info("Kafka consumer closed.");
            throw throwable;
        }
    }
}

