/*
 * Decompiled with CFR 0.152.
 */
package org.ikasan.connector.util.chunking.process;

import java.io.IOException;
import java.io.InputStream;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import org.apache.log4j.Logger;
import org.ikasan.common.util.checksum.DigestChecksum;
import org.ikasan.common.util.checksum.Md5Checksum;
import org.ikasan.connector.util.chunking.io.ChunkingInputStreamConsumer;
import org.ikasan.connector.util.chunking.io.ChunkingOutputStream;
import org.ikasan.connector.util.chunking.model.FileChunk;
import org.ikasan.connector.util.chunking.model.FileChunkHeader;
import org.ikasan.connector.util.chunking.model.FileConstituentHandle;
import org.ikasan.connector.util.chunking.model.dao.ChunkLoadException;
import org.ikasan.connector.util.chunking.model.dao.FileChunkDao;
import org.ikasan.connector.util.chunking.process.ChunkException;
import org.ikasan.connector.util.chunking.process.ChunkHandler;
import org.ikasan.connector.util.chunking.process.Chunker;
import org.ikasan.connector.util.chunking.provider.ChunkableDataProvider;
import org.ikasan.connector.util.chunking.provider.ChunkableDataProviderAccessException;
import org.ikasan.connector.util.chunking.provider.ChunkableDataSourceException;

public class ChunkerImpl
implements Chunker,
ChunkHandler {
    private static Logger logger = Logger.getLogger(ChunkerImpl.class);
    private ChunkableDataProvider dataProvider;
    public static final SimpleDateFormat chunkTimeStampFormat = new SimpleDateFormat("yyyyMMddHHmmss");
    public static final long ONE_HOUR = 3600000L;
    private long maxResumableAge = 3600000L;
    protected DigestChecksum checksum = new Md5Checksum();
    protected FileChunkDao dao;
    protected FileChunkHeader fileChunkHeader = null;
    protected ChunkingInputStreamConsumer consumer;
    protected int streamMode = 2;
    private String clientId;

    public ChunkerImpl(FileChunkDao dao, ChunkableDataProvider dataProvider, int streamMode, String clientId) {
        this.dao = dao;
        this.dataProvider = dataProvider;
        this.streamMode = streamMode;
        this.consumer = new ChunkingInputStreamConsumer(this);
        this.clientId = clientId;
    }

    public ChunkerImpl(FileChunkDao dao, ChunkableDataProvider dataProvider, int streamMode) {
        this(dao, dataProvider, streamMode, null);
    }

    public void chunkFile(String remoteDir, String fileName, int chunkSize) throws ChunkException {
        if (chunkSize <= 0) {
            throw new ChunkException("chunkSize must be greater than 0");
        }
        try {
            this.dataProvider.connect();
        }
        catch (ChunkableDataProviderAccessException e1) {
            throw new ChunkException("Exception connecting to data provider ", e1);
        }
        try {
            long fileSize = this.dataProvider.getFileSize(remoteDir, fileName);
            long noOfChunks = ChunkerImpl.getExpectedNumberOfChunks(chunkSize, fileSize);
            long startingChunk = this.initialise(fileName, noOfChunks);
            if (startingChunk > 0L) {
                logger.info((Object)("Resuming at chunk:" + startingChunk));
            }
            this.sourceChunks(remoteDir, fileName, chunkSize, noOfChunks, startingChunk);
            this.afterLastChunk();
        }
        catch (ChunkableDataSourceException cse) {
            throw new ChunkException("Exception accessing remote resource: remoteDir=" + remoteDir + ", fileName=" + fileName, cse);
        }
        catch (ChunkLoadException e) {
            throw new ChunkException("Exception reloading an existing chunk prior to resume", e);
        }
        try {
            this.dataProvider.disconnect();
        }
        catch (ChunkableDataProviderAccessException e) {
            throw new ChunkException("Exception disconnecting from data provider ", e);
        }
    }

    public void handleChunk(byte[] chunk, long ordinal, long sequenceLength) {
        long chunkNumber = ordinal;
        logger.debug((Object)("handling chunk [" + chunkNumber + " of " + sequenceLength + "]"));
        FileChunk fileChunk = new FileChunk(this.fileChunkHeader, chunkNumber++, chunk);
        this.dao.save(fileChunk);
        this.checksum.update(chunk);
    }

    protected void createHeader(String fileName, Long noOfChunks) {
        Long chunkTimeStamp = Long.parseLong(chunkTimeStampFormat.format(new Date()));
        this.fileChunkHeader = new FileChunkHeader(noOfChunks, null, fileName, chunkTimeStamp, this.clientId);
        this.dao.save(this.fileChunkHeader);
    }

    protected void afterLastChunk() {
        this.fileChunkHeader.setInternalMd5Hash(this.checksum.digestToString());
        this.dao.save(this.fileChunkHeader);
    }

    protected long initialise(String fileName, long noOfChunks) throws ChunkLoadException {
        List<FileConstituentHandle> existingChunks = this.dao.findChunks(fileName, null, noOfChunks, this.maxResumableAge);
        logger.info((Object)("existing chunks : [" + existingChunks + "]"));
        long startingChunk = 0L;
        if (existingChunks.size() != 0 && (long)existingChunks.size() != noOfChunks) {
            startingChunk = existingChunks.size();
        }
        this.checksum.reset();
        if (startingChunk == 0L) {
            this.createHeader(fileName, noOfChunks);
        } else {
            this.fileChunkHeader = existingChunks.get(0).getFileChunkHeader();
            for (FileConstituentHandle fileConstituentHandle : existingChunks) {
                FileChunk fileChunk = this.dao.load(fileConstituentHandle);
                this.checksum.update(fileChunk.getContent());
            }
        }
        return startingChunk;
    }

    protected static long getExpectedNumberOfChunks(int chunkSize, long fileSize) {
        long complete_chunks = fileSize / (long)chunkSize;
        int partialChunks = 0;
        if (fileSize % (long)chunkSize > 0L) {
            partialChunks = 1;
        }
        return complete_chunks + (long)partialChunks;
    }

    protected void sourceChunksWithInputStream(String remoteDir, String fileName, int chunkSize, long noOfChunks) throws ChunkableDataSourceException {
        InputStream inputStream = this.dataProvider.sourceChunkableData(remoteDir, fileName);
        this.consumer.consumeInputStream(inputStream, chunkSize, noOfChunks);
    }

    protected void sourceChunksWithOutputStream(String remoteDir, String fileName, int chunkSize, long noOfChunks, long startingChunk) throws ChunkableDataSourceException {
        ChunkingOutputStream chunkingOutputStream = new ChunkingOutputStream(chunkSize, this, noOfChunks, startingChunk);
        long offset = startingChunk * (long)chunkSize;
        logger.debug((Object)("SftpDataProvider.sourceChunksWithOutputStream called with remoteDir=" + remoteDir + ", fileName = " + fileName));
        this.dataProvider.sourceChunkableData(remoteDir, fileName, chunkingOutputStream, offset);
        try {
            chunkingOutputStream.close();
        }
        catch (IOException e) {
            throw new ChunkableDataSourceException("Exception caught trying to close the OutputStream", e);
        }
    }

    protected void sourceChunks(String remoteDir, String fileName, int chunkSize, long noOfChunks, long startingChunk) throws ChunkableDataSourceException {
        if (this.streamMode == 1) {
            if (startingChunk > 0L) {
                throw new ChunkableDataSourceException("Cannot resume in InputStream mode");
            }
            this.sourceChunksWithInputStream(remoteDir, fileName, chunkSize, noOfChunks);
        } else if (this.streamMode == 2) {
            this.sourceChunksWithOutputStream(remoteDir, fileName, chunkSize, noOfChunks, startingChunk);
        } else {
            throw new ChunkableDataSourceException("Unknown mode:" + this.streamMode);
        }
    }

    public FileChunkHeader getFileChunkHeader() {
        return this.fileChunkHeader;
    }
}

