package io.parallec.core.actor;

import akka.actor.ActorRef;
import akka.actor.UntypedActor;
import io.parallec.core.actor.message.ContinueToSendToBatchSenderAsstManager;
import io.parallec.core.actor.message.RequestToBatchSenderAsstManager;
import io.parallec.core.actor.message.ResponseCountToBatchSenderAsstManager;
import io.parallec.core.actor.message.type.OperationWorkerMsgType;
import io.parallec.core.config.ParallecGlobalConfig;
import io.parallec.core.util.PcDateUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.Duration;

/* loaded from: input_file:io/parallec/core/actor/AssistantExecutionManager.class */
public class AssistantExecutionManager extends UntypedActor {
    private static Logger logger = LoggerFactory.getLogger(AssistantExecutionManager.class);
    private long asstManagerRetryIntervalMillis;
    protected int responseCount = 0;
    protected int requestTotalCount = 0;
    protected long startTime = System.currentTimeMillis();
    protected long endTime = -1;
    protected ActorRef originalManager = null;
    protected List<ActorRef> workers = new ArrayList();
    protected int maxConcurrencyAdjusted = ParallecGlobalConfig.concurrencyDefault;
    protected int processedWorkerCount = 0;
    protected String taskId = null;
    protected String taskIdTrim = null;

    public void sendMessageUntilStopCount(int i) {
        for (int i2 = this.processedWorkerCount; i2 < this.workers.size(); i2++) {
            ActorRef actorRef = this.workers.get(i2);
            try {
                Thread.sleep(1L);
            } catch (InterruptedException e) {
                logger.error("sleep exception " + e + " details: ", e);
            }
            actorRef.tell(OperationWorkerMsgType.PROCESS_REQUEST, this.originalManager);
            this.processedWorkerCount++;
            if (this.processedWorkerCount > i) {
                return;
            }
            logger.debug("REQ_SENT: {} / {} taskId {}", new Object[]{Integer.valueOf(this.processedWorkerCount), Integer.valueOf(this.requestTotalCount), this.taskIdTrim});
        }
    }

    public void waitAndRetry() {
        ContinueToSendToBatchSenderAsstManager continueToSendToBatchSenderAsstManager = new ContinueToSendToBatchSenderAsstManager(this.processedWorkerCount);
        logger.debug("NOW WAIT Another " + this.asstManagerRetryIntervalMillis + " MS. at " + PcDateUtils.getNowDateTimeStrStandard());
        getContext().system().scheduler().scheduleOnce(Duration.create(this.asstManagerRetryIntervalMillis, TimeUnit.MILLISECONDS), getSelf(), continueToSendToBatchSenderAsstManager, getContext().system().dispatcher(), getSelf());
    }

    public void onReceive(Object obj) {
        if (obj instanceof RequestToBatchSenderAsstManager) {
            RequestToBatchSenderAsstManager requestToBatchSenderAsstManager = (RequestToBatchSenderAsstManager) obj;
            this.originalManager = getSender();
            this.taskId = requestToBatchSenderAsstManager.getTaskId();
            this.asstManagerRetryIntervalMillis = requestToBatchSenderAsstManager.getAsstManagerRetryIntervalMillis();
            this.taskIdTrim = this.taskId.length() <= 12 ? this.taskId : this.taskId.substring(this.taskId.length() - 12, this.taskId.length());
            this.workers = requestToBatchSenderAsstManager.getWorkers();
            this.maxConcurrencyAdjusted = requestToBatchSenderAsstManager.getMaxConcurrency();
            this.requestTotalCount = this.workers.size();
            sendMessageUntilStopCount(this.maxConcurrencyAdjusted);
            if (this.processedWorkerCount < this.requestTotalCount) {
                waitAndRetry();
                return;
            } else {
                logger.info("Now finished sending all needed messages. Done job of ASST Manager at " + PcDateUtils.getNowDateTimeStrStandard());
                return;
            }
        }
        if (!(obj instanceof ContinueToSendToBatchSenderAsstManager)) {
            if (obj instanceof ResponseCountToBatchSenderAsstManager) {
                this.responseCount = ((ResponseCountToBatchSenderAsstManager) obj).getResponseCount();
                logger.debug("RECV IN batchSenderAsstManager FROM ExecutionManager responseCount: " + this.responseCount);
                return;
            }
            return;
        }
        if (this.requestTotalCount - this.processedWorkerCount <= 0) {
            logger.info("!Finished sending all msg in ASST MANAGER at " + PcDateUtils.getNowDateTimeStrStandard() + " STOP doing wait and retry.");
            return;
        }
        int i = this.maxConcurrencyAdjusted - (this.processedWorkerCount - this.responseCount);
        if (i <= 0) {
            logger.info("NO ROOM to send extra. Windowns is full. extraSendCount is negative: " + i + " reschedule now at " + PcDateUtils.getNowDateTimeStrStandard());
            waitAndRetry();
        } else {
            logger.info("HAVE ROOM to send extra of : " + i + " MSG. now Send at " + PcDateUtils.getNowDateTimeStrStandard());
            sendMessageUntilStopCount(this.processedWorkerCount + i);
            waitAndRetry();
        }
    }
}
