/*
 * Decompiled with CFR 0.152.
 */
package io.nosqlbench.engine.core.lifecycle;

import io.nosqlbench.engine.api.activityapi.core.Activity;
import io.nosqlbench.engine.api.activityapi.core.ActivityController;
import io.nosqlbench.engine.api.activityapi.core.ActivityDefObserver;
import io.nosqlbench.engine.api.activityapi.core.Motor;
import io.nosqlbench.engine.api.activityapi.core.ProgressMeter;
import io.nosqlbench.engine.api.activityapi.core.RunState;
import io.nosqlbench.engine.api.activityapi.core.Stoppable;
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
import io.nosqlbench.engine.api.activityimpl.ParameterMap;
import io.nosqlbench.engine.api.activityimpl.input.ProgressCapable;
import io.nosqlbench.engine.core.annotation.Annotators;
import io.nosqlbench.engine.core.lifecycle.ActivityExceptionHandler;
import io.nosqlbench.engine.core.lifecycle.IndexedThreadFactory;
import io.nosqlbench.nb.api.annotations.Annotation;
import io.nosqlbench.nb.api.annotations.Layer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class ActivityExecutor
implements ActivityController,
ParameterMap.Listener,
ProgressCapable {
    private static final Logger logger = LogManager.getLogger(ActivityExecutor.class);
    private static final Logger activitylogger = LogManager.getLogger((String)"ACTIVITY");
    private final List<Motor<?>> motors = new ArrayList();
    private final Activity activity;
    private final ActivityDef activityDef;
    private final ExecutorService executorService;
    private RuntimeException stoppingException;
    private static final int waitTime = 10000;
    private String sessionId = "";
    private long startedAt = 0L;
    private long stoppedAt = 0L;
    private String[] annotatedCommand;

    public ActivityExecutor(Activity activity, String sessionId) {
        this.activity = activity;
        this.activityDef = activity.getActivityDef();
        this.executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 0L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new IndexedThreadFactory(activity.getAlias(), new ActivityExceptionHandler(this)));
        activity.getActivityDef().getParams().addListener((ParameterMap.Listener)this);
        activity.setActivityController((ActivityController)this);
        this.sessionId = sessionId;
    }

    public void setSessionId(String sessionId) {
        this.sessionId = sessionId;
    }

    public synchronized void startActivity() {
        logger.info("starting activity " + this.activity.getAlias() + " for cycles " + this.activity.getCycleSummary());
        Annotators.recordAnnotation(Annotation.newBuilder().session(this.sessionId).now().layer(Layer.Activity).label("alias", this.getActivityDef().getAlias()).label("driver", this.getActivityDef().getActivityType()).label("workload", this.getActivityDef().getParams().getOptionalString(new String[]{"workload"}).orElse("none")).detail("params", this.getActivityDef().toString()).build());
        activitylogger.debug("START/before alias=(" + this.activity.getAlias() + ")");
        try {
            this.activity.setRunState(RunState.Starting);
            this.startedAt = System.currentTimeMillis();
            this.activity.initActivity();
        }
        catch (Exception e) {
            this.stoppingException = new RuntimeException("Error initializing activity '" + this.activity.getAlias() + "':\n" + e.getMessage(), e);
            throw this.stoppingException;
        }
        this.adjustToActivityDef(this.activity.getActivityDef());
        this.activity.setRunState(RunState.Running);
        activitylogger.debug("START/after alias=(" + this.activity.getAlias() + ")");
    }

    public synchronized void stopActivity() {
        activitylogger.debug("STOP/before alias=(" + this.activity.getAlias() + ")");
        this.activity.setRunState(RunState.Stopping);
        logger.info("stopping activity in progress: " + this.getActivityDef().getAlias());
        this.motors.forEach(Stoppable::requestStop);
        this.motors.forEach(m -> this.awaitRequiredMotorState((Motor)m, 30000, 50, RunState.Stopped, RunState.Finished));
        this.activity.shutdownActivity();
        this.activity.closeAutoCloseables();
        this.activity.setRunState(RunState.Stopped);
        logger.info("stopped: " + this.getActivityDef().getAlias() + " with " + this.motors.size() + " slots");
        activitylogger.debug("STOP/after alias=(" + this.activity.getAlias() + ")");
        Annotators.recordAnnotation(Annotation.newBuilder().session(this.sessionId).interval(this.startedAt, this.stoppedAt).layer(Layer.Activity).label("alias", this.getActivityDef().getAlias()).label("driver", this.getActivityDef().getActivityType()).label("workload", this.getActivityDef().getParams().getOptionalString(new String[]{"workload"}).orElse("none")).detail("params", this.getActivityDef().toString()).build());
    }

    public synchronized RuntimeException forceStopScenario(int initialMillisToWait) {
        activitylogger.debug("FORCE STOP/before alias=(" + this.activity.getAlias() + ")");
        this.activity.setRunState(RunState.Stopped);
        this.executorService.shutdown();
        this.requestStopMotors();
        int divisor = 100;
        int polltime = initialMillisToWait / divisor;
        long gracefulWaitStartedAt = System.currentTimeMillis();
        long waitUntil = (long)initialMillisToWait + gracefulWaitStartedAt;
        long time = gracefulWaitStartedAt;
        while (time < waitUntil && !this.executorService.isTerminated()) {
            try {
                Thread.sleep(polltime);
                time = System.currentTimeMillis();
            }
            catch (InterruptedException interruptedException) {}
        }
        long gracefulWaitEndedAt = System.currentTimeMillis();
        logger.debug("took " + (gracefulWaitEndedAt - gracefulWaitStartedAt) + " ms to shutdown gracefully");
        if (!this.executorService.isTerminated()) {
            logger.info("stopping activity forcibly " + this.activity.getAlias());
            List<Runnable> runnables = this.executorService.shutdownNow();
            long forcibleShutdownCompletedAt = System.currentTimeMillis();
            logger.debug("took " + (forcibleShutdownCompletedAt - gracefulWaitEndedAt) + " ms to shutdown forcibly");
            logger.debug(runnables.size() + " tasks never started.");
        }
        long activityShutdownStartedAt = System.currentTimeMillis();
        logger.debug("invoking activity-specific shutdown hooks");
        this.activity.shutdownActivity();
        this.activity.closeAutoCloseables();
        long activityShutdownEndedAt = System.currentTimeMillis();
        logger.debug("took " + (activityShutdownEndedAt - activityShutdownStartedAt) + " ms to shutdown activity threads");
        activitylogger.debug("FORCE STOP/after alias=(" + this.activity.getAlias() + ")");
        if (this.stoppingException != null) {
            activitylogger.debug("FORCE STOP/exception alias=(" + this.activity.getAlias() + ")");
        }
        return this.stoppingException;
    }

    public synchronized void forceStopScenarioAndThrow(int initialMillisToWait, boolean rethrow) {
        RuntimeException exception = this.forceStopScenario(initialMillisToWait);
        if (exception != null && rethrow) {
            throw exception;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean finishAndShutdownExecutor(int secondsToWait) {
        activitylogger.debug("REQUEST STOP/before alias=(" + this.activity.getAlias() + ")");
        logger.debug("Stopping executor for " + this.activity.getAlias() + " when work completes.");
        this.executorService.shutdown();
        boolean wasStopped = false;
        try {
            logger.trace("awaiting termination with timeout of " + secondsToWait + " seconds");
            wasStopped = this.executorService.awaitTermination(secondsToWait, TimeUnit.SECONDS);
        }
        catch (InterruptedException ie) {
            logger.trace("interrupted while awaiting termination");
            wasStopped = false;
            logger.warn("while waiting termination of activity " + this.activity.getAlias() + ", " + ie.getMessage());
            activitylogger.debug("REQUEST STOP/exception alias=(" + this.activity.getAlias() + ") wasstopped=" + wasStopped);
        }
        finally {
            logger.trace("finally shutting down activity " + this.getActivity().getAlias());
            this.activity.shutdownActivity();
            logger.trace("closing auto-closeables");
            this.activity.closeAutoCloseables();
            this.activity.setRunState(RunState.Stopped);
            this.stoppedAt = System.currentTimeMillis();
        }
        if (this.stoppingException != null) {
            logger.trace("an exception caused the activity to stop:" + this.stoppingException.getMessage());
            throw this.stoppingException;
        }
        activitylogger.debug("REQUEST STOP/after alias=(" + this.activity.getAlias() + ") wasstopped=" + wasStopped);
        return wasStopped;
    }

    public synchronized void handleParameterMapUpdate(ParameterMap parameterMap) {
        this.activity.onActivityDefUpdate(this.activityDef);
        if (this.activity.getRunState() != RunState.Uninitialized) {
            if (this.activity.getRunState() == RunState.Running) {
                this.adjustToActivityDef(this.activity.getActivityDef());
            }
            this.motors.stream().filter(m -> m instanceof ActivityDefObserver).forEach(m -> ((ActivityDefObserver)m).onActivityDefUpdate(this.activityDef));
        }
    }

    public ActivityDef getActivityDef() {
        return this.activityDef;
    }

    public boolean awaitCompletion(int waitTime) {
        boolean finished = this.finishAndShutdownExecutor(waitTime);
        Annotators.recordAnnotation(Annotation.newBuilder().session(this.sessionId).interval(this.startedAt, this.stoppedAt).layer(Layer.Activity).label("alias", this.getActivityDef().getAlias()).label("driver", this.getActivityDef().getActivityType()).label("workload", this.getActivityDef().getParams().getOptionalString(new String[]{"workload"}).orElse("none")).detail("params", this.getActivityDef().toString()).build());
        return finished;
    }

    public boolean awaitFinish(int timeout) {
        activitylogger.debug("AWAIT-FINISH/before alias=(" + this.activity.getAlias() + ")");
        boolean awaited = this.awaitAllRequiredMotorState(timeout, 50, RunState.Finished, RunState.Stopped);
        if (awaited) {
            awaited = this.awaitCompletion(timeout);
        }
        if (this.stoppingException != null) {
            activitylogger.debug("AWAIT-FINISH/exception alias=(" + this.activity.getAlias() + ")");
            throw this.stoppingException;
        }
        activitylogger.debug("AWAIT-FINISH/after alias=(" + this.activity.getAlias() + ")");
        return awaited;
    }

    public String toString() {
        return this.getClass().getSimpleName() + "~" + this.activityDef.getAlias();
    }

    private String getSlotStatus() {
        return this.motors.stream().map(m -> m.getSlotStateTracker().getSlotState().getCode()).collect(Collectors.joining(",", "[", "]"));
    }

    private synchronized void adjustToActivityDef(ActivityDef activityDef) {
        Motor motor;
        logger.trace(">-pre-adjust->" + this.getSlotStatus());
        while (this.motors.size() > activityDef.getThreads()) {
            motor = this.motors.get(this.motors.size() - 1);
            logger.trace("Stopping cycle motor thread:" + motor);
            motor.requestStop();
            this.motors.remove(this.motors.size() - 1);
        }
        while (this.motors.size() < activityDef.getThreads()) {
            motor = this.activity.getMotorDispenserDelegate().getMotor(activityDef, this.motors.size());
            logger.trace("Starting cycle motor thread:" + motor);
            this.motors.add(motor);
        }
        this.applyIntendedStateToDivergentMotors();
        this.awaitActivityAndMotorStateAlignment();
        logger.trace(">post-adjust->" + this.getSlotStatus());
    }

    private void applyIntendedStateToDivergentMotors() {
        RunState intended = this.activity.getRunState();
        logger.trace("ADJUSTING to INTENDED " + intended);
        switch (intended) {
            case Uninitialized: {
                break;
            }
            case Running: 
            case Starting: {
                this.motors.stream().filter(m -> m.getSlotStateTracker().getSlotState() != RunState.Running).filter(m -> m.getSlotStateTracker().getSlotState() != RunState.Finished).filter(m -> m.getSlotStateTracker().getSlotState() != RunState.Starting).forEach(m -> {
                    m.getSlotStateTracker().enterState(RunState.Starting);
                    this.executorService.execute((Runnable)m);
                });
                break;
            }
            case Stopped: {
                this.motors.stream().filter(m -> m.getSlotStateTracker().getSlotState() != RunState.Stopped).forEach(Stoppable::requestStop);
                break;
            }
            case Finished: 
            case Stopping: {
                throw new RuntimeException("Invalid requested state in activity executor:" + this.activity.getRunState());
            }
            default: {
                throw new RuntimeException("Unmatched run state:" + this.activity.getRunState());
            }
        }
    }

    private void awaitActivityAndMotorStateAlignment() {
        switch (this.activity.getRunState()) {
            case Running: 
            case Starting: {
                this.motors.forEach(m -> this.awaitRequiredMotorState((Motor)m, 10000, 50, RunState.Running, RunState.Finished));
                break;
            }
            case Stopped: {
                this.motors.forEach(m -> this.awaitRequiredMotorState((Motor)m, 10000, 50, RunState.Stopped, RunState.Finished));
                break;
            }
            case Uninitialized: {
                break;
            }
            case Finished: {
                this.motors.forEach(m -> this.awaitRequiredMotorState((Motor)m, 10000, 50, RunState.Finished));
                break;
            }
            case Stopping: {
                throw new RuntimeException("Invalid requested state in activity executor:" + this.activity.getRunState());
            }
            default: {
                throw new RuntimeException("Unmatched run state:" + this.activity.getRunState());
            }
        }
        logger.debug("activity and threads are aligned to state " + this.activity.getRunState() + " for " + this.getActivity().getAlias());
    }

    private boolean awaitMotorState(Motor m, int waitTime, int pollTime, RunState ... desiredRunStates) {
        long startedAt = System.currentTimeMillis();
        while (System.currentTimeMillis() < startedAt + (long)waitTime) {
            HashMap<RunState, Integer> actualStates = new HashMap<RunState, Integer>();
            for (RunState state : desiredRunStates) {
                actualStates.compute(state, (k, v) -> (v == null ? 0 : v) + 1);
            }
            for (RunState desiredRunState : desiredRunStates) {
                actualStates.remove(desiredRunState);
            }
            logger.trace("state of remaining slots:" + actualStates);
            if (actualStates.size() == 0) {
                return true;
            }
            System.out.println("motor states:" + actualStates);
            try {
                Thread.sleep(pollTime);
            }
            catch (InterruptedException interruptedException) {}
        }
        logger.trace(this.activityDef.getAlias() + "/Motor[" + m.getSlotId() + "] is now in state " + m.getSlotStateTracker().getSlotState());
        return false;
    }

    private boolean awaitAllRequiredMotorState(int waitTime, int pollTime, RunState ... awaitingState) {
        long startedAt = System.currentTimeMillis();
        boolean awaited = false;
        block0: while (!awaited && System.currentTimeMillis() < startedAt + (long)waitTime) {
            awaited = true;
            for (Motor<?> motor : this.motors) {
                awaited = this.awaitMotorState(motor, waitTime, pollTime, awaitingState);
                if (awaited) continue;
                logger.trace("failed awaiting motor " + motor.getSlotId() + " for state in " + Arrays.asList(awaitingState));
                continue block0;
            }
        }
        return awaited;
    }

    private boolean awaitAnyRequiredMotorState(int waitTime, int pollTime, RunState ... awaitingState) {
        long startedAt = System.currentTimeMillis();
        while (System.currentTimeMillis() < startedAt + (long)waitTime) {
            for (Motor<?> motor : this.motors) {
                for (RunState state : awaitingState) {
                    if (motor.getSlotStateTracker().getSlotState() != state) continue;
                    logger.trace("at least one 'any' of " + this.activityDef.getAlias() + "/Motor[" + motor.getSlotId() + "] is now in state " + motor.getSlotStateTracker().getSlotState());
                    return true;
                }
            }
            try {
                Thread.sleep(pollTime);
            }
            catch (InterruptedException interruptedException) {}
        }
        logger.trace("none of " + this.activityDef.getAlias() + "/Motor [" + this.motors.size() + "] is in states in " + Arrays.asList(awaitingState));
        return false;
    }

    private void awaitRequiredMotorState(Motor m, int waitTime, int pollTime, RunState ... awaitingState) {
        RunState startingState = m.getSlotStateTracker().getSlotState();
        boolean awaitedRequiredState = this.awaitMotorState(m, waitTime, pollTime, awaitingState);
        if (!awaitedRequiredState) {
            String error = "Unable to await " + this.activityDef.getAlias() + "/Motor[" + m.getSlotId() + "]: from state " + startingState + " to " + m.getSlotStateTracker().getSlotState() + " after waiting for " + waitTime + "ms";
            RuntimeException e = new RuntimeException(error);
            logger.error(error);
            throw e;
        }
        logger.trace("motor " + m + " entered awaited state: " + Arrays.asList(awaitingState));
    }

    private synchronized void requestStopMotors() {
        logger.info("stopping activity " + this.activity);
        this.activity.setRunState(RunState.Stopped);
        this.motors.forEach(Stoppable::requestStop);
    }

    public boolean isRunning() {
        return this.motors.stream().anyMatch(m -> m.getSlotStateTracker().getSlotState() == RunState.Running);
    }

    public Activity getActivity() {
        return this.activity;
    }

    public synchronized void notifyException(Thread t, Throwable e) {
        this.stoppingException = new RuntimeException("Error in activity thread " + t.getName(), e);
        this.forceStopScenario(10000);
    }

    public synchronized void stopActivityWithReasonAsync(String reason) {
        logger.info("Stopping activity " + this.activityDef.getAlias() + ": " + reason);
        this.stoppingException = new RuntimeException("Stopping activity " + this.activityDef.getAlias() + ": " + reason);
        logger.error("stopping with reason: " + this.stoppingException);
        this.requestStopMotors();
    }

    public synchronized void stopActivityWithErrorAsync(Throwable throwable) {
        if (this.stoppingException == null) {
            this.stoppingException = new RuntimeException(throwable);
            logger.error("stopping on error: " + throwable.toString(), throwable);
        } else if (this.activityDef.getParams().getOptionalBoolean("fullerrors").orElse(false).booleanValue()) {
            logger.error("additional error: " + throwable.toString(), throwable);
        } else {
            logger.warn("summarized error (fullerrors=false): " + throwable.toString());
        }
        this.requestStopMotors();
    }

    public ProgressMeter getProgressMeter() {
        return this.activity.getProgressMeter();
    }
}

