package com.emc.mongoose.load.step.client;

import com.emc.mongoose.Constants;
import com.emc.mongoose.concurrent.ServiceTaskExecutor;
import com.emc.mongoose.exception.InterruptRunException;
import com.emc.mongoose.load.step.file.FileManager;
import com.emc.mongoose.logging.LogUtil;
import com.emc.mongoose.logging.Loggers;
import com.github.akurilov.confuse.Config;
import com.github.akurilov.fiber4j.ExclusiveFiberBase;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.rmi.RemoteException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.logging.log4j.CloseableThreadContext;
import org.apache.logging.log4j.Level;

/* loaded from: input_file:com/emc/mongoose/load/step/client/TempInputTextFileSlicer.class */
public final class TempInputTextFileSlicer implements AutoCloseable {
    private final String loadStepId;
    private final Map<FileManager, String> fileSlices;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/emc/mongoose/load/step/client/TempInputTextFileSlicer$ReadTask.class */
    /**
     * Fiber that reads the source text file line by line and hands every line over to one of the
     * slice queues, choosing the queue by {@code lineCount % sliceCount} (round-robin).
     * When the input is exhausted, or reading fails, the task stops itself, which raises the shared
     * input finish flag (see {@link #doStop()}).
     */
    public static final class ReadTask extends ExclusiveFiberBase {
        /** Time budget of a single invocation, in nanoseconds (10 ms). */
        private static final long INVOCATION_TIME_LIMIT_NANOS = 10_000_000L;
        /** Minimum interval between two progress log messages, in milliseconds (10 s). */
        private static final long PROGRESS_OUTPUT_PERIOD_MILLIS = 10_000L;

        private final AtomicBoolean inputFinishFlag;
        private final List<BlockingQueue<String>> lineQueues;
        private final String srcFileName;
        private final int sliceCount;
        private final BufferedReader lineReader;
        private long lineCount;
        private long lastProgressOutputTimeMillis;

        /**
         * @param atomicBoolean the flag which is set to {@code true} when this task stops
         * @param list the queues receiving the lines, indexed by slice number
         * @param str the name of the source file to read
         * @param i the count of the slices, used as the round-robin modulus
         * @throws IOException if the source file cannot be opened for reading
         */
        public ReadTask(AtomicBoolean atomicBoolean, List<BlockingQueue<String>> list, String str, int i) throws IOException {
            super(ServiceTaskExecutor.INSTANCE);
            this.lineCount = 0L;
            this.lastProgressOutputTimeMillis = System.currentTimeMillis();
            this.inputFinishFlag = atomicBoolean;
            this.lineQueues = list;
            this.srcFileName = str;
            this.sliceCount = i;
            this.lineReader = Files.newBufferedReader(Paths.get(str));
        }

        /**
         * Reads and scatters lines until the time budget of this invocation is spent, the input
         * ends, or a failure occurs. Reconstructed from the bytecode of the original method.
         *
         * @param startTimeNanos the timestamp which is compared against {@link System#nanoTime()}
         *                       to bound the duration of the invocation
         * @throws InterruptRunException if the thread is interrupted while waiting for free space
         *                               in a line queue
         */
        @Override // com.github.akurilov.fiber4j.ExclusiveFiberBase
        protected final void invokeTimedExclusively(long startTimeNanos) {
            try {
                while (System.nanoTime() - startTimeNanos < INVOCATION_TIME_LIMIT_NANOS) {
                    if (System.currentTimeMillis() - this.lastProgressOutputTimeMillis > PROGRESS_OUTPUT_PERIOD_MILLIS) {
                        Loggers.MSG.info("Read task progress: scattered {} lines from the input file \"{}\"...", Long.valueOf(this.lineCount), this.srcFileName);
                        this.lastProgressOutputTimeMillis = System.currentTimeMillis();
                    }
                    final String line = this.lineReader.readLine();
                    if (line == null) {
                        // end of the input: nothing more to scatter
                        stop();
                        break;
                    }
                    // blocks while the target queue is full
                    this.lineQueues.get((int) (this.lineCount % this.sliceCount)).put(line);
                    this.lineCount++;
                }
            } catch (final IOException e) {
                LogUtil.exception(Level.WARN, e, "Read task failure, source file name: \"{}\"", this.srcFileName);
                stop();
            } catch (final InterruptedException e) {
                stop();
                throw new InterruptRunException(e);
            }
        }

        /**
         * Raises the input finish flag and logs the total count of the scattered lines.
         */
        /* JADX INFO: Access modifiers changed from: protected */
        @Override // com.github.akurilov.fiber4j.FiberBase, com.github.akurilov.commons.concurrent.AsyncRunnableBase
        public final void doStop() {
            super.doStop();
            this.inputFinishFlag.set(true);
            Loggers.MSG.info("Read task finish: scattered {} lines from the input file \"{}\"", Long.valueOf(this.lineCount), this.srcFileName);
        }

        /**
         * Closes the source file reader.
         *
         * @throws IOException if the parent cleanup or closing the reader fails
         */
        /* JADX INFO: Access modifiers changed from: protected */
        @Override // com.github.akurilov.commons.concurrent.AsyncRunnableBase
        public final void doClose() throws IOException {
            super.doClose();
            this.lineReader.close();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/emc/mongoose/load/step/client/TempInputTextFileSlicer$WriteTask.class */
    /**
     * Fiber that drains the lines from its queue in batches, serializes each batch into a byte
     * buffer and passes the bytes to the file manager to be written to the destination file.
     * The task stops itself when the queue is empty and the input finish flag was already set.
     */
    public static final class WriteTask extends ExclusiveFiberBase {
        private final AtomicBoolean inputFinishFlag;
        private final CountDownLatch writeFinishCountDown;
        private final BlockingQueue<String> lineQueue;
        private final FileManager fileMgr;
        private final String dstFileName;
        private final int batchSize;
        private final List<String> lines;
        private final ByteArrayOutputStream linesByteBuff;
        private final BufferedWriter linesWriter;
        private long lineCount;

        /**
         * @param atomicBoolean the flag telling that no more lines will be put into the queue
         * @param countDownLatch the latch which is counted down when this task stops
         * @param blockingQueue the queue to take the lines from
         * @param fileManager the file manager used to write the destination file
         * @param str the destination file name
         * @param i the max count of the lines to drain and write per invocation
         */
        public WriteTask(AtomicBoolean atomicBoolean, CountDownLatch countDownLatch, BlockingQueue<String> blockingQueue, FileManager fileManager, String str, int i) {
            super(ServiceTaskExecutor.INSTANCE);
            this.lineCount = 0L;
            this.inputFinishFlag = atomicBoolean;
            this.writeFinishCountDown = countDownLatch;
            this.lineQueue = blockingQueue;
            this.fileMgr = fileManager;
            this.dstFileName = str;
            this.batchSize = i;
            this.lines = new ArrayList<>(i);
            this.linesByteBuff = new ByteArrayOutputStream();
            // explicit UTF-8: the lines are decoded as UTF-8 by Files.newBufferedReader on the
            // reading side, so the platform default charset must not be used for encoding them
            this.linesWriter = new BufferedWriter(new OutputStreamWriter(this.linesByteBuff, java.nio.charset.StandardCharsets.UTF_8));
        }

        /**
         * Drains up to {@code batchSize} lines and writes them as a single chunk.
         * Any I/O failure is logged, stops the task, and the failed batch is not counted.
         */
        @Override // com.github.akurilov.fiber4j.ExclusiveFiberBase
        protected final void invokeTimedExclusively(long j) {
            // The flag must be sampled BEFORE draining: if it were checked after an empty drain,
            // lines enqueued between the drain and the check would be lost.
            final boolean inputFinished = this.inputFinishFlag.get();
            final int drainedCount = this.lineQueue.drainTo(this.lines, this.batchSize);
            if (drainedCount == 0 && inputFinished) {
                stop();
                return;
            }
            try {
                for (int i = 0; i < drainedCount; i++) {
                    this.linesWriter.write(this.lines.get(i));
                    this.linesWriter.newLine();
                }
                this.linesWriter.flush();
                this.fileMgr.writeToFile(this.dstFileName, this.linesByteBuff.toByteArray());
                this.lineCount += drainedCount;
            } catch (final IOException e) {
                LogUtil.exception(Level.WARN, e, "Write task failure, destination file name: \"{}\", file manager: \"{}\"", this.dstFileName, this.fileMgr);
                stop();
            } finally {
                // the batch buffers are reused by the next invocation
                this.linesByteBuff.reset();
                this.lines.clear();
            }
        }

        /**
         * Counts down the write finish latch and logs the count of the written lines.
         */
        /* JADX INFO: Access modifiers changed from: protected */
        @Override // com.github.akurilov.fiber4j.FiberBase, com.github.akurilov.commons.concurrent.AsyncRunnableBase
        public final void doStop() {
            // NOTE(review): unlike ReadTask.doStop(), super.doStop() is not invoked here — confirm this is intended
            this.writeFinishCountDown.countDown();
            Loggers.MSG.debug("Write task finish, written line count: {}, destination file name: \"{}\", file manager: \"{}\"", Long.valueOf(this.lineCount), this.dstFileName, this.fileMgr);
        }

        /**
         * Releases the batch buffers.
         *
         * @throws IOException if the parent cleanup or closing the buffers fails
         */
        /* JADX INFO: Access modifiers changed from: protected */
        @Override // com.github.akurilov.commons.concurrent.AsyncRunnableBase
        public final void doClose() throws IOException {
            super.doClose();
            this.lines.clear();
            this.linesByteBuff.close();
            this.linesWriter.close();
        }
    }

    /**
     * Allocates a temporary file name per step slice, stores that name in the slice configuration
     * and scatters the lines of the input text file among the slice files.
     * A failure to allocate a file name for a slice, as well as any scattering failure other than
     * {@link InterruptRunException}, is logged and does not fail the construction.
     *
     * @param str the load step id, used for logging
     * @param str2 the name of the input text file to slice
     * @param list the file managers, one per slice
     * @param str3 the configuration path which receives the temporary file name of a slice
     * @param list2 the slice configurations; their count defines the count of the slices
     * @param i the batch size passed to {@code scatterLines}
     * @throws InterruptRunException if the scattering is interrupted
     */
    public TempInputTextFileSlicer(String str, String str2, List<FileManager> list, String str3, List<Config> list2, int i) throws InterruptRunException {
        this.loadStepId = str;
        final int sliceCount = list2.size();
        this.fileSlices = new HashMap<>(sliceCount);
        for (int sliceIdx = 0; sliceIdx < sliceCount; sliceIdx++) {
            try {
                final FileManager fileMgr = list.get(sliceIdx);
                final String tmpFileName = fileMgr.newTmpFileName();
                this.fileSlices.put(fileMgr, tmpFileName);
                list2.get(sliceIdx).val(str3, tmpFileName);
            } catch (final Exception e) {
                LogUtil.exception(Level.ERROR, e, "Failed to get the input text file name for the step slice #" + sliceIdx);
            }
        }
        // try-with-resources guarantees that the logging context is closed on the failure path too
        try (
            CloseableThreadContext.Instance logCtx = CloseableThreadContext
                .put(Constants.KEY_STEP_ID, str)
                .put(Constants.KEY_CLASS_NAME, getClass().getSimpleName())
        ) {
            Loggers.MSG.info("{}: scatter the lines from the input text file \"{}\"...", str, str2);
            scatterLines(str2, sliceCount, list, this.fileSlices, i);
            Loggers.MSG.info("{}: scatter the lines from the input text file \"{}\" finished", str, str2);
        } catch (final InterruptRunException e) {
            // the interruption must reach the caller, everything else is only logged
            throw e;
        } catch (final Throwable cause) {
            LogUtil.exception(Level.ERROR, cause, "{}: failed to scatter the lines from the file \"{}\"", str, str2);
        }
    }

    /**
     * Deletes all temporary slice files via their file managers (in parallel) and forgets them.
     * A failed deletion is logged as a warning and does not interrupt the remaining deletions.
     */
    @Override // java.lang.AutoCloseable
    public final void close() {
        this.fileSlices
            .entrySet()
            .parallelStream()
            .forEach(slice -> deleteSlice(slice.getKey(), slice.getValue()));
        this.fileSlices.clear();
    }

    /** Deletes a single slice file, logging any failure instead of propagating it. */
    private void deleteSlice(final FileManager fileMgr, final String fileName) {
        try {
            fileMgr.deleteFile(fileName);
        } catch (final Exception e) {
            LogUtil.exception(Level.WARN, e, "{}: failed to delete the file \"{}\" @ file manager \"{}\"", this.loadStepId, fileName, fileMgr);
        }
    }

    /**
     * Scatters the lines of the source file among the slice files: one {@link ReadTask} feeds a
     * bounded queue per slice and one {@link WriteTask} per slice drains its queue into the
     * file of the corresponding file manager. Blocks until every write task has stopped, then
     * closes all the tasks.
     *
     * @param str the source file name
     * @param i the count of the slices
     * @param list the file managers, one per slice
     * @param map the destination file name for each file manager
     * @param i2 the capacity of each line queue, also used as the write batch size
     * @throws InterruptRunException if the calling thread is interrupted while waiting
     * @throws IOException if the source file cannot be opened
     */
    static void scatterLines(String str, int i, List<FileManager> list, Map<FileManager, String> map, int i2) throws InterruptRunException, IOException {
        final AtomicBoolean inputFinishFlag = new AtomicBoolean(false);
        final List<BlockingQueue<String>> lineQueues = new ArrayList<>(i);
        for (int sliceIdx = 0; sliceIdx < i; sliceIdx++) {
            lineQueues.add(new ArrayBlockingQueue<>(i2));
        }
        final List<ExclusiveFiberBase> tasks = new ArrayList<>(i + 1);
        tasks.add(new ReadTask(inputFinishFlag, lineQueues, str, i));
        final CountDownLatch writeFinishCountDown = new CountDownLatch(i);
        for (int sliceIdx = 0; sliceIdx < i; sliceIdx++) {
            final FileManager fileMgr = list.get(sliceIdx);
            tasks.add(new WriteTask(inputFinishFlag, writeFinishCountDown, lineQueues.get(sliceIdx), fileMgr, map.get(fileMgr), i2));
        }
        for (final ExclusiveFiberBase task : tasks) {
            try {
                task.start();
            } catch (final RuntimeException e) {
                // unchecked failures keep propagating exactly as before
                throw e;
            } catch (final Exception e) {
                // a checked start failure used to be swallowed silently; make it visible
                LogUtil.exception(Level.WARN, e, "Failed to start the task \"{}\"", task);
            }
        }
        try {
            writeFinishCountDown.await();
        } catch (final InterruptedException e) {
            throw new InterruptRunException(e);
        } finally {
            // best-effort cleanup: a failing close must neither prevent closing the remaining
            // tasks nor mask the exception which is being propagated
            for (final ExclusiveFiberBase task : tasks) {
                try {
                    task.close();
                } catch (final Exception e) {
                    LogUtil.exception(Level.WARN, e, "Failed to close the task \"{}\"", task);
                }
            }
            tasks.clear();
        }
    }
}
