/*
 * Decompiled with CFR 0.152.
 */
package com.emc.mongoose.storage.driver.preempt;

import com.emc.mongoose.base.config.IllegalConfigurationException;
import com.emc.mongoose.base.data.DataInput;
import com.emc.mongoose.base.item.Item;
import com.emc.mongoose.base.item.op.Operation;
import com.emc.mongoose.base.logging.Loggers;
import com.emc.mongoose.base.storage.driver.StorageDriver;
import com.emc.mongoose.base.storage.driver.StorageDriverBase;
import com.github.akurilov.commons.lang.Exceptions;
import com.github.akurilov.confuse.Config;
import java.io.EOFException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public abstract class PreemptStorageDriverBase<I extends Item, O extends Operation<I>>
extends StorageDriverBase<I, O>
implements StorageDriver<I, O> {
    public static final int BATCH_MODE_INPUT_OP_COUNT_LIMIT = 1000000;
    private final ThreadPoolExecutor ioExecutor;

    protected abstract ThreadFactory ioWorkerThreadFactory();

    protected PreemptStorageDriverBase(String stepId, DataInput itemDataInput, Config storageConfig, boolean verifyFlag, int batchSize) throws IllegalConfigurationException {
        super(stepId, itemDataInput, storageConfig, verifyFlag);
        int inQueueSize = storageConfig.intVal("driver-limit-queue-input");
        int outQueueSize = storageConfig.intVal("driver-limit-queue-output");
        int maxOpCount = inQueueSize * batchSize;
        if (maxOpCount > outQueueSize) {
            Loggers.ERR.warn("The product of the batch size and input queue size (" + maxOpCount + ") is greater than the output queue size (" + outQueueSize + ") which may cause the load operation results handling failures, please consider tuning");
        }
        if (1000000 < maxOpCount) {
            Loggers.ERR.warn("The product of the batch size and input queue size is " + maxOpCount + " which may cause out of memory, please consider tuning");
        }
        this.ioExecutor = new ThreadPoolExecutor(this.ioWorkerCount, this.ioWorkerCount, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(inQueueSize), this.ioWorkerThreadFactory());
    }

    public final boolean put(O op) {
        try {
            this.ioExecutor.execute(this.wrapToBlocking(op));
            return true;
        }
        catch (RejectedExecutionException e) {
            if (!this.isStarted() || this.ioExecutor.isShutdown() || this.ioExecutor.isTerminated()) {
                Exceptions.throwUnchecked((Throwable)new EOFException());
            }
            return false;
        }
    }

    public final int put(List<O> ops, int from, int to) {
        int i;
        if (!this.isStarted() || this.ioExecutor.isShutdown() || this.ioExecutor.isTerminated()) {
            Exceptions.throwUnchecked((Throwable)new EOFException());
        }
        try {
            if (this.isBatch(ops, from, to)) {
                this.ioExecutor.execute(this.wrapToBlocking(ops, from, to));
                i = to;
            } else {
                for (i = from; i < to; ++i) {
                    this.ioExecutor.execute(this.wrapToBlocking((Operation)ops.get(i)));
                }
            }
        }
        catch (RejectedExecutionException rejectedExecutionException) {
            // empty catch block
        }
        return i - from;
    }

    public final int put(List<O> ops) {
        return this.put(ops, 0, ops.size());
    }

    private Runnable wrapToBlocking(O op) {
        if (this.prepare((Operation)op)) {
            return () -> this.execute(op);
        }
        return () -> op.status(Operation.Status.FAIL_UNKNOWN);
    }

    private Runnable wrapToBlocking(List<O> ops, int from, int to) {
        ArrayList<Operation> opsRangeCopy = new ArrayList<Operation>();
        for (int i = from; i < to; ++i) {
            Operation op = (Operation)ops.get(i);
            this.prepare(op);
            opsRangeCopy.add(op);
        }
        return () -> this.execute((List<O>)opsRangeCopy);
    }

    protected abstract boolean isBatch(List<O> var1, int var2, int var3);

    protected abstract void execute(O var1);

    protected abstract void execute(List<O> var1);

    public final int activeOpCount() {
        return this.ioExecutor.getActiveCount();
    }

    public final long scheduledOpCount() {
        return this.ioExecutor.getTaskCount();
    }

    public final long completedOpCount() {
        return this.ioExecutor.getCompletedTaskCount();
    }

    public final boolean isIdle() {
        return this.ioExecutor.getActiveCount() == 0;
    }

    protected void doStart() {
        this.ioExecutor.prestartAllCoreThreads();
        Loggers.MSG.debug("{}: started", (Object)this.toString());
    }

    protected void doShutdown() {
        this.ioExecutor.shutdown();
        this.ioExecutor.getQueue().clear();
        Loggers.MSG.debug("{}: shut down", (Object)this.toString());
    }

    protected void doStop() {
        Loggers.MSG.debug("{}: interrupting...", (Object)this.toString());
        try {
            if (this.ioExecutor.awaitTermination(1L, TimeUnit.SECONDS)) {
                Loggers.MSG.debug("{}: interrupting finished in 1 seconds", (Object)this.toString());
            } else {
                Loggers.ERR.debug("{}: interrupting did not finish in 1 second, forcing", (Object)this.toString());
            }
        }
        catch (InterruptedException e) {
            this.ioExecutor.shutdownNow();
            Exceptions.throwUnchecked((Throwable)e);
        }
        finally {
            Loggers.MSG.debug("{}: interrupted", (Object)this.toString());
        }
    }

    public boolean await(long timeout, TimeUnit timeUnit) throws InterruptedException {
        return this.ioExecutor.awaitTermination(timeout, timeUnit);
    }
}

