package org.flinkextended.flink.ml.cluster.master;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.flinkextended.flink.ml.cluster.BaseEventReporter;
import org.flinkextended.flink.ml.cluster.master.meta.AMMeta;
import org.flinkextended.flink.ml.cluster.node.MLContext;
import org.flinkextended.flink.ml.cluster.statemachine.InvalidStateTransitionException;
import org.flinkextended.flink.ml.cluster.statemachine.StateMachine;
import org.flinkextended.flink.ml.cluster.statemachine.event.EventHandler;
import org.flinkextended.flink.ml.proto.AMStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/flinkextended/flink/ml/cluster/master/AbstractAMStateMachine.class */
/**
 * Base class for the application-master state machine.
 *
 * <p>Events submitted through {@link #sendEvent(AMEvent)} are placed on a bounded queue
 * (capacity 1000) and consumed one at a time by a single daemon thread named
 * {@code am_event_handler}, which applies them to the state machine via
 * {@link #handle(AMEvent)}.
 *
 * <p>State transitions run under the write lock; {@link #getInternalState()} reads under the
 * read lock of the same {@link ReentrantReadWriteLock}.
 *
 * <p>NOTE(review): the constructor invokes the abstract {@link #buildStateMachine} and starts
 * the handler thread before subclass constructors have finished. Subclasses should not rely on
 * their own fields being initialised inside {@code buildStateMachine}.
 */
public abstract class AbstractAMStateMachine implements EventHandler<AMEvent> {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractAMStateMachine.class);

    /** Capacity of the pending-event queue. */
    private static final int EVENT_QUEUE_CAPACITY = 1000;

    /** Seconds to wait when enqueuing an event or when awaiting handler-thread termination. */
    private static final long WAIT_SECONDS = 5L;

    protected final Lock writeLock;
    protected final Lock readLock;
    protected StateMachine<AMStatus, AMEventType, AMEvent> stateMachine;
    protected final AMService amService;
    protected final BaseEventReporter eventReporter;
    protected final AMMeta amMeta;
    protected final MLContext mlContext;
    protected final BlockingQueue<AMEvent> eventQueue = new ArrayBlockingQueue<>(EVENT_QUEUE_CAPACITY);
    protected final ExecutorService exService;

    /**
     * Worker loop that drains {@link #eventQueue} and feeds each event to
     * {@link AbstractAMStateMachine#handle(AMEvent)} until the thread is interrupted.
     */
    class EventHandle implements Runnable {
        EventHandle() {
        }

        @Override
        public void run() {
            while (true) {
                try {
                    handle(eventQueue.take());
                } catch (InterruptedException e) {
                    // Interruption is the shutdown signal (see close()); restore the flag and exit.
                    Thread.currentThread().interrupt();
                    LOG.info("EventHandle thread interrupted, exiting with {} pending events", eventQueue.size());
                    return;
                } catch (Exception e) {
                    // Any other failure is reported to the AM service and the loop keeps running.
                    // The event argument is passed as null, as in the original implementation.
                    LOG.error("Failed to handle event", e);
                    getAmService().handleStateTransitionError(null, e);
                }
            }
        }
    }

    /**
     * Creates the state machine, builds the concrete transition table via
     * {@link #buildStateMachine}, and starts the single event-handling daemon thread.
     *
     * @param aMService service consulted for the current version and notified of handling errors
     * @param aMMeta meta store handed to {@link #buildStateMachine}
     * @param mLContext context handed to {@link #buildStateMachine}
     * @param baseEventReporter reporter exposed through {@link #getEventReporter()}
     */
    public AbstractAMStateMachine(AMService aMService, AMMeta aMMeta, MLContext mLContext, BaseEventReporter baseEventReporter) {
        this.amService = aMService;
        this.eventReporter = baseEventReporter;
        this.amMeta = aMMeta;
        this.mlContext = mLContext;
        ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
        this.readLock = reentrantReadWriteLock.readLock();
        this.writeLock = reentrantReadWriteLock.writeLock();
        this.stateMachine = buildStateMachine(mLContext, aMMeta);
        this.exService = Executors.newFixedThreadPool(1, runnable -> {
            Thread thread = new Thread(runnable);
            thread.setDaemon(true);
            thread.setName("am_event_handler");
            return thread;
        });
        this.exService.submit(new EventHandle());
        LOG.info("start am_event_handler thread!");
    }

    /**
     * Builds the concrete state machine used by this instance. Called once from the constructor.
     *
     * @param mLContext the context passed to the constructor
     * @param aMMeta the meta store passed to the constructor
     * @return the state machine that {@link #handle(AMEvent)} will drive
     */
    protected abstract StateMachine<AMStatus, AMEventType, AMEvent> buildStateMachine(MLContext mLContext, AMMeta aMMeta);

    /**
     * Enqueues an event for asynchronous handling, waiting up to 5 seconds for queue space.
     *
     * @param aMEvent the event to enqueue
     * @return {@code true} if the event was queued; {@code false} if the queue stayed full for
     *     the whole wait or the calling thread was interrupted (the interrupt flag is restored)
     */
    public boolean sendEvent(AMEvent aMEvent) {
        try {
            return this.eventQueue.offer(aMEvent, WAIT_SECONDS, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOG.warn("Interrupted while enqueuing AM event", e);
            return false;
        }
    }

    /** Returns the underlying state machine. */
    protected StateMachine<AMStatus, AMEventType, AMEvent> getStateMachine() {
        return this.stateMachine;
    }

    /**
     * Returns the current state of the state machine, read under the read lock.
     *
     * @return the state machine's current {@link AMStatus}
     */
    public AMStatus getInternalState() {
        this.readLock.lock();
        try {
            return getStateMachine().getCurrentState();
        } finally {
            this.readLock.unlock();
        }
    }

    /**
     * Applies one event to the state machine under the write lock.
     *
     * <p>The event is processed only if its version is {@code 0} or equals
     * {@code amService.version()}; any other event is ignored without error.
     *
     * @param aMEvent the event to apply
     * @throws InvalidStateTransitionException if the state machine rejects the event in its
     *     current state
     * @throws Exception any other failure raised by the transition
     */
    @Override
    public void handle(AMEvent aMEvent) throws Exception {
        // Acquire before the try so the finally block only ever releases a lock we hold,
        // and release exactly once: a second unlock() throws IllegalMonitorStateException.
        this.writeLock.lock();
        try {
            if (0 == aMEvent.getVersion() || this.amService.version() == aMEvent.getVersion()) {
                AMStatus internalState = getInternalState();
                try {
                    getStateMachine().doTransition(aMEvent.getType(), aMEvent);
                    LOG.info("AM doTransition:{} => {}", internalState, getInternalState());
                } catch (InvalidStateTransitionException e) {
                    LOG.warn("Can't handle this event at current state", e);
                    AMStatus stateAfterFailure = getInternalState();
                    if (internalState != stateAfterFailure) {
                        LOG.info("Job Transitioned from {} to {}", internalState, stateAfterFailure);
                    }
                    throw e;
                }
            }
        } finally {
            this.writeLock.unlock();
        }
    }

    /** Returns the AM service supplied at construction. */
    public AMService getAmService() {
        return this.amService;
    }

    /** Returns the event reporter supplied at construction. */
    public BaseEventReporter getEventReporter() {
        return this.eventReporter;
    }

    /** Returns the AM meta store supplied at construction. */
    public AMMeta getAMMeta() {
        return this.amMeta;
    }

    /** Returns the ML context supplied at construction. */
    public MLContext getMLContext() {
        return this.mlContext;
    }

    /**
     * Stops the event-handling thread. Interrupts the worker via {@code shutdownNow()} and waits
     * up to 5 seconds for it to exit. Does nothing if the executor is already shut down.
     */
    public void close() {
        if (null == this.exService || this.exService.isShutdown()) {
            return;
        }
        this.exService.shutdownNow();
        try {
            if (!this.exService.awaitTermination(WAIT_SECONDS, TimeUnit.SECONDS)) {
                LOG.warn("am_event_handler thread did not terminate within {} seconds", WAIT_SECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOG.warn("Interrupted while waiting for am_event_handler thread to terminate", e);
        }
        LOG.info("close am_event_handler thread!");
    }
}
