/*
 * Decompiled with CFR 0.152.
 */
package com.emc.mongoose.base.load.step.client;

import com.emc.mongoose.base.Exceptions;
import com.emc.mongoose.base.concurrent.ServiceTaskExecutor;
import com.emc.mongoose.base.load.step.file.FileManager;
import com.emc.mongoose.base.logging.LogUtil;
import com.emc.mongoose.base.logging.Loggers;
import com.github.akurilov.confuse.Config;
import com.github.akurilov.fiber4j.ExclusiveFiberBase;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.rmi.RemoteException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.logging.log4j.CloseableThreadContext;
import org.apache.logging.log4j.Level;

public final class TempInputTextFileSlicer
implements AutoCloseable {
    private final String loadStepId;
    private final Map<FileManager, String> fileSlices;

    public TempInputTextFileSlicer(String loadStepId, String srcFileName, List<FileManager> fileMgrs, String configPath, List<Config> configSlices, int batchSize) {
        this.loadStepId = loadStepId;
        int sliceCount = configSlices.size();
        this.fileSlices = new HashMap<FileManager, String>(sliceCount);
        for (int i = 0; i < sliceCount; ++i) {
            try {
                FileManager fileMgr = fileMgrs.get(i);
                String fileName = fileMgr.newTmpFileName();
                this.fileSlices.put(fileMgr, fileName);
                Config configSlice = configSlices.get(i);
                configSlice.val(configPath, fileName);
                continue;
            }
            catch (Exception e) {
                LogUtil.exception(Level.ERROR, e, "Failed to get the input text file name for the step slice #" + i, new Object[0]);
            }
        }
        try (CloseableThreadContext.Instance logCtx = CloseableThreadContext.put("step_id", loadStepId).put("class_name", this.getClass().getSimpleName());){
            Loggers.MSG.info("{}: scatter the lines from the input text file \"{}\"...", (Object)loadStepId, (Object)srcFileName);
            TempInputTextFileSlicer.scatterLines(srcFileName, sliceCount, fileMgrs, this.fileSlices, batchSize);
            Loggers.MSG.info("{}: scatter the lines from the input text file \"{}\" finished", (Object)loadStepId, (Object)srcFileName);
        }
        catch (Throwable cause) {
            Exceptions.throwUncheckedIfInterrupted(cause);
            LogUtil.exception(Level.ERROR, cause, "{}: failed to scatter the lines from the file \"{}\"", loadStepId, srcFileName);
        }
    }

    @Override
    public final void close() {
        this.fileSlices.entrySet().parallelStream().forEach(entry -> {
            FileManager fileMgr = (FileManager)entry.getKey();
            String fileName = (String)entry.getValue();
            try {
                fileMgr.deleteFile(fileName);
            }
            catch (Exception e) {
                Exceptions.throwUncheckedIfInterrupted(e);
                LogUtil.exception(Level.WARN, e, "{}: failed to delete the file \"{}\" @ file manager \"{}\"", this.loadStepId, fileName, fileMgr);
            }
        });
        this.fileSlices.clear();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    static void scatterLines(String srcFileName, int sliceCount, List<FileManager> fileMgrs, Map<FileManager, String> fileSlices, int batchSize) throws IOException {
        AtomicBoolean inputFinishFlag = new AtomicBoolean(false);
        ArrayList<BlockingQueue<String>> lineQueues = new ArrayList<BlockingQueue<String>>(sliceCount);
        for (int i = 0; i < sliceCount; ++i) {
            lineQueues.add(new ArrayBlockingQueue(batchSize));
        }
        ArrayList<ExclusiveFiberBase> tasks = new ArrayList<ExclusiveFiberBase>(sliceCount + 1);
        tasks.add(new ReadTask(inputFinishFlag, lineQueues, srcFileName, sliceCount));
        CountDownLatch writeFinishCountDown = new CountDownLatch(sliceCount);
        for (int i = 0; i < sliceCount; ++i) {
            BlockingQueue lineQueue = (BlockingQueue)lineQueues.get(i);
            FileManager fileMgr = fileMgrs.get(i);
            String dstFileName = fileSlices.get(fileMgr);
            tasks.add(new WriteTask(inputFinishFlag, writeFinishCountDown, lineQueue, fileMgr, dstFileName, batchSize));
        }
        tasks.forEach(task -> {
            try {
                task.start();
            }
            catch (RemoteException remoteException) {
                // empty catch block
            }
        });
        try {
            writeFinishCountDown.await();
        }
        catch (InterruptedException e) {
            com.github.akurilov.commons.lang.Exceptions.throwUnchecked(e);
        }
        finally {
            tasks.forEach(task -> {
                try {
                    task.close();
                }
                catch (IOException iOException) {
                    // empty catch block
                }
            });
            tasks.clear();
        }
    }

    private static final class WriteTask
    extends ExclusiveFiberBase {
        private final AtomicBoolean inputFinishFlag;
        private final CountDownLatch writeFinishCountDown;
        private final BlockingQueue<String> lineQueue;
        private final FileManager fileMgr;
        private final String dstFileName;
        private final int batchSize;
        private final List<String> lines;
        private final ByteArrayOutputStream linesByteBuff;
        private final BufferedWriter linesWriter;
        private long lineCount = 0L;

        public WriteTask(AtomicBoolean inputFinishFlag, CountDownLatch writeFinishCountDown, BlockingQueue<String> lineQueue, FileManager fileMgr, String dstFileName, int batchSize) {
            super(ServiceTaskExecutor.INSTANCE);
            this.inputFinishFlag = inputFinishFlag;
            this.writeFinishCountDown = writeFinishCountDown;
            this.lineQueue = lineQueue;
            this.fileMgr = fileMgr;
            this.dstFileName = dstFileName;
            this.batchSize = batchSize;
            this.lines = new ArrayList<String>(batchSize);
            this.linesByteBuff = new ByteArrayOutputStream();
            this.linesWriter = new BufferedWriter(new OutputStreamWriter(this.linesByteBuff));
        }

        @Override
        protected final void invokeTimedExclusively(long startTimeNanos) {
            int n = this.lineQueue.drainTo(this.lines, this.batchSize);
            if (n == 0 && this.inputFinishFlag.get()) {
                this.stop();
            } else {
                try {
                    for (int i = 0; i < n; ++i) {
                        this.linesWriter.write(this.lines.get(i));
                        this.linesWriter.newLine();
                    }
                    this.linesWriter.flush();
                    this.fileMgr.writeToFile(this.dstFileName, this.linesByteBuff.toByteArray());
                    this.lineCount += (long)n;
                }
                catch (IOException e) {
                    LogUtil.exception(Level.WARN, e, "Write task failure, destination file name: \"{}\", file manager: \"{}\"", this.dstFileName, this.fileMgr);
                    this.stop();
                }
                this.linesByteBuff.reset();
                this.lines.clear();
            }
        }

        @Override
        protected final void doStop() {
            this.writeFinishCountDown.countDown();
            Loggers.MSG.debug("Write task finish, written line count: {}, destination file name: \"{}\", file manager: \"{}\"", (Object)this.lineCount, (Object)this.dstFileName, (Object)this.fileMgr);
        }

        @Override
        protected final void doClose() throws IOException {
            super.doClose();
            this.lines.clear();
            this.linesByteBuff.close();
            this.linesWriter.close();
        }
    }

    private static final class ReadTask
    extends ExclusiveFiberBase {
        private final AtomicBoolean inputFinishFlag;
        private final List<BlockingQueue<String>> lineQueues;
        private final String srcFileName;
        private final int sliceCount;
        private final BufferedReader lineReader;
        private long lineCount = 0L;
        private long lastProgressOutputTimeMillis = System.currentTimeMillis();
        private volatile String pendingLine = null;

        public ReadTask(AtomicBoolean inputFinishFlag, List<BlockingQueue<String>> lineQueues, String srcFileName, int sliceCount) throws IOException {
            super(ServiceTaskExecutor.INSTANCE);
            this.inputFinishFlag = inputFinishFlag;
            this.lineQueues = lineQueues;
            this.srcFileName = srcFileName;
            this.sliceCount = sliceCount;
            this.lineReader = Files.newBufferedReader(Paths.get(srcFileName, new String[0]));
        }

        @Override
        protected final void invokeTimedExclusively(long startTimeNanos) {
            try {
                while (System.nanoTime() - startTimeNanos < 10000000L) {
                    String line;
                    if (System.currentTimeMillis() - this.lastProgressOutputTimeMillis > 10000L) {
                        Loggers.MSG.info("Read task progress: scattered {} lines from the input file \"{}\"...", (Object)this.lineCount, (Object)this.srcFileName);
                        this.lastProgressOutputTimeMillis = System.currentTimeMillis();
                    }
                    if (this.pendingLine == null) {
                        line = this.lineReader.readLine();
                    } else {
                        line = this.pendingLine;
                        this.pendingLine = null;
                    }
                    if (line == null) {
                        this.stop();
                    } else {
                        if (this.lineQueues.get((int)(this.lineCount % (long)this.sliceCount)).offer(line)) {
                            ++this.lineCount;
                            continue;
                        }
                        this.pendingLine = line;
                    }
                    break;
                }
            }
            catch (IOException e) {
                LogUtil.exception(Level.WARN, e, "Read task failure, source file name: \"{}\"", this.srcFileName);
                this.stop();
            }
        }

        @Override
        protected final void doStop() {
            super.doStop();
            this.inputFinishFlag.set(true);
            Loggers.MSG.info("Read task finish: scattered {} lines from the input file \"{}\"", (Object)this.lineCount, (Object)this.srcFileName);
        }

        @Override
        protected final void doClose() throws IOException {
            super.doClose();
            this.lineReader.close();
        }
    }
}

