/*
 * Decompiled with CFR 0.152.
 */
package com.emc.mongoose.base.load.step.client;

import com.emc.mongoose.base.Exceptions;
import com.emc.mongoose.base.env.FsUtil;
import com.emc.mongoose.base.load.step.file.FileManager;
import com.emc.mongoose.base.load.step.service.file.FileManagerService;
import com.emc.mongoose.base.logging.LogContextThreadFactory;
import com.emc.mongoose.base.logging.LogUtil;
import com.emc.mongoose.base.logging.Loggers;
import com.github.akurilov.commons.system.SizeInBytes;
import com.github.akurilov.confuse.Config;
import java.io.EOFException;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.logging.log4j.CloseableThreadContext;
import org.apache.logging.log4j.Level;

public final class ItemOutputFileAggregator
implements AutoCloseable {
    private final String loadStepId;
    private final String itemOutputFile;
    private final Map<FileManager, String> itemOutputFileSlices;

    public ItemOutputFileAggregator(String loadStepId, List<FileManager> fileMgrs, List<Config> configSlices, String itemOutputFile) {
        this.loadStepId = loadStepId;
        this.itemOutputFile = itemOutputFile;
        int sliceCount = fileMgrs.size();
        this.itemOutputFileSlices = new HashMap<FileManager, String>(sliceCount);
        for (int i = 0; i < sliceCount; ++i) {
            FileManager fileMgr = fileMgrs.get(i);
            if (i == 0) {
                if (fileMgr instanceof FileManagerService) {
                    throw new AssertionError((Object)("File manager @ index #" + i + " shouldn't be a service"));
                }
                continue;
            }
            if (fileMgr instanceof FileManagerService) {
                try {
                    String remoteItemOutputFileName = fileMgr.newTmpFileName();
                    configSlices.get(i).val("item-output-file", remoteItemOutputFileName);
                    this.itemOutputFileSlices.put(fileMgr, remoteItemOutputFileName);
                    Loggers.MSG.debug("\"{}\": new tmp item output file \"{}\"", (Object)fileMgr, (Object)remoteItemOutputFileName);
                }
                catch (Exception e) {
                    Exceptions.throwUncheckedIfInterrupted(e);
                    LogUtil.exception(Level.ERROR, e, "Failed to get the new temporary file name for the file manager service \"{}\"", fileMgr);
                }
                continue;
            }
            throw new AssertionError((Object)("File manager @ index #" + i + " should be a service"));
        }
    }

    @Override
    public final void close() {
        try {
            this.collectToLocal();
        }
        finally {
            this.itemOutputFileSlices.clear();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void collectToLocal() {
        LongAdder byteCounter = new LongAdder();
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(2, new LogContextThreadFactory("collectItemOutputFileWorker", true));
        CountDownLatch finishLatch = new CountDownLatch(1);
        Path itemOutputPath = Paths.get(this.itemOutputFile, new String[0]);
        FsUtil.createParentDirsIfNotExist(itemOutputPath);
        executor.submit(() -> {
            try (OutputStream localItemOutput = Files.newOutputStream(itemOutputPath, FileManager.APPEND_OPEN_OPTIONS);){
                ReentrantLock localItemOutputLock = new ReentrantLock();
                this.itemOutputFileSlices.entrySet().parallelStream().filter(entry -> entry.getKey() instanceof FileManagerService).forEach(entry -> {
                    FileManager fileMgr = (FileManager)entry.getKey();
                    String remoteItemOutputFileName = (String)entry.getValue();
                    ItemOutputFileAggregator.transferToLocal(fileMgr, remoteItemOutputFileName, localItemOutput, localItemOutputLock, byteCounter);
                    try {
                        fileMgr.deleteFile(remoteItemOutputFileName);
                    }
                    catch (Exception e) {
                        Exceptions.throwUncheckedIfInterrupted(e);
                        LogUtil.exception(Level.WARN, e, "{}: failed to delete the file \"{}\" @ file manager \"{}\"", this.loadStepId, remoteItemOutputFileName, fileMgr);
                    }
                });
            }
            catch (IOException e) {
                LogUtil.exception(Level.WARN, e, "{}: failed to open the local item output file \"{}\" for appending", this.loadStepId, this.itemOutputFile);
            }
            finally {
                finishLatch.countDown();
            }
        });
        executor.scheduleAtFixedRate(() -> Loggers.MSG.info("\"{}\" <- transferred {} of the output items data...", (Object)this.itemOutputFile, (Object)SizeInBytes.formatFixedSize(byteCounter.longValue())), 0L, 10000L, TimeUnit.MILLISECONDS);
        try {
            finishLatch.await();
        }
        catch (InterruptedException e) {
            com.github.akurilov.commons.lang.Exceptions.throwUnchecked(e);
        }
        finally {
            executor.shutdownNow();
            Loggers.MSG.info("\"{}\" <- transferred {} of the output items data", (Object)this.itemOutputFile, (Object)SizeInBytes.formatFixedSize(byteCounter.longValue()));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private static void transferToLocal(FileManager fileMgr, String remoteItemOutputFileName, OutputStream localItemOutput, Lock localItemOutputLock, LongAdder byteCounter) {
        long transferredByteCount = 0L;
        try {
            CloseableThreadContext.Instance logCtx = CloseableThreadContext.put("class_name", ItemOutputFileAggregator.class.getSimpleName());
            try {
                while (true) {
                    byte[] buff = fileMgr.readFromFile(remoteItemOutputFileName, transferredByteCount);
                    localItemOutputLock.lock();
                    try {
                        localItemOutput.write(buff);
                    }
                    finally {
                        localItemOutputLock.unlock();
                    }
                    transferredByteCount += (long)buff.length;
                    byteCounter.add(buff.length);
                }
            }
            catch (Throwable throwable) {
                if (logCtx != null) {
                    try {
                        logCtx.close();
                    }
                    catch (Throwable throwable2) {
                        throwable.addSuppressed(throwable2);
                    }
                }
                throw throwable;
            }
        }
        catch (EOFException logCtx) {
            Loggers.MSG.debug("{} of items output data transferred from \"{}\" @ \"{}\" to \"{}\"", (Object)SizeInBytes.formatFixedSize(transferredByteCount), (Object)remoteItemOutputFileName, (Object)fileMgr, (Object)localItemOutput);
        }
        catch (IOException e) {
            try {
                LogUtil.exception(Level.WARN, e, "Remote items output file transfer failure", new Object[0]);
            }
            catch (Throwable throwable) {
                throw throwable;
            }
            finally {
                Loggers.MSG.debug("{} of items output data transferred from \"{}\" @ \"{}\" to \"{}\"", (Object)SizeInBytes.formatFixedSize(transferredByteCount), (Object)remoteItemOutputFileName, (Object)fileMgr, (Object)localItemOutput);
            }
        }
    }
}

