package io.bigdime.handler.file;

import io.bigdime.alert.Logger;
import io.bigdime.alert.LoggerFactory;
import io.bigdime.core.ActionEvent;
import io.bigdime.core.AdaptorConfigurationException;
import io.bigdime.core.HandlerException;
import io.bigdime.core.InvalidValueConfigurationException;
import io.bigdime.core.commons.AdaptorLogger;
import io.bigdime.core.commons.FileHelper;
import io.bigdime.core.commons.PropertyHelper;
import io.bigdime.core.config.AdaptorConfigConstants;
import io.bigdime.core.constants.ActionEventHeaderConstants;
import io.bigdime.core.handler.AbstractHandler;
import io.bigdime.core.runtimeinfo.RuntimeInfo;
import io.bigdime.core.runtimeinfo.RuntimeInfoStore;
import io.bigdime.core.runtimeinfo.RuntimeInfoStoreException;
import java.io.BufferedInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.zip.ZipEntry;
import java.util.zip.ZipException;
import java.util.zip.ZipFile;
import org.apache.commons.lang.NotImplementedException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;

@Scope("prototype")
@Component
/* loaded from: input_file:lib/bigdime-data-handlers-0.9.1.jar:io/bigdime/handler/file/ZipFileInputStreamHandler.class */
public class ZipFileInputStreamHandler extends AbstractHandler {
    /** Name of the {@code fileLocation} property. */
    public static final String FILE_LOCATION = "fileLocation";

    /** Archive selected by {@link #getNextFileToProcess()}; used to populate the event headers. */
    private File currentFile;
    /** File name pattern parsed from the src-desc key in {@link #build()}. */
    private String fileNamePattern;
    /** Upper bound on the number of bytes read from a zip entry per invocation. */
    private int bufferSize;
    /** Directory path obtained from the parsed base path descriptor. */
    private String fileLocation;
    /** Parser used in {@link #build()} for the base path and the src-desc key. */
    private FileInputDescriptor fileInputDescriptor;

    @Autowired
    private RuntimeInfoStore<RuntimeInfo> runtimeInfoStore;

    @Autowired
    private FileHelper fileHelper;
    /** Entity name parsed from the src-desc key in {@link #build()}. */
    private String entityName;
    /** Archive returned by the most recent {@link #getNextFileToProcess()} call made from preProcess. */
    private File currentFileToProcess;
    /** Value of the preserve-base-path property; copied into every event's headers. */
    private boolean preserveBasePath;
    /** Value of the preserve-relative-path property; copied into every event's headers. */
    private boolean preserveRelativePath;
    /** Entry most recently taken from {@link #entriesInZip}. */
    private ZipEntry zipFileEntry;
    /** Currently open archive. */
    private ZipFile zipFile;
    /** Raw stream of the entry currently being read. */
    private InputStream inputStream;
    /** Buffered wrapper around {@link #inputStream}; the chunks are read from this stream. */
    private BufferedInputStream bufferedInputStream;
    /** Number of entries in the current archive as reported by {@link ZipFile#size()}. */
    private int noOfEntry;
    /** Enumeration over the entries of the current archive. */
    Enumeration<? extends ZipEntry> entriesInZip;
    /** Runtime infos returned by the started-records lookup performed on the first invocation. */
    List<RuntimeInfo> dirtyRecords;
    private static final AdaptorLogger logger = new AdaptorLogger(LoggerFactory.getLogger((Class<?>) ZipFileInputStreamHandler.class));
    /** Buffer size (1 MiB) used when the buffer-size property is not configured. */
    private static final int DEFAULT_BUFFER_SIZE = 1048576;
    /** Length in bytes of the current archive file; -1 until an archive has been selected. */
    private long zipFileLength = -1;
    /** Phase label passed as the first argument of every log call. */
    private String handlerPhase = "";
    /** 1-based position of the entry being read in the current archive; 0 when no archive is in progress. */
    private int readEntries = 0;
    /** Configured base path; defaults to "/". */
    private String basePath = "/";
    /** Name of the zip entry currently being read. */
    private String fileName = "";
    /** Running byte count that is mirrored into the journal after every chunk. */
    private long totalRead = 0;
    /** Bytes of the current entry that have not been read yet. */
    private long fileSize = 0;
    /** Number of dirty records found on the first invocation. */
    long dirtyRecordCount = 0;
    /** True while the archive being read came from {@link #dirtyRecords}. */
    private boolean processingDirty = false;

    /**
     * Configures the handler from its property map: the src-desc entry (entity name and file name
     * pattern), the base path, the buffer size and the two path-preservation flags.
     *
     * @throws AdaptorConfigurationException if src-desc is missing, the parsed file path is null, or
     *         parsing raises an {@link IllegalArgumentException}
     */
    @Override // io.bigdime.core.handler.AbstractHandler, io.bigdime.core.Handler
    public void build() throws AdaptorConfigurationException {
        super.build();
        this.handlerPhase = "building ZipFileInputStreamHandler";
        logger.info(this.handlerPhase, "properties={}", getPropertyMap());

        final Map.Entry srcDesc = (Map.Entry) getPropertyMap().get(AdaptorConfigConstants.SourceConfigConstants.SRC_DESC);
        if (srcDesc == null) {
            throw new InvalidValueConfigurationException("src-desc can't be null");
        }
        logger.debug(this.handlerPhase, "entity:fileNamePattern={} input_field_name={}", srcDesc.getKey(), srcDesc.getValue());

        this.basePath = PropertyHelper.getStringProperty(getPropertyMap(), FileInputStreamReaderHandlerConstants.BASE_PATH, "/");
        logger.debug(this.handlerPhase, "basePath={}", this.basePath);

        this.fileInputDescriptor = new FileInputDescriptor();
        try {
            // The base path is parsed to obtain the directory that will be scanned.
            this.fileInputDescriptor.parseDescriptor(this.basePath);
            this.fileLocation = this.fileInputDescriptor.getPath();
            if (this.fileLocation == null) {
                throw new InvalidValueConfigurationException("filePath in src-desc can't be null");
            }

            // The src-desc key carries two parts: [0] entity name, [1] file name pattern.
            final String[] entityAndPattern = this.fileInputDescriptor.parseSourceDescriptor((String) srcDesc.getKey());
            this.entityName = entityAndPattern[0];
            this.fileNamePattern = entityAndPattern[1];
            logger.debug(this.handlerPhase, "entityName={} fileNamePattern={}", this.entityName, this.fileNamePattern);

            this.bufferSize = PropertyHelper.getIntProperty(getPropertyMap(), FileInputStreamReaderHandlerConstants.BUFFER_SIZE, DEFAULT_BUFFER_SIZE);
            logger.debug(this.handlerPhase, "id={} fileLocation={}", getId(), this.fileLocation);

            this.preserveBasePath = PropertyHelper.getBooleanProperty(getPropertyMap(), FileInputStreamReaderHandlerConstants.PRESERVE_BASE_PATH);
            this.preserveRelativePath = PropertyHelper.getBooleanProperty(getPropertyMap(), FileInputStreamReaderHandlerConstants.PRESERVE_RELATIVE_PATH);
            logger.debug(this.handlerPhase, "id={}", getId());
        } catch (IllegalArgumentException badDescriptor) {
            throw new InvalidValueConfigurationException("src-desc must contain entityName:fileNamePrefix, e.g. user:web_user*");
        }
    }

    /**
     * Runs one handler invocation: the pre-processing step first and, unless that step asks to back
     * off, the chunk-reading step.
     *
     * @return BACKOFF when pre-processing returns BACKOFF, otherwise the status returned by the
     *         chunk-reading step
     * @throws HandlerException wrapping any {@link RuntimeInfoStoreException} or {@link IOException}
     */
    @Override // io.bigdime.core.Handler
    public ActionEvent.Status process() throws HandlerException {
        this.handlerPhase = "processing ZipFileInputStreamHandler";
        incrementInvocationCount();
        try {
            final ActionEvent.Status preStatus = preProcess();
            if (preStatus == ActionEvent.Status.BACKOFF) {
                logger.debug(this.handlerPhase, "returning BACKOFF");
                return preStatus;
            }
            return doProcess();
        } catch (RuntimeInfoStoreException storeFailure) {
            throw new HandlerException("Unable to process message from file", storeFailure);
        } catch (IOException ioFailure) {
            // Read failures are raised as a blocker alert before being rethrown.
            logger.alert(Logger.ALERT_TYPE.INGESTION_FAILED, Logger.ALERT_CAUSE.APPLICATION_INTERNAL_ERROR, Logger.ALERT_SEVERITY.BLOCKER, "error during reading file", ioFailure);
            throw new HandlerException("Unable to process message from file", ioFailure);
        }
    }

    /** @return true when the invocation counter equals 1, i.e. on the first call to process() */
    private boolean isFirstRun() {
        return 1 == getInvocationCount();
    }

    /**
     * Loads the started runtime infos for this entity into {@code dirtyRecords} and, when any exist,
     * records how many there are.
     *
     * @throws RuntimeInfoStoreException if the runtime info lookup fails
     */
    private void getStartedRecordsFromRuntimeInfos() throws RuntimeInfoStoreException {
        this.dirtyRecords = getAllStartedRuntimeInfos(this.runtimeInfoStore, this.entityName);
        final boolean hasDirtyRecords = this.dirtyRecords != null && !this.dirtyRecords.isEmpty();
        if (!hasDirtyRecords) {
            logger.info(this.handlerPhase, "_message=\"no dirty records found\" handler_id={}", getId());
            return;
        }
        this.dirtyRecordCount = this.dirtyRecords.size();
        logger.warn(this.handlerPhase, "_message=\"dirty records found\" handler_id={} dirty_record_count=\"{}\" entityName={}", getId(), Long.valueOf(this.dirtyRecordCount), this.entityName);
    }

    /**
     * Decides whether there is something to read and, when the previous archive has been fully
     * consumed, selects and opens the next one.
     *
     * <p>The per-archive state ({@code readEntries} and the journal) is only touched once the new
     * archive is known to be non-empty and readable, so an archive that is skipped with BACKOFF
     * cannot leave a stale entry counter behind for the archive that follows it.
     *
     * @return READY when an entry is in progress or a new archive was opened; BACKOFF when there is
     *         no file to process, the file is empty, or it yields no zip entries
     * @throws IOException if selecting the next file fails with an I/O error
     * @throws RuntimeInfoStoreException if the runtime info store cannot be read or updated
     * @throws HandlerException if the journal cannot be obtained
     */
    private ActionEvent.Status preProcess() throws IOException, RuntimeInfoStoreException, HandlerException {
        if (isFirstRun()) {
            getStartedRecordsFromRuntimeInfos();
        }
        // An archive or one of its entries is still in progress: keep reading it.
        if (!readAllFromZip() || !readAllFromFile()) {
            return ActionEvent.Status.READY;
        }
        this.currentFileToProcess = getNextFileToProcess();
        if (this.currentFileToProcess == null) {
            logger.info(this.handlerPhase, "_message=\"no file to process\" handler_id={} ", getId());
            return ActionEvent.Status.BACKOFF;
        }
        this.zipFileLength = this.currentFileToProcess.length();
        logger.info(this.handlerPhase, "_message=\"got a new file to process\" handler_id={} zip_file_length={}", getId(), Long.valueOf(this.zipFileLength));
        if (this.zipFileLength == 0) {
            // Checked before opening the archive and before any counter is advanced.
            logger.info(this.handlerPhase, "_message=\"zip file is empty\" handler_id={} ", getId());
            return ActionEvent.Status.BACKOFF;
        }
        this.noOfEntry = getNumberOfFileInZip(this.currentFileToProcess);
        if (this.noOfEntry == 0) {
            // getNumberOfFileInZip returns 0 both when the archive cannot be opened and when it has
            // no entries; in either case there is nothing for the read step to work on.
            logger.warn(this.handlerPhase, "_message=\"zip file could not be opened or has no entries\" handler_id={} ", getId());
            return ActionEvent.Status.BACKOFF;
        }
        // A newly opened archive always starts at its first entry.
        this.readEntries = 1;
        logger.info(this.handlerPhase, "_message=\"number files in zip\" handler_id={} no_of_file={} ", getId(), Integer.valueOf(this.noOfEntry));
        getZipFileHandlerJournal().setTotalEntries(this.noOfEntry);
        getZipFileHandlerJournal().setZipFileName(this.currentFileToProcess.getName());
        getZipFileHandlerJournal().setReadEntries(this.readEntries);
        return ActionEvent.Status.READY;
    }

    /**
     * Reads the next chunk (at most {@code bufferSize} bytes) of the current zip entry, wraps it in
     * an {@link ActionEvent} with the source-file headers and submits it to the channel.
     *
     * <p>Differences from a naive implementation that matter for correctness:
     * <ul>
     * <li>The entry enumeration is created once per archive, keyed on this handler's own
     * {@code readEntries} counter. The journal's counter is only written when an archive is
     * opened, so keying on it would restart the enumeration for every entry.</li>
     * <li>The running byte count is reset when an entry completes, so the next entry's progress is
     * measured from zero.</li>
     * <li>The event body holds exactly the bytes returned by the read, never unread buffer space.</li>
     * </ul>
     *
     * @return CALLBACK while the current entry or the archive has more data; READY when the last
     *         entry was fully read or the stream returned no data; BACKOFF when the entry is empty
     * @throws IOException if the zip entry cannot be opened or read
     * @throws HandlerException if the journal's entry name no longer matches the entry being read
     * @throws RuntimeInfoStoreException declared for the channel submission path
     */
    private ActionEvent.Status doProcess() throws IOException, HandlerException, RuntimeInfoStoreException {
        logger.debug(this.handlerPhase, "handler_id={} next_index_to_read={} buffer_size={}", getId(), Long.valueOf(getTotalReadFromJournal()), Integer.valueOf(this.bufferSize));
        // Only the first entry of an archive (re)creates the enumeration; later entries continue it.
        if (this.readEntries == 1 && readAllFromFile()) {
            this.entriesInZip = this.zipFile.entries();
        }
        if (readAllFromFile()) {
            // Nothing is pending for the previous entry: advance to the next entry of the archive.
            this.fileName = fileNameFromZip(this.entriesInZip);
            String srcFileName = this.fileName.contains("/") ? this.fileName.substring(this.fileName.lastIndexOf("/") + 1) : this.fileName;
            getZipFileHandlerJournal().setEntryName(this.fileName);
            getZipFileHandlerJournal().setSrcFileName(srcFileName);
            getZipFileHandlerJournal().setTotalSize(this.zipFileEntry.getSize());
            this.inputStream = this.zipFile.getInputStream(this.zipFileEntry);
            this.bufferedInputStream = new BufferedInputStream(this.inputStream);
            this.fileSize = this.zipFileEntry.getSize();
            if (this.fileSize == 0) {
                // NOTE(review): an empty entry is reported with BACKOFF and the entry counter is not
                // advanced - confirm this is the intended handling for empty/directory entries.
                logger.info(this.handlerPhase, "_message=\"file is empty\" handler_id={} ", getId());
                return ActionEvent.Status.BACKOFF;
            }
        }
        logger.info(this.handlerPhase, "_message=\"File details\" handler_id={}  file_name={} file_size={} ", getId(), this.fileName, Long.valueOf(this.fileSize));
        int chunkSize = this.fileSize < ((long) this.bufferSize) ? (int) this.fileSize : this.bufferSize;
        byte[] buffer = new byte[chunkSize];
        int bytesRead = this.bufferedInputStream.read(buffer, 0, chunkSize);
        if (bytesRead <= 0) {
            logger.debug(this.handlerPhase, "returning READY, no data read from the file");
            return ActionEvent.Status.READY;
        }
        // read() may return fewer bytes than requested; forward only what was actually read.
        byte[] body = buffer;
        if (bytesRead < chunkSize) {
            body = new byte[bytesRead];
            System.arraycopy(buffer, 0, body, 0, bytesRead);
        }
        this.totalRead += bytesRead;
        getZipFileHandlerJournal().setTotalRead(this.totalRead);
        ActionEvent actionEvent = new ActionEvent();
        logger.debug(this.handlerPhase, "handler_id={} readBody.length={} remaining={}", getId(), Long.valueOf((long) bytesRead), Long.valueOf(this.fileSize - bytesRead));
        actionEvent.setBody(body);
        actionEvent.getHeaders().put(ActionEventHeaderConstants.INPUT_DESCRIPTOR, this.currentFile.getAbsolutePath());
        actionEvent.getHeaders().put(ActionEventHeaderConstants.SOURCE_FILE_PATH, this.currentFile.getAbsolutePath());
        actionEvent.getHeaders().put(ActionEventHeaderConstants.SOURCE_FILE_NAME, getZipFileHandlerJournal().getSrcFileName());
        actionEvent.getHeaders().put(ActionEventHeaderConstants.SOURCE_FILE_TOTAL_SIZE, String.valueOf(getTotalSizeFromJournal()));
        actionEvent.getHeaders().put(ActionEventHeaderConstants.SOURCE_FILE_TOTAL_READ, String.valueOf(getTotalReadFromJournal()));
        actionEvent.getHeaders().put(ActionEventHeaderConstants.SOURCE_FILE_LOCATION, this.currentFile.getParent());
        actionEvent.getHeaders().put("basePath", this.basePath);
        // Relative path is the part of the archive's parent directory that follows the base path.
        actionEvent.getHeaders().put(ActionEventHeaderConstants.RELATIVE_PATH, this.currentFile.getParent().length() > this.basePath.length() ? this.currentFile.getParent().substring(this.basePath.length()) : "");
        actionEvent.getHeaders().put(ActionEventHeaderConstants.PRESERVE_BASE_PATH, String.valueOf(this.preserveBasePath));
        actionEvent.getHeaders().put(ActionEventHeaderConstants.PRESERVE_RELATIVE_PATH, String.valueOf(this.preserveRelativePath));
        logger.debug(this.handlerPhase, "setting entityName header, value={}", this.entityName);
        actionEvent.getHeaders().put(ActionEventHeaderConstants.ENTITY_NAME, this.entityName);
        logger.debug(this.handlerPhase, "setting CLEANUP_REQUIRED header, processingDirty={} ", Boolean.valueOf(this.processingDirty));
        if (this.processingDirty) {
            actionEvent.getHeaders().put(ActionEventHeaderConstants.CLEANUP_REQUIRED, "true");
        }
        // Only the first event of a dirty archive carries the cleanup flag.
        this.processingDirty = false;
        actionEvent.getHeaders().put(ActionEventHeaderConstants.READ_COMPLETE, Boolean.FALSE.toString());
        getHandlerContext().createSingleItemEventList(actionEvent);
        processChannelSubmission(actionEvent);
        if (!readAllFromFile() && getZipFileHandlerJournal().getEntryName().equalsIgnoreCase(this.fileName)) {
            // More of the same entry remains: shrink the outstanding size and ask to be called again.
            this.fileSize -= bytesRead;
            return ActionEvent.Status.CALLBACK;
        }
        if (!getZipFileHandlerJournal().getEntryName().equalsIgnoreCase(this.fileName)) {
            logger.alert(Logger.ALERT_TYPE.INGESTION_FAILED, Logger.ALERT_CAUSE.APPLICATION_INTERNAL_ERROR, Logger.ALERT_SEVERITY.BLOCKER, "file name mismatch during read file");
            throw new HandlerException("file name is not same as file name in Journal");
        }
        if (this.readEntries == this.noOfEntry) {
            // Last entry of the archive is complete: clear all per-archive state.
            logger.debug(this.handlerPhase, "returning READY, no file need read from the zip");
            logger.debug(this.handlerPhase, "_message=\"handler read data, ready to return\", context.list_size={} total_read={} total_size={} context={} fileSize={}", Integer.valueOf(getHandlerContext().getEventList().size()), Long.valueOf(getTotalReadFromJournal()), Long.valueOf(getTotalSizeFromJournal()), getHandlerContext(), Long.valueOf(getZipFileHandlerJournal().getTotalSize()));
            this.totalRead = 0L;
            getZipFileHandlerJournal().reset();
            this.readEntries = 0;
            actionEvent.getHeaders().put(ActionEventHeaderConstants.READ_COMPLETE, Boolean.TRUE.toString());
            return ActionEvent.Status.READY;
        }
        logger.debug(this.handlerPhase, "_message=\"done reading file={}, there might be more files to process, returning CALLBACK\" handler_id={} headers_from_file_handler={}", this.currentFile.getAbsolutePath(), getId(), actionEvent.getHeaders());
        // Entry complete but the archive has more: clear the per-entry state, including the running
        // byte count, so the next entry starts from zero.
        this.readEntries++;
        this.totalRead = 0L;
        getZipFileHandlerJournal().setTotalRead(0L);
        getZipFileHandlerJournal().setTotalSize(0L);
        getZipFileHandlerJournal().setEntryName(null);
        getZipFileHandlerJournal().setSrcFileName(null);
        return ActionEvent.Status.CALLBACK;
    }

    /**
     * Fetches one queued runtime info for this entity, queueing the currently available files first
     * when none is queued yet, and marks the chosen record as STARTED.
     *
     * @return the file named by the record's input descriptor, or null when nothing is queued
     * @throws RuntimeInfoStoreException if the runtime info store cannot be read or updated
     * @throws IOException declared for callers; see the method's signature
     */
    private File getQueuedRecordsFromRuntimeInfos() throws RuntimeInfoStoreException, IOException {
        RuntimeInfo queued = getOneQueuedRuntimeInfo(this.runtimeInfoStore, this.entityName);
        if (queued == null) {
            // Nothing queued: try to discover files, then look again only if some were added.
            if (!findAndAddRuntimeInfoRecords()) {
                return null;
            }
            queued = getOneQueuedRuntimeInfo(this.runtimeInfoStore, this.entityName);
            if (queued == null) {
                return null;
            }
        }
        final File queuedFile = new File(queued.getInputDescriptor());
        logger.debug(this.handlerPhase, "absolute_path={} file_name_for_descriptor={}", queuedFile.getAbsolutePath(), queuedFile.getName());
        updateRuntimeInfo(this.runtimeInfoStore, this.entityName, queued.getInputDescriptor(), RuntimeInfoStore.Status.STARTED);
        return queuedFile;
    }

    /**
     * Queues a runtime info record for every file returned by {@link #getAvailableFiles()}.
     *
     * @return false when no files are available (null or empty list), true after queueing them
     * @throws RuntimeInfoStoreException if a record cannot be queued
     */
    private boolean findAndAddRuntimeInfoRecords() throws RuntimeInfoStoreException {
        final List<String> candidates = getAvailableFiles();
        if (candidates == null || candidates.isEmpty()) {
            return false;
        }
        for (String candidate : candidates) {
            queueRuntimeInfo(this.runtimeInfoStore, this.entityName, candidate);
        }
        return true;
    }

    /**
     * Selects the next archive: pending dirty records are consumed first (oldest list position
     * first), otherwise a queued record is taken from the runtime info store.
     *
     * @return the selected file, also stored in {@code currentFile}; null when nothing is queued
     * @throws RuntimeInfoStoreException if the runtime info store cannot be read or updated
     * @throws HandlerException declared for overriding implementations and callers
     * @throws IOException declared by the queued-record lookup
     */
    protected File getNextFileToProcess() throws RuntimeInfoStoreException, HandlerException, IOException {
        final boolean dirtyPending = this.dirtyRecords != null && !this.dirtyRecords.isEmpty();
        if (dirtyPending) {
            final RuntimeInfo dirtyRecord = this.dirtyRecords.remove(0);
            logger.info(this.handlerPhase, "\"processing a dirty record\" dirtyRecord=\"{}\"", dirtyRecord);
            this.currentFile = new File(dirtyRecord.getInputDescriptor());
            logger.debug(this.handlerPhase, "absolute_path={} file_name_for_descriptor={}", this.currentFile.getAbsolutePath(), this.currentFile.getName());
            this.processingDirty = true;
        } else {
            logger.info(this.handlerPhase, "processing a clean record");
            this.processingDirty = false;
            this.currentFile = getQueuedRecordsFromRuntimeInfos();
        }
        return this.currentFile;
    }

    /**
     * Opens {@code file} as a zip archive, stores it in {@code zipFile} and returns its entry count.
     *
     * <p>The archive that was open before this call is closed once the new one has been opened
     * successfully, so its file handle is not leaked. If the new archive cannot be opened, the
     * previous one is left untouched.
     *
     * @param file the archive to open
     * @return the number of entries in the archive, or 0 when it cannot be opened
     */
    private int getNumberOfFileInZip(File file) {
        final ZipFile previous = this.zipFile;
        try {
            final ZipFile opened = new ZipFile(file);
            this.zipFile = opened;
            if (previous != null) {
                try {
                    // Closing a ZipFile also closes the entry streams obtained from it.
                    previous.close();
                } catch (IOException closeFailure) {
                    // A failed close must not prevent the new archive from being processed.
                    logger.warn(this.handlerPhase, "_message=\"unable to close previously opened zip file\" previousZipFile={}", previous.getName(), closeFailure);
                }
            }
            return opened.size();
        } catch (ZipException e) {
            logger.warn(this.handlerPhase, "_message=\"Unable to open zip file\" currentZipFile={}", file, e);
            return 0;
        } catch (IOException e2) {
            logger.warn(this.handlerPhase, "_message=\"error during getting number of files in zip\" currentZipFile={}", file, e2);
            return 0;
        }
    }

    /**
     * Advances the enumeration by one entry and remembers that entry in {@code zipFileEntry}.
     *
     * @param enumeration the archive's entry enumeration
     * @return the name of the entry that was taken
     */
    private String fileNameFromZip(Enumeration<? extends ZipEntry> enumeration) {
        final ZipEntry nextEntry = enumeration.nextElement();
        this.zipFileEntry = nextEntry;
        return nextEntry.getName();
    }

    /**
     * Asks the file helper for the files under {@code fileLocation} that match
     * {@code fileNamePattern}.
     *
     * @return the helper's result, or null when the helper rejects the arguments with an
     *         {@link IllegalArgumentException}
     */
    protected List<String> getAvailableFiles() {
        List<String> found;
        try {
            logger.debug(this.handlerPhase, "getting next file list, fileLocation=\"{}\" fileNamePattern=\"{}\"", this.fileLocation, this.fileNamePattern);
            found = this.fileHelper.getAvailableFiles(this.fileLocation, this.fileNamePattern);
        } catch (IllegalArgumentException badLocationOrPattern) {
            logger.warn(this.handlerPhase, "_message=\"no file found, make sure fileLocation and fileNamePattern are correct\" fileLocation={} fileNamePattern={}", this.fileLocation, this.fileNamePattern, badLocationOrPattern);
            found = null;
        }
        return found;
    }

    /**
     * Shuts the handler down: runs the superclass shutdown and then releases the zip resources held
     * by this handler.
     */
    @Override // io.bigdime.core.handler.AbstractHandler, io.bigdime.core.Handler
    public void shutdown() {
        super.shutdown();
        shutdown0();
    }

    /**
     * Closes the entry stream and the archive that are currently open, if any. Close failures are
     * logged and otherwise ignored so that shutdown always completes.
     */
    private void shutdown0() {
        if (this.bufferedInputStream != null) {
            try {
                this.bufferedInputStream.close();
            } catch (IOException streamCloseFailure) {
                logger.warn(this.handlerPhase, "_message=\"unable to close zip entry stream during shutdown\"", streamCloseFailure);
            }
            this.bufferedInputStream = null;
            this.inputStream = null;
        }
        if (this.zipFile != null) {
            try {
                this.zipFile.close();
            } catch (IOException zipCloseFailure) {
                logger.warn(this.handlerPhase, "_message=\"unable to close zip file during shutdown\"", zipCloseFailure);
            }
            this.zipFile = null;
            // The enumeration belongs to the archive that was just closed.
            this.entriesInZip = null;
        }
    }

    /** @return the total-read value stored in this handler's journal */
    private long getTotalReadFromJournal() throws HandlerException {
        final ZipFileHandlerJournal journal = getZipFileHandlerJournal();
        return journal.getTotalRead();
    }

    /** @return the total-size value stored in this handler's journal */
    private long getTotalSizeFromJournal() throws HandlerException {
        final ZipFileHandlerJournal journal = getZipFileHandlerJournal();
        return journal.getTotalSize();
    }

    /** @return the total-entries value stored in this handler's journal */
    private int getNoOfEntriesFromJournal() throws HandlerException {
        final ZipFileHandlerJournal journal = getZipFileHandlerJournal();
        return journal.getTotalEntries();
    }

    /** @return the read-entries value stored in this handler's journal */
    private int getReadEntriesFromJournal() throws HandlerException {
        final ZipFileHandlerJournal journal = getZipFileHandlerJournal();
        return journal.getReadEntries();
    }

    /**
     * @return this handler's journal, obtained through {@code getNonNullJournal} and cast to
     *         {@link ZipFileHandlerJournal}
     */
    private ZipFileHandlerJournal getZipFileHandlerJournal() throws HandlerException {
        final Class<ZipFileHandlerJournal> journalType = ZipFileHandlerJournal.class;
        return (ZipFileHandlerJournal) getNonNullJournal(journalType);
    }

    /** @return true when the journal's total-read value equals its total-size value */
    private boolean readAllFromFile() throws HandlerException {
        final long bytesConsumed = getTotalReadFromJournal();
        final long bytesExpected = getTotalSizeFromJournal();
        return bytesConsumed == bytesExpected;
    }

    /** @return true when the journal's read-entries value equals its total-entries value */
    private boolean readAllFromZip() throws HandlerException {
        final int entriesConsumed = getReadEntriesFromJournal();
        final int entriesExpected = getNoOfEntriesFromJournal();
        return entriesConsumed == entriesExpected;
    }

    /** @return the phase label currently used by this handler's log calls */
    @Override // io.bigdime.core.handler.AbstractHandler
    protected String getHandlerPhase() {
        return handlerPhase;
    }

    /** @return the file name pattern parsed from the src-desc key, or null before build() has run */
    public String getFileNamePattern() {
        return fileNamePattern;
    }

    /** @return the entity name parsed from the src-desc key, or null before build() has run */
    public String getEntityName() {
        return entityName;
    }

    /** @return the configured base path; "/" until build() assigns a configured value */
    public String getBasePath() {
        return basePath;
    }
}
