/*
 * Decompiled with CFR 0.152.
 */
package zipkin.collector.kafka;

import java.util.Collections;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import zipkin.Span;
import zipkin.SpanDecoder;
import zipkin.collector.Collector;
import zipkin.collector.CollectorMetrics;
import zipkin.storage.Callback;

/**
 * Consumes one Kafka stream and forwards each message to the {@link Collector}.
 *
 * <p>Each message is counted via {@link CollectorMetrics#incrementMessages()}. Messages that are
 * null or shorter than two bytes are counted as dropped and skipped. The remaining messages are
 * routed by their leading bytes: either decoded here as a single thrift span, or handed to the
 * collector together with {@link SpanDecoder#DETECTING_DECODER}.
 */
final class KafkaStreamProcessor implements Runnable {
    final KafkaStream<byte[], byte[]> stream;
    final Collector collector;
    final CollectorMetrics metrics;

    /**
     * @param stream    source of raw message payloads
     * @param collector receives the decoded spans (or the raw bytes, when decoding is delegated)
     * @param metrics   counters updated for every message seen, dropped, or decoded here
     */
    KafkaStreamProcessor(KafkaStream<byte[], byte[]> stream, Collector collector, CollectorMetrics metrics) {
        this.stream = stream;
        this.collector = collector;
        this.metrics = metrics;
    }

    /**
     * Iterates the stream until {@code hasNext()} returns false, processing one message per
     * iteration.
     */
    @Override
    public void run() {
        // Parameterized (not raw) so message() is statically typed as byte[] and needs no cast.
        ConsumerIterator<byte[], byte[]> messages = this.stream.iterator();
        while (messages.hasNext()) {
            byte[] bytes = messages.next().message();
            this.metrics.incrementMessages();

            // A null payload would otherwise throw NullPointerException and end this loop;
            // treat it like any other message too short to classify.
            if (bytes == null || bytes.length < 2) {
                this.metrics.incrementMessagesDropped();
                continue;
            }

            if (isSingleThriftSpan(bytes)) {
                try {
                    this.metrics.incrementBytes(bytes.length);
                    Span span = SpanDecoder.THRIFT_DECODER.readSpan(bytes);
                    this.collector.accept(Collections.singletonList(span), Callback.NOOP);
                } catch (RuntimeException e) {
                    // Deliberately not rethrown: one bad message must not stop consumption.
                    // The failure is recorded as a dropped message instead.
                    this.metrics.incrementMessagesDropped();
                }
            } else {
                // Byte accounting is not done here on this path.
                // NOTE(review): presumably acceptSpans records bytes itself - confirm.
                this.collector.acceptSpans(bytes, SpanDecoder.DETECTING_DECODER, Callback.NOOP);
            }
        }
    }

    /**
     * Returns true when the message should be decoded here as one thrift span: it is not
     * classified as protobuf3, and its first byte is at most 16 and not 12.
     *
     * <p>NOTE(review): presumably 12 is the leading byte of a thrift-encoded span list, which is
     * left to the detecting decoder - confirm. Java bytes are signed, so first bytes of 0x80 and
     * above are negative and also satisfy {@code <= 16}.
     *
     * @param bytes message payload; the caller guarantees at least two bytes
     */
    private static boolean isSingleThriftSpan(byte[] bytes) {
        return !protobuf3(bytes) && bytes[0] <= 16 && bytes[0] != 12;
    }

    /**
     * Returns true when the first byte is 10 and the second byte is non-zero.
     *
     * <p>NOTE(review): presumably this distinguishes a protobuf3 payload from a thrift struct
     * that also starts with 10 - confirm against the encoders.
     *
     * @param bytes message payload; anything shorter than two bytes yields false
     */
    static boolean protobuf3(byte[] bytes) {
        return bytes.length >= 2 && bytes[0] == 10 && bytes[1] != 0;
    }
}

