package io.mantisrx.connector.kafka.source;

import com.netflix.spectator.api.Registry;
import io.mantisrx.connector.kafka.KafkaAckable;
import io.mantisrx.connector.kafka.KafkaData;
import io.mantisrx.connector.kafka.KafkaDataNotification;
import io.mantisrx.connector.kafka.KafkaSourceParameters;
import io.mantisrx.connector.kafka.source.MantisKafkaConsumer;
import io.mantisrx.connector.kafka.source.checkpoint.strategy.CheckpointStrategy;
import io.mantisrx.connector.kafka.source.checkpoint.strategy.CheckpointStrategyOptions;
import io.mantisrx.connector.kafka.source.checkpoint.trigger.CheckpointTrigger;
import io.mantisrx.connector.kafka.source.metrics.ConsumerMetrics;
import io.mantisrx.connector.kafka.source.serde.ParseException;
import io.mantisrx.connector.kafka.source.serde.Parser;
import io.mantisrx.connector.kafka.source.serde.ParserType;
import io.mantisrx.runtime.Context;
import io.mantisrx.runtime.parameter.ParameterDefinition;
import io.mantisrx.runtime.parameter.type.BooleanParameter;
import io.mantisrx.runtime.parameter.type.IntParameter;
import io.mantisrx.runtime.parameter.type.StringParameter;
import io.mantisrx.runtime.parameter.validator.Validators;
import io.mantisrx.runtime.source.Index;
import io.mantisrx.runtime.source.Source;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.record.InvalidRecordException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscription;
import rx.observables.SyncOnSubscribe;
import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject;
import rx.subjects.SerializedSubject;

/**
 * Mantis {@link Source} that reads records from Kafka and emits them as {@link KafkaAckable}s.
 *
 * <p>Each worker creates {@code numConsumerInstances} {@link MantisKafkaConsumer}s, and each one is
 * exposed as its own back-pressured inner observable subscribed on a new thread. Downstream
 * acknowledgements are published to {@link #ackSubject} and recorded in the owning consumer's
 * {@link TopicPartitionStateManager}. That state manager is what the consumer's
 * {@link CheckpointStrategy} persists when its {@link CheckpointTrigger} fires.
 */
public class KafkaSource implements Source<KafkaAckable> {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSource.class);

    /** Fixed allowance added to each value's length when feeding the checkpoint trigger (presumably per-record overhead). */
    private static final int RECORD_SIZE_ALLOWANCE_BYTES = 100;

    /** Back-off applied when a poll yields no records, so an idle consumer does not spin. */
    private static final long EMPTY_POLL_BACKOFF_MS = 200L;

    private final Registry registry;
    private Subscription ackSubjectSubscription;

    /** Set when the source is unsubscribed or closed; consumer loops observe it and complete. */
    private final AtomicBoolean done = new AtomicBoolean();

    /**
     * Consumers created by this source, keyed by consumer id. Populated in {@link #createConsumers}
     * and read by the ack processor, which runs on whichever thread publishes to {@link #ackSubject}.
     * That is why this is a concurrent map.
     */
    private final Map<Integer, MantisKafkaConsumer<?>> idToConsumerMap = new ConcurrentHashMap<>();

    private final SerializedSubject<KafkaDataNotification, KafkaDataNotification> ackSubject =
            new SerializedSubject<>(PublishSubject.create());

    public KafkaSource(Registry registry) {
        this.registry = registry;
    }

    /**
     * Builds this worker's consumers and registers each one in {@link #idToConsumerMap}.
     *
     * <p>Consumer {@code k} of worker {@code w} gets index {@code w + k * totalNumWorkers}. Assuming
     * worker indices run from 0 to {@code totalNumWorkers - 1}, this yields unique indices across the
     * job.
     *
     * @param context Mantis context; supplies the worker index and is handed to each consumer
     * @param kafkaSourceConfig configuration shared by all consumers of this source
     * @param totalNumWorkers number of workers running this source
     * @return the created consumers, in creation order
     */
    private Observable<MantisKafkaConsumer<?>> createConsumers(Context context, MantisKafkaSourceConfig kafkaSourceConfig, int totalNumWorkers) {
        final int consumersPerWorker = kafkaSourceConfig.getNumConsumerInstances();
        final List<MantisKafkaConsumer<?>> consumers = new ArrayList<>();
        for (int k = 0; k < consumersPerWorker; k++) {
            final MantisKafkaConsumer<?> consumer = new MantisKafkaConsumer.Builder()
                    .withKafkaSourceConfig(kafkaSourceConfig)
                    .withTotalNumConsumersForJob(totalNumWorkers * consumersPerWorker)
                    .withContext(context)
                    .withConsumerIndex(context.getWorkerInfo().getWorkerIndex() + totalNumWorkers * k)
                    .withRegistry(this.registry)
                    .build();
            this.idToConsumerMap.put(consumer.getConsumerId(), consumer);
            LOGGER.info("created consumer {}", consumer);
            consumers.add(consumer);
        }
        return Observable.from(consumers);
    }

    /**
     * Estimates a record's size for the checkpoint trigger: value length plus a fixed allowance.
     * The record's value must be non-null.
     */
    private int getPayloadSize(ConsumerRecord<String, byte[]> consumerRecord) {
        return consumerRecord.value().length + RECORD_SIZE_ALLOWANCE_BYTES;
    }

    /**
     * Wraps a record with a non-null value in {@link KafkaData}. When {@code parseMessageInSource}
     * is enabled, the value is parsed first.
     *
     * @return the wrapped record, or empty when parsing is enabled and the configured parser cannot
     *     handle the value
     * @throws ParseException if the parser accepts the value but fails to parse it
     */
    private static Optional<KafkaData> toKafkaData(ConsumerRecord<String, byte[]> consumerRecord, MantisKafkaSourceConfig kafkaSourceConfig, int consumerId) throws ParseException {
        if (!kafkaSourceConfig.getParseMessageInSource()) {
            return Optional.of(new KafkaData(consumerRecord, Optional.empty(), Optional.ofNullable(consumerRecord.key()), consumerId));
        }
        final Parser parser = ParserType.parser(kafkaSourceConfig.getMessageParserType()).getParser();
        if (!parser.canParse(consumerRecord.value())) {
            return Optional.empty();
        }
        return Optional.of(new KafkaData(consumerRecord, Optional.ofNullable(parser.parseMessage(consumerRecord.value())), Optional.ofNullable(consumerRecord.key()), consumerId));
    }

    /**
     * Exposes one consumer as a back-pressure-aware observable. Each step of the
     * {@link SyncOnSubscribe} loop emits at most one record.
     *
     * <p>A step does the following:
     * <ol>
     *   <li>Persists a checkpoint of the assigned partitions if the trigger says one is due.</li>
     *   <li>Completes the stream if {@link #done} is set.</li>
     *   <li>Otherwise, takes the next record. It polls Kafka when the current batch is exhausted and
     *       backs off briefly when a poll returns nothing.</li>
     * </ol>
     *
     * <p>Exceptions raised while polling or emitting are handled inside the step and do not end the
     * stream. An exception while checkpointing propagates and terminates the stream with an error.
     * The consumer is closed exactly once, by the unsubscribe hook.
     */
    private Observable<KafkaAckable> createBackPressuredConsumerObs(MantisKafkaConsumer<?> mantisKafkaConsumer, MantisKafkaSourceConfig kafkaSourceConfig) {
        final CheckpointStrategy<?> strategy = mantisKafkaConsumer.getStrategy();
        final CheckpointTrigger trigger = mantisKafkaConsumer.getTrigger();
        final ConsumerMetrics consumerMetrics = mantisKafkaConsumer.getConsumerMetrics();
        final TopicPartitionStateManager partitionStateManager = mantisKafkaConsumer.getPartitionStateManager();
        final int consumerId = mantisKafkaConsumer.getConsumerId();

        return Observable.create(SyncOnSubscribe.<Iterator<ConsumerRecord<String, byte[]>>, KafkaAckable>createStateful(
                () -> {
                    final ConsumerRecords<String, byte[]> firstBatch = mantisKafkaConsumer.poll(kafkaSourceConfig.getConsumerPollTimeoutMs());
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug("topic listing for consumer {}", mantisKafkaConsumer.listTopics());
                    }
                    LOGGER.info("consumer subscribed to topic-partitions {}", mantisKafkaConsumer.assignment());
                    return firstBatch.iterator();
                },
                (currentRecords, observer) -> {
                    Iterator<ConsumerRecord<String, byte[]>> records = currentRecords;

                    if (trigger.shouldCheckpoint()) {
                        final Set<TopicPartition> assignment = mantisKafkaConsumer.assignment();
                        final long startMillis = System.currentTimeMillis();
                        strategy.persistCheckpoint(partitionStateManager.createCheckpoint(assignment));
                        consumerMetrics.recordCheckpointDelay(System.currentTimeMillis() - startMillis);
                        consumerMetrics.incrementCommitCount();
                        trigger.reset();
                    }

                    if (this.done.get()) {
                        // Terminate normally. The unsubscribe hook below closes the consumer, so it is
                        // closed exactly once and never used after close().
                        observer.onCompleted();
                        return records;
                    }

                    try {
                        if (!records.hasNext()) {
                            final ConsumerRecords<String, byte[]> batch = mantisKafkaConsumer.poll(kafkaSourceConfig.getConsumerPollTimeoutMs());
                            if (LOGGER.isDebugEnabled()) {
                                LOGGER.debug("poll returned {} records", batch.count());
                            }
                            records = batch.iterator();
                        }
                        if (!records.hasNext()) {
                            consumerMetrics.incrementWaitForDataCount();
                            if (LOGGER.isDebugEnabled()) {
                                LOGGER.debug("Reached head of partition, waiting for more data");
                            }
                            TimeUnit.MILLISECONDS.sleep(EMPTY_POLL_BACKOFF_MS);
                            return records;
                        }

                        final ConsumerRecord<String, byte[]> consumerRecord = records.next();
                        final TopicPartition topicPartition = new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
                        consumerMetrics.incrementInCount();
                        if (LOGGER.isDebugEnabled()) {
                            LOGGER.debug("updating read offset to " + consumerRecord.offset() + " read " + consumerRecord.value());
                        }
                        if (consumerRecord.value() == null) {
                            consumerMetrics.incrementKafkaMessageValueNullCount();
                            return records;
                        }

                        try {
                            trigger.update(getPayloadSize(consumerRecord));
                            final Optional<KafkaData> kafkaData = toKafkaData(consumerRecord, kafkaSourceConfig, consumerId);
                            if (kafkaData.isPresent()) {
                                final KafkaAckable ackable = new KafkaAckable(kafkaData.get(), this.ackSubject);
                                // Record the read before emitting so the state manager has seen it
                                // before any ack for this record can be processed.
                                partitionStateManager.recordMessageRead(topicPartition, consumerRecord.offset());
                                consumerMetrics.recordReadOffset(topicPartition, consumerRecord.offset());
                                observer.onNext(ackable);
                            } else {
                                consumerMetrics.incrementParseFailureCount();
                            }
                        } catch (ParseException e) {
                            consumerMetrics.incrementErrorCount();
                            LOGGER.warn("failed to parse {}:{} message {}", consumerRecord.topic(), consumerRecord.partition(), consumerRecord.value(), e);
                        }
                    } catch (TimeoutException e) {
                        consumerMetrics.incrementWaitForDataCount();
                        if (LOGGER.isDebugEnabled()) {
                            LOGGER.debug("Reached head of partition waiting for more data");
                        }
                    } catch (OffsetOutOfRangeException e) {
                        LOGGER.warn("offsets out of range " + e.partitions() + " will seek to beginning", e);
                        for (TopicPartition partition : e.partitions()) {
                            LOGGER.info("partition {} consumer position {}", partition, mantisKafkaConsumer.position(partition));
                        }
                        mantisKafkaConsumer.seekToBeginning(e.partitions().toArray(new TopicPartition[0]));
                    } catch (InvalidRecordException e) {
                        consumerMetrics.incrementErrorCount();
                        LOGGER.warn("iterator error with invalid message. message will be dropped " + e.getMessage());
                    } catch (KafkaException e) {
                        // Must follow the more specific Kafka exceptions above, which are subclasses.
                        consumerMetrics.incrementErrorCount();
                        LOGGER.warn("Other Kafka exception, message will be dropped. " + e.getMessage());
                    } catch (InterruptedException e) {
                        LOGGER.error("consumer interrupted", e);
                        Thread.currentThread().interrupt();
                    } catch (Exception e) {
                        consumerMetrics.incrementErrorCount();
                        LOGGER.warn("caught exception", e);
                    }
                    return records;
                },
                ignoredRecords -> {
                    LOGGER.info("closing Kafka consumer on unsubscribe" + mantisKafkaConsumer.toString());
                    mantisKafkaConsumer.close();
                }))
                .subscribeOn(Schedulers.newThread())
                .doOnUnsubscribe(() -> LOGGER.info("consumer {} stopped due to unsubscribe", consumerId))
                .doOnError(th -> {
                    LOGGER.error("consumer {} stopped due to error", consumerId, th);
                    consumerMetrics.incrementErrorCount();
                })
                .doOnTerminate(() -> LOGGER.info("consumer {} terminated", consumerId));
    }

    /**
     * Starts the ack processor and returns an observable that emits one inner observable per consumer.
     * Unsubscribing sets {@link #done}, which makes every consumer loop complete.
     */
    @Override
    public Observable<Observable<KafkaAckable>> call(Context context, Index index) {
        final int totalNumWorkers = index.getTotalNumWorkers();
        final MantisKafkaSourceConfig kafkaSourceConfig = new MantisKafkaSourceConfig(context);
        startAckProcessor();
        return Observable.<Observable<KafkaAckable>>create(subscriber -> {
            createConsumers(context, kafkaSourceConfig, totalNumWorkers).subscribe(consumer -> {
                subscriber.onNext(createBackPressuredConsumerObs(consumer, kafkaSourceConfig));
            });
        }).doOnUnsubscribe(() -> {
            LOGGER.info("unsubscribed");
            this.done.set(true);
        }).doOnSubscribe(() -> {
            LOGGER.info("subscribed");
            this.done.set(false);
        });
    }

    /** Signals all consumer loops to stop and detaches the ack processor. */
    public void close() throws IOException {
        this.done.set(true);
        stopAckProcessor();
    }

    /**
     * Records an acknowledgement against the owning consumer's partition state and increments its
     * processed count. Negative acks are recorded the same way and only additionally logged at
     * debug level. Acks for unknown consumer ids are ignored.
     */
    private void processAckNotification(KafkaDataNotification kafkaDataNotification) {
        final KafkaData value = kafkaDataNotification.getValue();
        final MantisKafkaConsumer<?> mantisKafkaConsumer = this.idToConsumerMap.get(value.getMantisKafkaConsumerId());
        if (mantisKafkaConsumer == null) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("got Ack for consumer id {} not in idToConsumerMap (topic {})", value.getMantisKafkaConsumerId(), value.getTopic());
            }
            return;
        }
        final TopicPartition topicPartition = new TopicPartition(value.getTopic(), value.getPartition());
        mantisKafkaConsumer.getPartitionStateManager().recordMessageAck(topicPartition, value.getOffset());
        if (!kafkaDataNotification.isSuccess()) {
            LOGGER.debug("Got negative acknowledgement {}", kafkaDataNotification);
        }
        mantisKafkaConsumer.getConsumerMetrics().incrementProcessedCount();
    }

    /**
     * Subscribes the ack processor to {@link #ackSubject}. Any previous subscription is released
     * first, so repeated calls never process an ack twice.
     */
    private void startAckProcessor() {
        stopAckProcessor();
        LOGGER.info("Acknowledgement processor started");
        this.ackSubjectSubscription = this.ackSubject.subscribe(this::processAckNotification);
    }

    /** Unsubscribes the ack processor, if one is running. */
    private void stopAckProcessor() {
        if (this.ackSubjectSubscription != null) {
            this.ackSubjectSubscription.unsubscribe();
            this.ackSubjectSubscription = null;
        }
    }

    /** Returns the job parameters understood by this source, including the Kafka consumer ones. */
    public List<ParameterDefinition<?>> getParameters() {
        final List<ParameterDefinition<?>> params = new ArrayList<>();
        params.add(new StringParameter().name(KafkaSourceParameters.TOPIC).description("Kafka topic to connect to").validator(Validators.notNullOrEmpty()).required().build());
        params.add(new StringParameter().name(KafkaSourceParameters.CHECKPOINT_STRATEGY).description("checkpoint strategy one of " + CheckpointStrategyOptions.values() + " (ensure enable.auto.commit param is set to false when enabling this)").defaultValue(CheckpointStrategyOptions.NONE).validator(Validators.alwaysPass()).build());
        params.add(new IntParameter().name(KafkaSourceParameters.NUM_KAFKA_CONSUMER_PER_WORKER).description("No. of Kafka consumer instances per Mantis worker").validator(Validators.range(1, 16)).defaultValue(1).build());
        params.add(new IntParameter().name(KafkaSourceParameters.MAX_BYTES_IN_PROCESSING).description("The maximum amount of data per-consumer awaiting acks to trigger an offsets commit. These commits are in addition to any commits triggered by commitIntervalMs timer").defaultValue(Integer.valueOf(MantisKafkaSourceConfig.DEFAULT_MAX_BYTES_IN_PROCESSING)).validator(Validators.range(1, Integer.MAX_VALUE)).build());
        params.add(new IntParameter().name(KafkaSourceParameters.CONSUMER_POLL_TIMEOUT_MS).validator(Validators.range(100, Integer.valueOf(MantisKafkaConsumerConfig.DEFAULT_SESSION_TIMEOUT_MS))).defaultValue(250).build());
        params.add(new StringParameter().name(KafkaSourceParameters.PARSER_TYPE).validator(Validators.notNullOrEmpty()).defaultValue(ParserType.SIMPLE_JSON.getPropName()).build());
        params.add(new BooleanParameter().name(KafkaSourceParameters.PARSE_MSG_IN_SOURCE).validator(Validators.alwaysPass()).defaultValue(true).build());
        params.add(new BooleanParameter().name(KafkaSourceParameters.ENABLE_STATIC_PARTITION_ASSIGN).validator(Validators.alwaysPass()).defaultValue(false).description("Disable Kafka's default consumer group management and statically assign partitions to job workers. When enabling static partition assignments, disable auto-scaling and set the numPartitionsPerTopic job parameter").build());
        params.add(new StringParameter().name(KafkaSourceParameters.TOPIC_PARTITION_COUNTS).validator(Validators.alwaysPass()).defaultValue("").description("Configures number of partitions on a kafka topic when static partition assignment is enabled. Format <topic1>:<numPartitions Topic1>,<topic2>:<numPartitions Topic2> Example: nf_errors_log:9,clevent:450").build());
        params.addAll(MantisKafkaConsumerConfig.getJobParameterDefinitions());
        return params;
    }
}
