/*
 * Decompiled with CFR 0.152.
 */
package com.emc.mongoose.base.load.step.client;

import com.emc.mongoose.base.Exceptions;
import com.emc.mongoose.base.item.Item;
import com.emc.mongoose.base.load.step.file.FileManager;
import com.emc.mongoose.base.load.step.service.file.FileManagerService;
import com.emc.mongoose.base.logging.LogUtil;
import com.emc.mongoose.base.logging.Loggers;
import com.github.akurilov.commons.io.Input;
import com.github.akurilov.confuse.Config;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.logging.log4j.Level;

public final class ItemInputFileSlicer
implements AutoCloseable {
    private static final int APPROX_LINE_LENGTH = 64;
    private final String loadStepId;
    private final Map<FileManager, String> itemInputFileSlices;
    private final List<FileManager> fileMgrs;

    public <I extends Item> ItemInputFileSlicer(String loadStepId, List<FileManager> fileMgrs, List<Config> configSlices, Input<I> itemInput, int batchSize) {
        this.loadStepId = loadStepId;
        int sliceCount = configSlices.size();
        this.itemInputFileSlices = new HashMap<FileManager, String>(sliceCount);
        this.fileMgrs = fileMgrs;
        for (int i = 0; i < sliceCount; ++i) {
            try {
                FileManager fileMgr = fileMgrs.get(i);
                String itemInputFileName = fileMgr.newTmpFileName();
                this.itemInputFileSlices.put(fileMgr, itemInputFileName);
                Config configSlice = configSlices.get(i);
                configSlice.val("item-input-file", itemInputFileName);
                continue;
            }
            catch (Exception e) {
                Exceptions.throwUncheckedIfInterrupted(e);
                LogUtil.exception(Level.ERROR, e, "Failed to get the item input file name for the step slice #" + i, new Object[0]);
            }
        }
        try {
            Loggers.MSG.info("{}: scatter the items from the input \"{}\"...", (Object)loadStepId, (Object)itemInput);
            this.scatterItems(itemInput, batchSize);
        }
        catch (IOException e) {
            LogUtil.exception(Level.WARN, e, "{}: failed to use the item input", loadStepId);
        }
        catch (Throwable cause) {
            Exceptions.throwUncheckedIfInterrupted(cause);
            LogUtil.exception(Level.ERROR, cause, "{}: unexpected failure", loadStepId);
        }
    }

    @Override
    public final void close() {
        this.itemInputFileSlices.entrySet().parallelStream().forEach(entry -> {
            FileManager fileMgr = (FileManager)entry.getKey();
            String itemInputFileName = (String)entry.getValue();
            try {
                fileMgr.deleteFile(itemInputFileName);
            }
            catch (Exception e) {
                Exceptions.throwUncheckedIfInterrupted(e);
                LogUtil.exception(Level.WARN, e, "{}: failed to delete the file \"{}\" @ file manager \"{}\"", this.loadStepId, itemInputFileName, fileMgr);
            }
        });
        this.itemInputFileSlices.clear();
    }

    private <I extends Item> void scatterItems(Input<I> itemInput, int batchSize) throws IOException {
        Loggers.MSG.info("{}: slice the item input \"{}\"...", (Object)this.loadStepId, (Object)itemInput);
        Map<FileManager, ByteArrayOutputStream> itemsOutByteBuffs = this.fileMgrs.stream().collect(Collectors.toMap(Function.identity(), fileMgr -> new ByteArrayOutputStream(batchSize * 64)));
        Map<FileManager, ObjectOutputStream> itemsOutputs = itemsOutByteBuffs.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> {
            try {
                return new ObjectOutputStream((OutputStream)entry.getValue());
            }
            catch (IOException iOException) {
                return null;
            }
        }));
        this.transferData(itemInput, itemsOutByteBuffs, itemsOutputs, batchSize);
        itemsOutputs.values().parallelStream().filter(Objects::nonNull).forEach(outStream -> {
            try {
                outStream.close();
            }
            catch (IOException iOException) {
                // empty catch block
            }
        });
    }

    private <I extends Item> void transferData(Input<I> itemInput, Map<FileManager, ByteArrayOutputStream> itemsOutByteBuffs, Map<FileManager, ObjectOutputStream> itemsOutputs, int batchSize) throws IOException {
        int sliceCount = itemsOutByteBuffs.size();
        ArrayList itemsBuff = new ArrayList(batchSize);
        long count = 0L;
        long lastProgressOutputTimeMillis = System.currentTimeMillis();
        Loggers.MSG.info("Items input \"{}\": starting to distribute the items among the {} load step slices", (Object)itemInput, (Object)sliceCount);
        while (true) {
            int n;
            try {
                n = itemInput.get(itemsBuff, batchSize);
            }
            catch (Exception e) {
                Exceptions.throwUncheckedIfInterrupted(e);
                if (e instanceof EOFException) break;
                throw e;
            }
            if (n <= 0) break;
            for (int i = 0; i < n; ++i) {
                itemsOutputs.get(this.fileMgrs.get(i % sliceCount)).writeUnshared(itemsBuff.get(i));
            }
            itemsBuff.clear();
            this.fileMgrs.parallelStream().forEach(fileMgr -> {
                ByteArrayOutputStream buff = (ByteArrayOutputStream)itemsOutByteBuffs.get(fileMgr);
                String itemInputFileName = this.itemInputFileSlices.get(fileMgr);
                try {
                    byte[] data = buff.toByteArray();
                    fileMgr.writeToFile(itemInputFileName, data);
                    buff.reset();
                }
                catch (IOException e) {
                    LogUtil.exception(Level.WARN, e, "Failed to write the items input data to the {} file \"{}\"", itemInputFileName, fileMgr instanceof FileManagerService ? "remote" : "local");
                }
            });
            count += (long)n;
            if (System.currentTimeMillis() - lastProgressOutputTimeMillis <= 10000L) continue;
            Loggers.MSG.info("Transferred {} items from the input \"{}\"...", (Object)count, (Object)itemInput);
            lastProgressOutputTimeMillis = System.currentTimeMillis();
        }
        Loggers.MSG.info("Items input \"{}\": {} items was distributed among the {} load step slices", (Object)itemInput, (Object)count, (Object)sliceCount);
    }
}

