package io.mantisrx.connector.kafka.source;

import com.netflix.spectator.api.Registry;
import com.netflix.spectator.impl.Preconditions;
import io.mantisrx.connector.kafka.KafkaSourceParameters;
import io.mantisrx.connector.kafka.source.assignor.StaticPartitionAssignor;
import io.mantisrx.connector.kafka.source.assignor.StaticPartitionAssignorImpl;
import io.mantisrx.connector.kafka.source.checkpoint.strategy.CheckpointStrategy;
import io.mantisrx.connector.kafka.source.checkpoint.strategy.CheckpointStrategyFactory;
import io.mantisrx.connector.kafka.source.checkpoint.strategy.CheckpointStrategyOptions;
import io.mantisrx.connector.kafka.source.checkpoint.trigger.CheckpointTrigger;
import io.mantisrx.connector.kafka.source.checkpoint.trigger.CheckpointTriggerFactory;
import io.mantisrx.connector.kafka.source.metrics.ConsumerMetrics;
import io.mantisrx.runtime.Context;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscription;

/* loaded from: input_file:io/mantisrx/connector/kafka/source/MantisKafkaConsumer.class */
/**
 * Bundles a {@link KafkaConsumer} (String keys, {@code byte[]} values) with the
 * checkpoint strategy, checkpoint trigger, partition state manager and metrics
 * that accompany it, and delegates the common consumer operations to it.
 *
 * <p>On construction a one-second interval subscription is started that records
 * the time since the last {@link #poll(long)} call and the time since the last
 * poll that returned data; {@link #close()} cancels it.
 *
 * @param <S> type parameter of the {@link CheckpointStrategy} held by this consumer
 */
public class MantisKafkaConsumer<S> {
    private static final Logger LOGGER = LoggerFactory.getLogger(MantisKafkaConsumer.class);

    private final int consumerId;
    private final KafkaConsumer<String, byte[]> consumer;
    private final CheckpointStrategy<S> strategy;
    private final CheckpointTrigger trigger;
    private final ConsumerMetrics consumerMetrics;
    private final TopicPartitionStateManager partitionStateManager;
    /** Wall-clock time (ms) captured at the start of the most recent {@link #poll(long)}. */
    private final AtomicLong pollTimestamp = new AtomicLong(System.currentTimeMillis());
    /** Start time (ms) of the most recent {@link #poll(long)} that returned at least one record. */
    private final AtomicLong pollReturnedDataTimestamp = new AtomicLong(System.currentTimeMillis());
    /** Handle of the periodic metrics publisher; {@code null} until it has been started. */
    private volatile Subscription metricSubscription = null;

    /**
     * Fluent builder that creates the underlying {@link KafkaConsumer}, wires the
     * checkpointing collaborators and either statically assigns partitions or
     * subscribes to the configured topics.
     */
    static class Builder {
        /** Shared across all builders; incremented once per {@link #build()} call. */
        private static final AtomicInteger consumerId = new AtomicInteger(0);

        private Context context;
        private int consumerIndex;
        private int totalNumConsumersForJob;
        private Registry registry;
        private MantisKafkaSourceConfig kafkaSourceConfig;
        private final StaticPartitionAssignor staticPartitionAssignor = new StaticPartitionAssignorImpl();

        public Builder withContext(Context context) {
            this.context = context;
            return this;
        }

        public Builder withKafkaSourceConfig(MantisKafkaSourceConfig kafkaSourceConfig) {
            this.kafkaSourceConfig = kafkaSourceConfig;
            return this;
        }

        public Builder withConsumerIndex(int consumerIndex) {
            this.consumerIndex = consumerIndex;
            return this;
        }

        public Builder withTotalNumConsumersForJob(int totalNumConsumersForJob) {
            this.totalNumConsumersForJob = totalNumConsumersForJob;
            return this;
        }

        public Builder withRegistry(Registry registry) {
            this.registry = registry;
            return this;
        }

        /**
         * Assigns a fixed set of topic partitions to {@code kafkaConsumer} and notifies
         * {@code rebalanceListener} of the assignment.
         *
         * <p>An invalid {@code totalNumConsumers} or {@code consumerIndex} is logged and
         * reported through {@code context.completeAndExit()}, after which this method
         * returns without assigning anything.
         *
         * @param kafkaConsumer        consumer to assign partitions to
         * @param rebalanceListener    listener informed via {@code onPartitionsAssigned}
         * @param consumerIndex        index of this consumer, expected in {@code [0, totalNumConsumers)}
         * @param totalNumConsumers    total number of consumers, expected to be positive
         * @param topicPartitionCounts partition count per topic, handed to the assignor
         * @param registry             registry on which a {@code staticPartitionAssigned} gauge is set per partition
         * @throws RuntimeException if the assignor returns no partitions for this consumer
         */
        private void doStaticPartitionAssignment(KafkaConsumer<String, byte[]> kafkaConsumer,
                                                 ConsumerRebalanceListener rebalanceListener,
                                                 int consumerIndex,
                                                 int totalNumConsumers,
                                                 Map<String, Integer> topicPartitionCounts,
                                                 Registry registry) {
            if (totalNumConsumers <= 0) {
                LOGGER.error("total num consumers {} is invalid", totalNumConsumers);
                this.context.completeAndExit();
                return;
            }
            if (consumerIndex < 0 || consumerIndex >= totalNumConsumers) {
                LOGGER.error("consumerIndex {} is invalid (numConsumers: {})", consumerIndex, totalNumConsumers);
                this.context.completeAndExit();
                return;
            }
            List<TopicPartition> assignedPartitions =
                this.staticPartitionAssignor.assignPartitionsToConsumer(consumerIndex, topicPartitionCounts, totalNumConsumers);
            if (assignedPartitions.isEmpty()) {
                LOGGER.error("topic partitions to assign list is empty");
                throw new RuntimeException("static partition assignment is enabled and no topic partitions were assigned, please check numPartitionsPerTopic job param is set correctly and the job has num(kafka consumer) <= num(partition)");
            }
            LOGGER.info("Statically assigned topic partitions(): {}", assignedPartitions);
            for (TopicPartition topicPartition : assignedPartitions) {
                registry.gauge(
                    "staticPartitionAssigned",
                    KafkaSourceParameters.TOPIC, topicPartition.topic(),
                    "partition", String.valueOf(topicPartition.partition()))
                    .set(1.0d);
            }
            kafkaConsumer.assign(assignedPartitions);
            // assign() does not go through group rebalancing, so the listener is invoked by hand.
            rebalanceListener.onPartitionsAssigned(assignedPartitions);
        }

        /**
         * Creates the Kafka consumer and its collaborators.
         *
         * <p>The client id is {@code <jobId>-<workerNumber>-<consumerId>}. With static
         * partition assignment enabled the partitions are assigned directly; otherwise
         * the consumer subscribes to the configured topics, with a rebalance listener
         * unless the checkpoint strategy is {@code NONE}.
         *
         * @return the fully wired consumer
         * @throws RuntimeException whatever a construction step throws; the Kafka
         *         consumer created so far is closed before the exception propagates
         */
        public MantisKafkaConsumer<?> build() {
            Preconditions.checkNotNull(this.context, "context");
            Preconditions.checkNotNull(this.kafkaSourceConfig, "kafkaSourceConfig");
            Preconditions.checkNotNull(this.registry, "registry");
            Preconditions.checkArg(this.consumerIndex >= 0, "consumerIndex must be greater than or equal to 0");
            Preconditions.checkArg(this.totalNumConsumersForJob > 0, "total number of consumers for job must be greater than 0");

            final int kafkaConsumerId = consumerId.incrementAndGet();
            final Map<String, Object> consumerProperties = this.kafkaSourceConfig.getConsumerConfig().getConsumerProperties();
            final String clientId = String.format("%s-%d-%d",
                this.context.getJobId(), this.context.getWorkerInfo().getWorkerNumber(), kafkaConsumerId);
            consumerProperties.put("client.id", clientId);

            final KafkaConsumer<String, byte[]> kafkaConsumer = new KafkaConsumer<>(consumerProperties);
            try {
                final TopicPartitionStateManager stateManager = new TopicPartitionStateManager(
                    this.registry, clientId, this.kafkaSourceConfig.getRetryCheckpointCheckDelayMs());
                final ConsumerMetrics metrics = new ConsumerMetrics(this.registry, kafkaConsumerId, this.context);
                final CheckpointStrategy<?> checkpointStrategy = CheckpointStrategyFactory.getNewInstance(
                    this.context, kafkaConsumer, this.kafkaSourceConfig.getCheckpointStrategy(), metrics);

                if (this.kafkaSourceConfig.getStaticPartitionAssignmentEnabled()) {
                    final KafkaConsumerRebalanceListener rebalanceListener =
                        new KafkaConsumerRebalanceListener(kafkaConsumer, stateManager, checkpointStrategy);
                    if (!this.kafkaSourceConfig.getTopicPartitionCounts().isPresent()) {
                        // Surface the otherwise silent case where nothing gets assigned or subscribed.
                        LOGGER.warn("static partition assignment is enabled but no topic partition counts are configured, consumer {} has no partitions assigned", clientId);
                    }
                    this.kafkaSourceConfig.getTopicPartitionCounts().ifPresent(topicPartitionCounts ->
                        doStaticPartitionAssignment(kafkaConsumer, rebalanceListener, this.consumerIndex,
                            this.totalNumConsumersForJob, topicPartitionCounts, this.registry));
                } else if (this.kafkaSourceConfig.getCheckpointStrategy() != CheckpointStrategyOptions.NONE) {
                    kafkaConsumer.subscribe(this.kafkaSourceConfig.getTopics(),
                        new KafkaConsumerRebalanceListener(kafkaConsumer, stateManager, checkpointStrategy));
                } else {
                    kafkaConsumer.subscribe(this.kafkaSourceConfig.getTopics());
                }
                return new MantisKafkaConsumer<>(kafkaConsumerId, kafkaConsumer, stateManager, checkpointStrategy,
                    CheckpointTriggerFactory.getNewInstance(this.kafkaSourceConfig), metrics);
            } catch (RuntimeException e) {
                // Nobody else holds a reference to the consumer yet, so release it here.
                closeAfterFailure(kafkaConsumer, e);
                throw e;
            }
        }

        /** Closes {@code kafkaConsumer}, attaching any close failure to {@code cause} as suppressed. */
        private static void closeAfterFailure(KafkaConsumer<?, ?> kafkaConsumer, RuntimeException cause) {
            try {
                kafkaConsumer.close();
            } catch (RuntimeException closeFailure) {
                cause.addSuppressed(closeFailure);
            }
        }
    }

    /**
     * Stores the given collaborators and starts the periodic poll-latency metrics publisher.
     *
     * @param consumerId            numeric id of this consumer
     * @param consumer              underlying Kafka consumer all operations delegate to
     * @param partitionStateManager per-partition state, reset for assigned partitions on {@link #close()}
     * @param strategy              checkpoint strategy exposed via {@link #getStrategy()}
     * @param trigger               checkpoint trigger; shut down on {@link #close()}
     * @param consumerMetrics       metrics sink for poll latency and pause/resume counts
     */
    public MantisKafkaConsumer(int consumerId,
                               KafkaConsumer<String, byte[]> consumer,
                               TopicPartitionStateManager partitionStateManager,
                               CheckpointStrategy<S> strategy,
                               CheckpointTrigger trigger,
                               ConsumerMetrics consumerMetrics) {
        this.consumerId = consumerId;
        this.consumerMetrics = consumerMetrics;
        this.consumer = consumer;
        this.partitionStateManager = partitionStateManager;
        this.strategy = strategy;
        this.trigger = trigger;
        setupMetricPublish();
    }

    /** Starts, at most once, a one-second interval that records both time-since-poll metrics. */
    private void setupMetricPublish() {
        if (this.metricSubscription == null) {
            this.metricSubscription = Observable.interval(1L, TimeUnit.SECONDS).subscribe(tick -> {
                this.consumerMetrics.recordTimeSinceLastPollMs(timeSinceLastPollMs());
                this.consumerMetrics.recordTimeSinceLastPollWithDataMs(timeSinceLastPollWithDataMs());
            });
        }
    }

    public int getConsumerId() {
        return this.consumerId;
    }

    public KafkaConsumer<String, byte[]> getConsumer() {
        return this.consumer;
    }

    public CheckpointStrategy<S> getStrategy() {
        return this.strategy;
    }

    public CheckpointTrigger getTrigger() {
        return this.trigger;
    }

    public TopicPartitionStateManager getPartitionStateManager() {
        return this.partitionStateManager;
    }

    /** @return milliseconds elapsed since the most recent {@link #poll(long)} call started */
    public long timeSinceLastPollMs() {
        return System.currentTimeMillis() - this.pollTimestamp.get();
    }

    /** @return milliseconds elapsed since the start of the most recent poll that returned records */
    public long timeSinceLastPollWithDataMs() {
        return System.currentTimeMillis() - this.pollReturnedDataTimestamp.get();
    }

    public ConsumerMetrics getConsumerMetrics() {
        return this.consumerMetrics;
    }

    /**
     * Cancels the metrics publisher and, while the trigger is still active, resets the
     * partition state of every assigned partition, closes the Kafka consumer and shuts
     * the trigger down.
     */
    public void close() {
        if (this.metricSubscription != null && !this.metricSubscription.isUnsubscribed()) {
            this.metricSubscription.unsubscribe();
        }
        // NOTE(review): the consumer is only closed while the trigger is active; presumably the
        // trigger doubles as the "not yet closed" flag so repeated close() calls are no-ops - confirm.
        if (this.trigger.isActive()) {
            final Set<TopicPartition> assignment = this.consumer.assignment();
            LOGGER.warn("clearing partition state when closing consumer {}, partitions {}", toString(), assignment.toString());
            assignment.forEach(this.partitionStateManager::resetCounters);
            this.consumer.close();
            this.trigger.shutdown();
        }
    }

    /**
     * Polls the underlying consumer and updates the poll timestamps.
     *
     * @param consumerPollTimeoutMs poll timeout in milliseconds
     * @return the records returned by the underlying consumer
     */
    public ConsumerRecords<String, byte[]> poll(long consumerPollTimeoutMs) {
        final long pollStart = System.currentTimeMillis();
        this.pollTimestamp.set(pollStart);
        final ConsumerRecords<String, byte[]> records = this.consumer.poll(Duration.ofMillis(consumerPollTimeoutMs));
        if (records.count() > 0) {
            // Deliberately the poll start time, not the return time, as in the recorded poll timestamp.
            this.pollReturnedDataTimestamp.set(pollStart);
        }
        return records;
    }

    public Set<TopicPartition> assignment() {
        return this.consumer.assignment();
    }

    public Map<String, List<PartitionInfo>> listTopics() {
        return this.consumer.listTopics();
    }

    public long position(TopicPartition topicPartition) {
        return this.consumer.position(topicPartition);
    }

    public void seekToBeginning(TopicPartition... partitions) {
        this.consumer.seekToBeginning(Arrays.asList(partitions));
    }

    /**
     * Pauses fetching from the given partitions and increments the pause counter.
     * Exceptions from the underlying consumer propagate to the caller.
     */
    public void pause(TopicPartition... partitions) {
        LOGGER.debug("pausing {} partitions", partitions.length);
        this.consumer.pause(Arrays.asList(partitions));
        this.consumerMetrics.incrementPausePartitionCount();
    }

    /**
     * Resumes fetching from the given partitions and increments the resume counter.
     * An {@link IllegalStateException} from the underlying consumer is logged and
     * swallowed, in which case the counter is not incremented.
     */
    public void resume(TopicPartition... partitions) {
        try {
            LOGGER.debug("resuming {} partitions", partitions.length);
            this.consumer.resume(Arrays.asList(partitions));
            this.consumerMetrics.incrementResumePartitionCount();
        } catch (IllegalStateException e) {
            LOGGER.warn("resuming partitions failed", e);
        }
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        final MantisKafkaConsumer<?> other = (MantisKafkaConsumer<?>) obj;
        return this.consumerId == other.consumerId
            && this.consumer.equals(other.consumer)
            && this.strategy.equals(other.strategy)
            && this.trigger.equals(other.trigger)
            && this.consumerMetrics.equals(other.consumerMetrics)
            && this.partitionStateManager.equals(other.partitionStateManager);
    }

    @Override
    public int hashCode() {
        return Objects.hash(this.consumerId, this.consumer, this.strategy, this.trigger, this.consumerMetrics, this.partitionStateManager);
    }

    @Override
    public String toString() {
        return "MantisKafkaConsumer{consumerId=" + this.consumerId + ", consumer=" + this.consumer + ", strategy=" + this.strategy + ", trigger=" + this.trigger + '}';
    }
}
