package io.bigdime.handler.kafka;

import io.bigdime.alert.Logger;
import io.bigdime.alert.LoggerFactory;
import io.bigdime.core.ActionEvent;
import io.bigdime.core.AdaptorConfigurationException;
import io.bigdime.core.HandlerException;
import io.bigdime.core.InvalidValueConfigurationException;
import io.bigdime.core.commons.AdaptorLogger;
import io.bigdime.core.commons.PropertyHelper;
import io.bigdime.core.commons.TimeManager;
import io.bigdime.core.config.AdaptorConfig;
import io.bigdime.core.config.AdaptorConfigConstants;
import io.bigdime.core.constants.ActionEventHeaderConstants;
import io.bigdime.core.handler.AbstractHandler;
import io.bigdime.core.handler.HandlerJournal;
import io.bigdime.core.handler.SimpleJournal;
import io.bigdime.core.runtimeinfo.RuntimeInfo;
import io.bigdime.core.runtimeinfo.RuntimeInfoStore;
import io.bigdime.core.runtimeinfo.RuntimeInfoStoreException;
import io.bigdime.handler.constants.KafkaReaderHandlerConstants;
import io.bigdime.libs.kafka.consumers.KafkaMessage;
import io.bigdime.libs.kafka.consumers.KafkaSimpleConsumer;
import io.bigdime.libs.kafka.exceptions.KafkaReaderException;
import io.bigdime.libs.kafka.utills.KafkaUtils;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hdfs.web.HftpFileSystem;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;

@Scope("prototype")
@Component
/* loaded from: input_file:lib/bigdime-data-handlers-0.9.1.jar:io/bigdime/handler/kafka/KafkaReaderHandler.class */
public class KafkaReaderHandler extends AbstractHandler {
    private long totalInvocations;
    private static final String KAFKA_MESSAGE_READER_OFFSET = "kafka_message_offset";
    private static final String DF = "yyyyMMdd";
    private static final String PARTITION = "PARTITION";
    private KafkaSimpleConsumer kafkaSimpleConsumer;
    private KafkaInputDescriptor inputDescriptor;
    private List<KafkaMessage> kafkaMessages;
    private String handlerPhase;
    private long currentOffset;
    private long lastOffset;

    @Autowired
    private RuntimeInfoStore<RuntimeInfo> runtimeInfoStore;
    private static final AdaptorLogger logger = new AdaptorLogger(LoggerFactory.getLogger((Class<?>) KafkaReaderHandler.class));
    private static final DateTimeZone timeZone = DateTimeZone.forID(HftpFileSystem.HFTP_TIMEZONE);
    private static final String HOUR_FORMAT = "HH";
    private static final DateTimeFormatter hourFormatter = DateTimeFormat.forPattern(HOUR_FORMAT).withZone(timeZone);
    private static final DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyyMMdd").withZone(timeZone);
    private TimeManager timeManager = TimeManager.getInstance();
    private final String TIMESTAMP = "DT";
    private final String HOUR = "HOUR";
    private String entityName = null;

    @Override // io.bigdime.core.handler.AbstractHandler, io.bigdime.core.Handler
    public void build() throws AdaptorConfigurationException {
        super.build();
        this.handlerPhase = "building Kafka Reader Handler";
        logger.info(this.handlerPhase, "handler_id={} handler_name={} properties={}", getId(), getName(), getPropertyMap());
        String stringProperty = PropertyHelper.getStringProperty(getPropertyMap(), KafkaReaderHandlerConstants.BROKERS);
        logger.debug(this.handlerPhase, "KafkaReader={}", toString());
        Map.Entry entry = (Map.Entry) getPropertyMap().get(AdaptorConfigConstants.SourceConfigConstants.SRC_DESC);
        Map<String, Object> map = (Map) entry.getKey();
        this.inputDescriptor = new KafkaInputDescriptor();
        try {
            this.inputDescriptor.parseDescriptor(map);
            this.entityName = this.inputDescriptor.getEntityName();
            if (this.entityName == null) {
                this.entityName = this.inputDescriptor.getTopic();
            }
            try {
                this.currentOffset = getOffsetFromRuntimeInfo(this.runtimeInfoStore, this.inputDescriptor.getEntityName(), String.valueOf(this.inputDescriptor.getPartition()), KAFKA_MESSAGE_READER_OFFSET);
                if (this.currentOffset > 0) {
                    this.currentOffset++;
                }
                this.kafkaSimpleConsumer = KafkaSimpleConsumer.getInstance().brokers(KafkaUtils.parseBrokers(stringProperty, null)).clientId(AdaptorConfig.getInstance().getName()).topic(this.inputDescriptor.getTopic()).partitionId(String.valueOf(this.inputDescriptor.getPartition())).build();
                logger.info(this.handlerPhase, "topic:partition={} input_field_name={} currentOffset = {} ", entry.getKey(), entry.getValue(), Long.valueOf(this.currentOffset));
            } catch (RuntimeInfoStoreException e) {
                throw new AdaptorConfigurationException(e);
            }
        } catch (IllegalArgumentException e2) {
            throw new InvalidValueConfigurationException("incorrect value specified in src-desc " + e2.getMessage());
        }
    }

    @Override // io.bigdime.core.Handler
    public ActionEvent.Status process() throws HandlerException {
        try {
            this.handlerPhase = "processing KafkaReaderHandler";
            incrementTotalInvocations();
            ActionEvent.Status preProcess = preProcess();
            return preProcess == ActionEvent.Status.BACKOFF ? preProcess : process0();
        } catch (RuntimeInfoStoreException e) {
            throw new HandlerException("Unable to process message from Kafka", e);
        }
    }

    private void incrementTotalInvocations() {
        this.totalInvocations++;
    }

    private ActionEvent.Status preProcess() throws RuntimeInfoStoreException, HandlerException {
        if (!shallProcessNew()) {
            return ActionEvent.Status.READY;
        }
        logger.debug(this.handlerPhase, "will process new batch");
        try {
            if (this.currentOffset == -1) {
                this.kafkaMessages = this.kafkaSimpleConsumer.pollData(this.kafkaSimpleConsumer.getEarliestOffSet());
            } else {
                this.kafkaMessages = this.kafkaSimpleConsumer.pollData(this.currentOffset);
            }
            if (this.kafkaMessages == null || this.kafkaMessages.isEmpty()) {
                logger.info(this.handlerPhase, "no data in the kafka , will BACKOFF topicName={} partition={} currentOffset={}", this.inputDescriptor.getTopic(), Integer.valueOf(this.inputDescriptor.getPartition()), Long.valueOf(this.currentOffset));
                return ActionEvent.Status.BACKOFF;
            }
            this.currentOffset = this.kafkaMessages.get(this.kafkaMessages.size() - 1).getOffset() + 1;
            this.lastOffset = this.kafkaMessages.get(this.kafkaMessages.size() - 1).getOffset();
            logger.debug(this.handlerPhase, "kafkaMessageOffsetFromRuntimeInfo={} kafkaMessages.get(0).getOffset()={}", Long.valueOf(this.currentOffset), Long.valueOf(this.kafkaMessages.get(0).getOffset()));
            int i = 0;
            if (this.lastOffset >= this.kafkaMessages.get(0).getOffset()) {
                if (this.lastOffset > this.kafkaMessages.get(this.kafkaMessages.size() - 1).getOffset()) {
                    return ActionEvent.Status.BACKOFF;
                }
                i = 0;
            }
            getSimpleJournal().setTotalSize(this.kafkaMessages.size());
            getSimpleJournal().setTotalRead(i);
            logger.debug(this.handlerPhase, "handlerContext={} total_size={} total_read={}", getHandlerContext(), Long.valueOf(getTotalSizeFromJournal()), Long.valueOf(getTotalReadFromJournal()));
            return shallProcessNew() ? ActionEvent.Status.BACKOFF : ActionEvent.Status.READY;
        } catch (KafkaReaderException e) {
            logger.alert(Logger.ALERT_TYPE.INGESTION_FAILED, Logger.ALERT_CAUSE.APPLICATION_INTERNAL_ERROR, Logger.ALERT_SEVERITY.BLOCKER, "\"kafka reader exception\" error={}", e.toString());
            throw new HandlerException("Unable to read the messages from Kafka", e);
        }
    }

    private ActionEvent.Status process0() throws HandlerException {
        ActionEvent actionEvent = new ActionEvent();
        long totalReadFromJournal = getTotalReadFromJournal();
        logger.debug(this.handlerPhase, "nextIndexToRead={}", Long.valueOf(totalReadFromJournal));
        KafkaMessage kafkaMessage = this.kafkaMessages.get((int) totalReadFromJournal);
        long offset = kafkaMessage.getOffset();
        actionEvent.setBody(kafkaMessage.getMessage());
        getHandlerContext().createSingleItemEventList(actionEvent);
        getSimpleJournal().setTotalRead(totalReadFromJournal + 1);
        actionEvent.getHeaders().put(ActionEventHeaderConstants.ENTITY_NAME, this.entityName);
        actionEvent.getHeaders().put(ActionEventHeaderConstants.INPUT_DESCRIPTOR, String.valueOf(this.inputDescriptor.getPartition()));
        actionEvent.getHeaders().put(PARTITION, String.valueOf(this.inputDescriptor.getPartition()));
        actionEvent.getHeaders().put(KAFKA_MESSAGE_READER_OFFSET, String.valueOf(offset));
        DateTime localDateTime = this.timeManager.getLocalDateTime();
        String format = this.timeManager.format("yyyyMMdd", localDateTime);
        String print = hourFormatter.print(localDateTime);
        actionEvent.getHeaders().put("DT", format);
        actionEvent.getHeaders().put("HOUR", print);
        processChannelSubmission(actionEvent);
        if (getTotalReadFromJournal() != getTotalSizeFromJournal()) {
            return ActionEvent.Status.CALLBACK;
        }
        logger.info(this.handlerPhase, "totalRead from Journal ={}", Long.valueOf(getTotalSizeFromJournal()));
        return ActionEvent.Status.READY;
    }

    protected long getOffsetFromRuntimeInfo(RuntimeInfoStore<RuntimeInfo> runtimeInfoStore, String str, String str2, String str3) throws RuntimeInfoStoreException {
        try {
            RuntimeInfo runtimeInfo = runtimeInfoStore.get(AdaptorConfig.getInstance().getName(), str, str2);
            if (runtimeInfo == null || runtimeInfo.getProperties() == null) {
                return -1L;
            }
            long longProperty = PropertyHelper.getLongProperty(runtimeInfo.getProperties().get(str3), -1L);
            logger.debug("proessing handler", "MessageOffsetFromRuntimeInfo from runtime info={}", Long.valueOf(longProperty));
            return longProperty;
        } catch (IllegalArgumentException e) {
            return -1L;
        }
    }

    public long getTotalInvocations() {
        return this.totalInvocations;
    }

    public KafkaInputDescriptor getInputDescriptor() {
        return this.inputDescriptor;
    }

    private long getTotalReadFromJournal() throws HandlerException {
        return getSimpleJournal().getTotalRead();
    }

    private long getTotalSizeFromJournal() throws HandlerException {
        return getSimpleJournal().getTotalSize();
    }

    private boolean shallProcessNew() throws HandlerException {
        return getTotalReadFromJournal() == getTotalSizeFromJournal();
    }

    private HandlerJournal getSimpleJournal() throws HandlerException {
        return getNonNullJournal(SimpleJournal.class);
    }

    @Override // io.bigdime.core.handler.AbstractHandler
    protected String getHandlerPhase() {
        return this.handlerPhase;
    }

    @Override // io.bigdime.core.handler.AbstractHandler, io.bigdime.core.Handler
    public void handleException() {
    }
}
