package com.cs.software.engine.kafka.dataprocess.reader;

import com.cs.software.engine.KafkaMessage;
import com.cs.software.engine.dataprocess.DataProcess;
import com.cs.software.engine.dataprocess.DataProcessConfigIntf;
import com.cs.software.engine.dataprocess.DataProcessEngine;
import com.cs.software.engine.dataprocess.DataProcessTemplateIntf;
import com.cs.software.engine.dataprocess.reader.DataProcessReaderBase;
import com.cs.software.engine.datastore.TypeBaseIntf;
import com.cs.software.engine.kafka.KafkaUtil;
import com.cs.software.engine.schema.TableInfo;
import com.cs.software.engine.threadpool.PoolMgr;
import com.cs.software.engine.threadpool.ThreadMgr;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/* loaded from: input_file:com/cs/software/engine/kafka/dataprocess/reader/DataProcessKafka.class */
public class DataProcessKafka extends DataProcessReaderBase {
    private static final int DEF_ERROR_CODE = -9119;
    private static final String threadPoolName = "DataProcessKafka";
    private Map<String, String> inputMap;
    private KafkaConsumer<String, String> kafkaConsumer;
    protected DataProcessEngine engine;
    private PoolMgr poolManager;
    private String topicName;
    private String groupId;
    private int maxThreads;

    public DataProcessKafka() {
        this.fileToken = null;
    }

    public void openInput(DataProcessConfigIntf dataProcessConfigIntf) throws Exception {
        this.dataProcessConfig = dataProcessConfigIntf;
        this.inputMap = this.dataProcessConfig.getInputMap();
        int intValue = new Integer(this.inputMap.get("MinThreads")).intValue();
        this.maxThreads = new Integer(this.inputMap.get("MaxThreads")).intValue();
        if (this.maxThreads > 1) {
            String str = this.inputMap.get("ServiceClassName");
            ThreadMgr threadMgr = ThreadMgr.getInstance();
            threadMgr.setPoolMgr(threadPoolName, str, intValue, this.maxThreads, 5);
            this.poolManager = threadMgr.getPoolMgr(threadPoolName);
        }
        this.groupId = this.inputMap.get("GroupId");
        this.topicName = this.inputMap.get("TopicName");
        ArrayList arrayList = new ArrayList();
        for (String str2 : this.topicName.split(TableInfo.FIELD_SEP)) {
            arrayList.add(str2);
        }
        this.kafkaConsumer = new KafkaConsumer<>(KafkaUtil.getConsumerProperties(this.groupId));
        this.kafkaConsumer.subscribe(arrayList);
    }

    @Override // com.cs.software.engine.dataprocess.reader.DataProcessReaderBase, com.cs.software.engine.dataprocess.DataProcessReaderIntf
    public void loadDataFile() throws Exception {
    }

    @Override // com.cs.software.engine.dataprocess.reader.DataProcessReaderBase, com.cs.software.engine.dataprocess.DataProcessReaderIntf
    public void processTemplate(DataProcessTemplateIntf dataProcessTemplateIntf, DataProcess dataProcess) throws Exception {
        while (true) {
            try {
                try {
                    this.pages.clear();
                    this.pageList.clear();
                    Iterator it = this.kafkaConsumer.poll(Duration.ofSeconds(100L)).iterator();
                    while (it.hasNext()) {
                        ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                        if (this.maxThreads > 1) {
                            KafkaMessage kafkaMessage = new KafkaMessage();
                            kafkaMessage.setParam("__Record Key", consumerRecord.key());
                            kafkaMessage.setParam("__Payload", consumerRecord.value());
                            kafkaMessage.setParam(KafkaMessage.KAFKA_RECORD_PARTITION, Integer.valueOf(consumerRecord.partition()));
                            kafkaMessage.setParam(KafkaMessage.KAFKA_RECORD_OFFSET, Long.valueOf(consumerRecord.offset()));
                            kafkaMessage.setParam(KafkaMessage.KAFKA_RECORD_TOPIC, consumerRecord.topic());
                            kafkaMessage.setParam(KafkaMessage.KAFKA_RECORD_HEADERS, consumerRecord.headers());
                            kafkaMessage.setParam(KafkaMessage.KAFKA_RECORD_TIMESTAMP, Long.valueOf(consumerRecord.timestamp()));
                            kafkaMessage.setParam("DataProcessTemplateIntf", dataProcessTemplateIntf);
                            kafkaMessage.setParam("DataProcess", dataProcess);
                            this.poolManager.performWork(kafkaMessage);
                        } else {
                            if (this.engine == null) {
                                this.engine = new DataProcessEngine();
                            }
                            String str = (String) consumerRecord.value();
                            ArrayList arrayList = new ArrayList();
                            ArrayList arrayList2 = new ArrayList();
                            arrayList.add(str);
                            for (String str2 : str.split(TypeBaseIntf.NEW_LINE)) {
                                arrayList2.add(str2);
                            }
                            this.pages = arrayList;
                            this.pageList.add(arrayList2);
                            this.engine.setData(this.pages, this.pageList);
                            this.engine.processTemplate(dataProcessTemplateIntf, dataProcess);
                        }
                    }
                    this.kafkaConsumer.commitAsync();
                } catch (Exception e) {
                    System.out.println("Exception caught " + e.getMessage());
                    this.kafkaConsumer.close();
                    System.out.println("After closing KafkaConsumer");
                    return;
                }
            } catch (Throwable th) {
                this.kafkaConsumer.close();
                System.out.println("After closing KafkaConsumer");
                throw th;
            }
        }
    }

    @Override // com.cs.software.engine.dataprocess.reader.DataProcessReaderBase
    public int getDefError() {
        return DEF_ERROR_CODE;
    }
}
