/*
 * Decompiled with CFR 0.152.
 */
package com.github.harbby.spark.sql.kafka;

import com.github.harbby.spark.sql.kafka.model.KafkaPartitionOffset;
import java.io.Closeable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Queue;
import kafka.common.TopicAndPartition;
import org.apache.spark.streaming.kafka.KafkaCluster;
import org.apache.spark.streaming.kafka.OffsetRange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.spark_project.guava.base.Preconditions;
import org.spark_project.jetty.util.ConcurrentArrayQueue;
import scala.collection.JavaConverters;
import scala.collection.Seq;
import scala.collection.immutable.Map$;
import scala.collection.mutable.Map;

/**
 * Background daemon thread that periodically commits consumed Kafka offsets
 * for a consumer group through a {@link KafkaCluster}.
 *
 * <p>Producers hand offsets over with {@link #addAll(OffsetRange[])} or
 * {@link #addAll(KafkaPartitionOffset[])}; the offsets are buffered in a queue.
 * Every {@code commitInterval} milliseconds the committer drains the queue,
 * keeps the highest offset seen per topic-partition and sends that set to Kafka.
 *
 * <p>Offsets handed over while the committer is not running (before
 * {@link #start()} or after {@link #close()}) are silently dropped.
 */
public class KafkaOffsetCommitter
extends Thread
implements Closeable {
    private static final Logger logger = LoggerFactory.getLogger(KafkaOffsetCommitter.class);
    private final KafkaCluster kafkaCluster;
    private final String groupId;
    /** True between {@link #start()} and {@link #close()} / thread termination. */
    private volatile boolean running = false;
    /** Pause between two commit rounds, in milliseconds. */
    private final int commitInterval;
    private final Queue<KafkaPartitionOffset> commitQueue = new ConcurrentArrayQueue<>(1024);
    /** Monitor the committer thread waits on between rounds; notified by {@link #close()}. */
    private final Object wakeLock = new Object();

    /**
     * Creates a committer; the thread is not started until {@link #start()} is called.
     *
     * @param kafkaCluster   cluster handle used to store the consumer offsets
     * @param groupId        consumer group the offsets are committed for
     * @param commitInterval pause between commit rounds in milliseconds, at least 5000
     * @throws IllegalArgumentException if {@code commitInterval} is below 5000
     */
    public KafkaOffsetCommitter(KafkaCluster kafkaCluster, String groupId, int commitInterval) {
        Preconditions.checkArgument(commitInterval >= 5000, "commitInterval must >= 5000");
        this.commitInterval = commitInterval;
        this.kafkaCluster = kafkaCluster;
        this.groupId = groupId;
    }

    /**
     * Marks the committer as running and starts it as a daemon thread.
     */
    @Override
    public synchronized void start() {
        this.setDaemon(true);
        // The flag must be visible BEFORE the thread starts: run() loops on it, and
        // setting it afterwards lets the new thread observe false and exit at once
        // while addAll() keeps queueing offsets that nobody will ever commit.
        this.running = true;
        try {
            super.start();
        }
        catch (RuntimeException | Error e) {
            // Thread could not be started, so there is no committer to feed.
            this.running = false;
            throw e;
        }
    }

    /**
     * Queues the {@code untilOffset} of every range for the next commit round.
     * Does nothing when the committer is not running.
     *
     * @param offsetRanges ranges whose end offsets should be committed
     */
    public void addAll(OffsetRange[] offsetRanges) {
        if (!this.running) {
            return;
        }
        for (OffsetRange offsetRange : offsetRanges) {
            this.commitQueue.offer(new KafkaPartitionOffset(offsetRange.topicAndPartition(), offsetRange.untilOffset()));
        }
    }

    /**
     * Queues the given partition offsets for the next commit round.
     * Does nothing when the committer is not running.
     *
     * @param partitionOffsets offsets that should be committed
     */
    public void addAll(KafkaPartitionOffset[] partitionOffsets) {
        if (this.running) {
            this.commitQueue.addAll(Arrays.asList(partitionOffsets));
        }
    }

    /**
     * Stops accepting offsets and wakes the committer thread so that it commits
     * what is still queued and terminates, instead of sleeping out the rest of
     * the commit interval. Does not wait for the thread to finish.
     */
    @Override
    public void close() {
        this.running = false;
        synchronized (this.wakeLock) {
            this.wakeLock.notifyAll();
        }
    }

    /**
     * Commit loop: wait one interval (or until {@link #close()}), commit, repeat.
     * Failures of a single round are logged and do not stop the loop; an
     * interrupt stops it.
     */
    @Override
    public void run() {
        while (this.running) {
            try {
                this.awaitNextCommit();
                this.commitAll();
            }
            catch (InterruptedException e) {
                // Somebody asked this thread to stop: keep the flag set and leave.
                Thread.currentThread().interrupt();
                break;
            }
            catch (Throwable t) {
                logger.error("The offset committer encountered an error: {}", (Object)t.getMessage(), (Object)t);
            }
        }
        this.running = false;
        // Best-effort final drain for offsets queued while the last round was in
        // flight. Skipped after an interrupt, where further work is not wanted.
        if (!Thread.currentThread().isInterrupted()) {
            try {
                this.commitAll();
            }
            catch (Throwable t) {
                logger.error("The offset committer encountered an error: {}", (Object)t.getMessage(), (Object)t);
            }
        }
    }

    /**
     * Blocks until {@code commitInterval} milliseconds have elapsed or the
     * committer has been closed, whichever comes first.
     */
    private void awaitNextCommit() throws InterruptedException {
        long deadlineNanos = System.nanoTime() + this.commitInterval * 1_000_000L;
        synchronized (this.wakeLock) {
            long remainingMillis = this.commitInterval;
            // Loop so that a spurious wake-up does not shorten the interval.
            while (this.running && remainingMillis > 0) {
                this.wakeLock.wait(remainingMillis);
                remainingMillis = (deadlineNanos - System.nanoTime()) / 1_000_000L;
            }
        }
    }

    /**
     * Drains the queue, reduces it to the highest offset per topic-partition and
     * commits the result. Does nothing when the queue is empty.
     */
    private void commitAll() throws Exception {
        HashMap<TopicAndPartition, Long> highest = new HashMap<>();
        for (KafkaPartitionOffset entry = this.commitQueue.poll(); entry != null; entry = this.commitQueue.poll()) {
            TopicAndPartition tp = entry.getTopicPartition();
            Long known = highest.get(tp);
            long offset = known == null ? entry.getOffset() : Math.max(known, entry.getOffset());
            highest.put(tp, offset);
        }
        if (!highest.isEmpty()) {
            this.commitKafkaOffsets(highest);
        }
    }

    /**
     * Converts the offsets to an immutable Scala map and stores them for
     * {@code groupId} through the {@link KafkaCluster}.
     *
     * @param internalOffsets offset to commit for each topic-partition
     */
    private void commitKafkaOffsets(java.util.Map<TopicAndPartition, Long> internalOffsets) throws Exception {
        logger.info("committing offset to kafka, {}", internalOffsets);
        Seq fromOffsetsAsJava = ((Map)JavaConverters.mapAsScalaMapConverter(internalOffsets).asScala()).toSeq();
        // NOTE(review): the return value of setConsumerOffsets is discarded here;
        // presumably it reports per-partition failures - confirm and surface them.
        this.kafkaCluster.setConsumerOffsets(this.groupId, (scala.collection.immutable.Map)Map$.MODULE$.apply(fromOffsetsAsJava));
    }
}

