/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.connector.kafka.source;

import com.netflix.spectator.api.Registry;
import com.netflix.spectator.impl.Preconditions;
import io.mantisrx.connector.kafka.source.KafkaConsumerRebalanceListener;
import io.mantisrx.connector.kafka.source.MantisKafkaSourceConfig;
import io.mantisrx.connector.kafka.source.TopicPartitionStateManager;
import io.mantisrx.connector.kafka.source.assignor.StaticPartitionAssignor;
import io.mantisrx.connector.kafka.source.assignor.StaticPartitionAssignorImpl;
import io.mantisrx.connector.kafka.source.checkpoint.strategy.CheckpointStrategy;
import io.mantisrx.connector.kafka.source.checkpoint.strategy.CheckpointStrategyFactory;
import io.mantisrx.connector.kafka.source.checkpoint.trigger.CheckpointTrigger;
import io.mantisrx.connector.kafka.source.checkpoint.trigger.CheckpointTriggerFactory;
import io.mantisrx.connector.kafka.source.metrics.ConsumerMetrics;
import io.mantisrx.runtime.Context;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscription;

/**
 * Wraps a {@link KafkaConsumer} together with the checkpoint strategy, checkpoint trigger,
 * partition state manager and metrics that accompany it.
 *
 * <p>On construction a one-second RxJava interval is subscribed that publishes the time elapsed
 * since the last {@link #poll(long)} call and since the last poll that returned records.
 * {@link #close()} cancels that subscription.
 *
 * @param <S> the type parameter of the {@link CheckpointStrategy} used by this consumer
 */
public class MantisKafkaConsumer<S> {
    private static final Logger LOGGER = LoggerFactory.getLogger(MantisKafkaConsumer.class);

    private final int consumerId;
    private final KafkaConsumer<String, byte[]> consumer;
    private final CheckpointStrategy<S> strategy;
    private final CheckpointTrigger trigger;
    private final ConsumerMetrics consumerMetrics;
    private final TopicPartitionStateManager partitionStateManager;
    /** Wall-clock time (ms) captured at the start of the most recent {@link #poll(long)}. */
    private final AtomicLong pollTimestamp = new AtomicLong(System.currentTimeMillis());
    /** Wall-clock time (ms) captured at the start of the most recent poll that returned records. */
    private final AtomicLong pollReturnedDataTimestamp = new AtomicLong(System.currentTimeMillis());
    private volatile Subscription metricSubscription = null;

    /**
     * Creates the wrapper and starts the periodic metric publication.
     *
     * @param consumerId            numeric identifier of this consumer
     * @param consumer              the underlying Kafka consumer
     * @param partitionStateManager per-partition state whose counters are reset on {@link #close()}
     * @param strategy              checkpoint strategy associated with this consumer
     * @param trigger               checkpoint trigger; its active state gates the work done in {@link #close()}
     * @param metrics               sink for the poll-latency and pause/resume metrics
     */
    public MantisKafkaConsumer(int consumerId, KafkaConsumer<String, byte[]> consumer, TopicPartitionStateManager partitionStateManager, CheckpointStrategy<S> strategy, CheckpointTrigger trigger, ConsumerMetrics metrics) {
        this.consumerId = consumerId;
        this.consumerMetrics = metrics;
        this.consumer = consumer;
        this.partitionStateManager = partitionStateManager;
        this.strategy = strategy;
        this.trigger = trigger;
        this.setupMetricPublish();
    }

    /** Subscribes (once) to a one-second interval that records the two "time since last poll" metrics. */
    private void setupMetricPublish() {
        if (this.metricSubscription == null) {
            this.metricSubscription = Observable.interval(1L, TimeUnit.SECONDS).subscribe(tick -> {
                this.consumerMetrics.recordTimeSinceLastPollMs(this.timeSinceLastPollMs());
                this.consumerMetrics.recordTimeSinceLastPollWithDataMs(this.timeSinceLastPollWithDataMs());
            });
        }
    }

    /** @return the numeric identifier given to this consumer at construction */
    public int getConsumerId() {
        return this.consumerId;
    }

    /** @return the wrapped Kafka consumer */
    public KafkaConsumer<String, byte[]> getConsumer() {
        return this.consumer;
    }

    /** @return the checkpoint strategy associated with this consumer */
    public CheckpointStrategy<S> getStrategy() {
        return this.strategy;
    }

    /** @return the checkpoint trigger associated with this consumer */
    public CheckpointTrigger getTrigger() {
        return this.trigger;
    }

    /** @return the partition state manager associated with this consumer */
    public TopicPartitionStateManager getPartitionStateManager() {
        return this.partitionStateManager;
    }

    /** @return milliseconds elapsed since the start of the most recent {@link #poll(long)} call */
    public long timeSinceLastPollMs() {
        return System.currentTimeMillis() - this.pollTimestamp.get();
    }

    /** @return milliseconds elapsed since the start of the most recent poll that returned at least one record */
    public long timeSinceLastPollWithDataMs() {
        return System.currentTimeMillis() - this.pollReturnedDataTimestamp.get();
    }

    /** @return the metrics sink used by this consumer */
    public ConsumerMetrics getConsumerMetrics() {
        return this.consumerMetrics;
    }

    /**
     * Stops metric publication and, if the checkpoint trigger is still active, resets the counters
     * of every currently assigned partition, closes the Kafka consumer and shuts the trigger down.
     *
     * <p>When the trigger is not active only the metric subscription is cancelled; the Kafka
     * consumer is left untouched.
     */
    public void close() {
        if (this.metricSubscription != null && !this.metricSubscription.isUnsubscribed()) {
            this.metricSubscription.unsubscribe();
        }
        if (this.trigger.isActive()) {
            final Set<TopicPartition> partitions = this.consumer.assignment();
            LOGGER.warn("clearing partition state when closing consumer {}, partitions {}", this.toString(), partitions.toString());
            partitions.forEach(tp -> this.partitionStateManager.resetCounters(tp));
            this.consumer.close();
            this.trigger.shutdown();
        }
    }

    /**
     * Polls the underlying consumer and updates the poll timestamps.
     *
     * @param consumerPollTimeoutMs maximum time to block in the underlying poll, in milliseconds
     * @return the records returned by the underlying consumer (possibly empty)
     */
    public ConsumerRecords<String, byte[]> poll(long consumerPollTimeoutMs) {
        // Both timestamps use the time captured before the (potentially blocking) poll.
        final long now = System.currentTimeMillis();
        this.pollTimestamp.set(now);
        final ConsumerRecords<String, byte[]> consumerRecords = this.consumer.poll(Duration.ofMillis(consumerPollTimeoutMs));
        if (consumerRecords.count() > 0) {
            this.pollReturnedDataTimestamp.set(now);
        }
        return consumerRecords;
    }

    /** @return the partitions currently assigned to the underlying consumer */
    public Set<TopicPartition> assignment() {
        return this.consumer.assignment();
    }

    /** @return the topic-to-partition-info map reported by the underlying consumer */
    public Map<String, List<PartitionInfo>> listTopics() {
        return this.consumer.listTopics();
    }

    /**
     * @param partition the partition to query
     * @return the position reported by the underlying consumer for {@code partition}
     */
    public long position(TopicPartition partition) {
        return this.consumer.position(partition);
    }

    /**
     * Delegates to {@code KafkaConsumer.seekToBeginning} for the given partitions.
     *
     * @param partitions partitions to rewind
     */
    public void seekToBeginning(TopicPartition ... partitions) {
        this.consumer.seekToBeginning(Arrays.asList(partitions));
    }

    /**
     * Pauses the given partitions on the underlying consumer and increments the pause metric.
     * Unlike {@link #resume(TopicPartition...)}, exceptions from the consumer are propagated.
     *
     * @param partitions partitions to pause
     */
    public void pause(TopicPartition ... partitions) {
        LOGGER.debug("pausing {} partitions", partitions.length);
        this.consumer.pause(Arrays.asList(partitions));
        this.consumerMetrics.incrementPausePartitionCount();
    }

    /**
     * Resumes the given partitions on the underlying consumer and increments the resume metric.
     * An {@link IllegalStateException} from the consumer is logged and suppressed, in which case
     * the metric is not incremented.
     *
     * @param partitions partitions to resume
     */
    public void resume(TopicPartition ... partitions) {
        try {
            LOGGER.debug("resuming {} partitions", partitions.length);
            this.consumer.resume(Arrays.asList(partitions));
            this.consumerMetrics.incrementResumePartitionCount();
        }
        catch (IllegalStateException e) {
            LOGGER.warn("resuming partitions failed", e);
        }
    }

    /**
     * Two instances are equal when their consumer id, Kafka consumer, strategy, trigger, metrics
     * and partition state manager are all equal. Poll timestamps are not part of the comparison.
     */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || this.getClass() != o.getClass()) {
            return false;
        }
        final MantisKafkaConsumer<?> that = (MantisKafkaConsumer<?>) o;
        // Objects.equals keeps this null-safe, matching the null tolerance of Objects.hash in hashCode().
        return this.consumerId == that.consumerId
                && Objects.equals(this.consumer, that.consumer)
                && Objects.equals(this.strategy, that.strategy)
                && Objects.equals(this.trigger, that.trigger)
                && Objects.equals(this.consumerMetrics, that.consumerMetrics)
                && Objects.equals(this.partitionStateManager, that.partitionStateManager);
    }

    @Override
    public int hashCode() {
        return Objects.hash(this.consumerId, this.consumer, this.strategy, this.trigger, this.consumerMetrics, this.partitionStateManager);
    }

    @Override
    public String toString() {
        return "MantisKafkaConsumer{consumerId=" + this.consumerId + ", consumer=" + this.consumer + ", strategy=" + this.strategy + ", trigger=" + this.trigger + '}';
    }

    /**
     * Builds a {@link MantisKafkaConsumer}: creates the Kafka consumer with a per-consumer
     * {@code client.id}, wires the checkpoint strategy/trigger and either statically assigns
     * partitions or subscribes to the configured topics.
     */
    static class Builder {
        /** Shared across all builders; incremented once per {@link #build()} to number consumers. */
        private static final AtomicInteger consumerId = new AtomicInteger(0);

        private Context context;
        private int consumerIndex;
        private int totalNumConsumersForJob;
        private Registry registry;
        private MantisKafkaSourceConfig kafkaSourceConfig;
        private final StaticPartitionAssignor staticPartitionAssignor = new StaticPartitionAssignorImpl();

        Builder() {
        }

        /** @param context job context; required by {@link #build()} */
        public Builder withContext(Context context) {
            this.context = context;
            return this;
        }

        /** @param kafkaSourceConfig source configuration; required by {@link #build()} */
        public Builder withKafkaSourceConfig(MantisKafkaSourceConfig kafkaSourceConfig) {
            this.kafkaSourceConfig = kafkaSourceConfig;
            return this;
        }

        /** @param consumerIndex index of this consumer; must be {@code >= 0} */
        public Builder withConsumerIndex(int consumerIndex) {
            this.consumerIndex = consumerIndex;
            return this;
        }

        /** @param totalNumConsumersForJob total consumer count; must be {@code > 0} */
        public Builder withTotalNumConsumersForJob(int totalNumConsumersForJob) {
            this.totalNumConsumersForJob = totalNumConsumersForJob;
            return this;
        }

        /** @param registry metrics registry; required by {@link #build()} */
        public Builder withRegistry(Registry registry) {
            this.registry = registry;
            return this;
        }

        /**
         * Computes this consumer's partitions with the static assignor, publishes a gauge per
         * assigned partition, assigns them to the consumer and notifies the rebalance listener.
         *
         * <p>Invalid {@code totalNumConsumers} / {@code consumerIndex} values are logged and
         * reported through {@code context.completeAndExit()} without assigning anything.
         *
         * @throws RuntimeException if the assignor returns no partitions for this consumer
         */
        private void doStaticPartitionAssignment(KafkaConsumer<String, byte[]> consumer, ConsumerRebalanceListener rebalanceListener, int consumerIndex, int totalNumConsumers, Map<String, Integer> topicPartitionCounts, Registry registry) {
            if (totalNumConsumers <= 0) {
                LOGGER.error("total num consumers {} is invalid", totalNumConsumers);
                this.context.completeAndExit();
                return;
            }
            if (consumerIndex < 0 || consumerIndex >= totalNumConsumers) {
                LOGGER.error("consumerIndex {} is invalid (numConsumers: {})", consumerIndex, totalNumConsumers);
                this.context.completeAndExit();
                return;
            }
            final List<TopicPartition> topicPartitions = this.staticPartitionAssignor.assignPartitionsToConsumer(consumerIndex, topicPartitionCounts, totalNumConsumers);
            if (topicPartitions.isEmpty()) {
                LOGGER.error("topic partitions to assign list is empty");
                throw new RuntimeException("static partition assignment is enabled and no topic partitions were assigned, please check numPartitionsPerTopic job param is set correctly and the job has num(kafka consumer) <= num(partition)");
            }
            LOGGER.info("Statically assigned topic partitions(): {}", topicPartitions);
            topicPartitions.forEach(tp -> registry.gauge("staticPartitionAssigned", "topic", tp.topic(), "partition", String.valueOf(tp.partition())).set(1.0));
            consumer.assign(topicPartitions);
            // assign() does not invoke rebalance callbacks, so the listener is notified explicitly.
            rebalanceListener.onPartitionsAssigned(topicPartitions);
        }

        /**
         * Validates the builder state and creates the consumer.
         *
         * <p>The consumer properties obtained from the source config get a {@code client.id} of the
         * form {@code <jobId>-<workerNumber>-<consumerId>}. With static partition assignment enabled
         * the partitions are assigned directly (only when topic partition counts are configured);
         * otherwise the consumer subscribes to the configured topics, attaching a rebalance
         * listener unless the checkpoint strategy is {@code "disableCheckpointing"}.
         *
         * @return the fully wired consumer wrapper
         */
        public MantisKafkaConsumer<?> build() {
            Preconditions.checkNotNull(this.context, "context");
            Preconditions.checkNotNull(this.kafkaSourceConfig, "kafkaSourceConfig");
            Preconditions.checkNotNull(this.registry, "registry");
            Preconditions.checkArg(this.consumerIndex >= 0, "consumerIndex must be greater than or equal to 0");
            Preconditions.checkArg(this.totalNumConsumersForJob > 0, "total number of consumers for job must be greater than 0");
            final int kafkaConsumerId = consumerId.incrementAndGet();
            final Map<String, Object> consumerProps = this.kafkaSourceConfig.getConsumerConfig().getConsumerProperties();
            final String clientId = String.format("%s-%d-%d", this.context.getJobId(), this.context.getWorkerInfo().getWorkerNumber(), kafkaConsumerId);
            consumerProps.put("client.id", clientId);
            final KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(consumerProps);
            final TopicPartitionStateManager partitionStateManager = new TopicPartitionStateManager(this.registry, clientId, this.kafkaSourceConfig.getRetryCheckpointCheckDelayMs());
            final ConsumerMetrics metrics = new ConsumerMetrics(this.registry, kafkaConsumerId, this.context);
            final CheckpointStrategy<?> strategy = CheckpointStrategyFactory.getNewInstance(this.context, consumer, this.kafkaSourceConfig.getCheckpointStrategy(), metrics);
            if (this.kafkaSourceConfig.getStaticPartitionAssignmentEnabled().booleanValue()) {
                KafkaConsumerRebalanceListener kafkaConsumerRebalanceListener = new KafkaConsumerRebalanceListener(consumer, partitionStateManager, strategy);
                if (!this.kafkaSourceConfig.getTopicPartitionCounts().isPresent()) {
                    // Without partition counts nothing is assigned or subscribed; make that visible.
                    LOGGER.warn("static partition assignment is enabled but no topic partition counts are configured, consumer {} has no partitions assigned", clientId);
                }
                this.kafkaSourceConfig.getTopicPartitionCounts().ifPresent(topicPartitionCounts -> this.doStaticPartitionAssignment(consumer, kafkaConsumerRebalanceListener, this.consumerIndex, this.totalNumConsumersForJob, topicPartitionCounts, this.registry));
            } else if (!"disableCheckpointing".equals(this.kafkaSourceConfig.getCheckpointStrategy())) {
                // Compare by content: reference comparison on strings only works for interned instances.
                consumer.subscribe(this.kafkaSourceConfig.getTopics(), new KafkaConsumerRebalanceListener(consumer, partitionStateManager, strategy));
            } else {
                consumer.subscribe(this.kafkaSourceConfig.getTopics());
            }
            final CheckpointTrigger trigger = CheckpointTriggerFactory.getNewInstance(this.kafkaSourceConfig);
            return new MantisKafkaConsumer<>(kafkaConsumerId, consumer, partitionStateManager, strategy, trigger, metrics);
        }
    }
}

