package net.sodacan.core.scheduler;

import java.io.Closeable;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import net.sodacan.core.ActorGroup;
import net.sodacan.core.ActorId;
import net.sodacan.core.Config;
import net.sodacan.core.Message;
import net.sodacan.core.Scheduler;
import net.sodacan.core.actor.ActorEntry;
import net.sodacan.core.message.Evict;

/* loaded from: input_file:net/sodacan/core/scheduler/DefaultScheduler.class */
public class DefaultScheduler implements Scheduler, Closeable {
    private Config config;
    protected ActorGroup actorGroup;
    private AtomicInteger messageCount = new AtomicInteger(0);
    private AtomicInteger messageLoad = new AtomicInteger(0);
    private AtomicInteger maxMessageLoad = new AtomicInteger(0);
    private AtomicInteger evictionCount = new AtomicInteger(0);
    private AtomicInteger totalSleepTime = new AtomicInteger(0);
    private AtomicInteger maxThreadQueueDepth = new AtomicInteger(0);
    private final ReentrantLock newMessageLock = new ReentrantLock();
    private Map<ActorId, ActorEntry> actorEntries = new ConcurrentHashMap();
    private ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newCachedThreadPool();

    public DefaultScheduler(Config config) {
        this.config = config;
    }

    @Override // net.sodacan.core.Scheduler
    public ActorGroup getActorGroup() {
        return this.actorGroup;
    }

    @Override // net.sodacan.core.Scheduler
    public void setActorGroup(ActorGroup actorGroup) {
        this.actorGroup = actorGroup;
    }

    @Override // net.sodacan.core.Scheduler
    public boolean addMessage(Message message) {
        return addMessage(message, false);
    }

    protected boolean backpressureWait() {
        while (getMessageLoad() >= this.config.getBackpressureLimit()) {
            try {
                Thread.sleep(this.config.getBackpressureWaitMs());
                incrementSleepTime();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        return true;
    }

    protected void evict(ActorEntry actorEntry) {
        actorEntry.queueMessage(new Evict(actorEntry.getActorId()));
        this.actorEntries.remove(actorEntry.getActorId());
    }

    protected void makeRoomInActorEntries() {
        int eviction = this.config.getEviction();
        int size = this.actorEntries.size();
        if (size < this.config.getActorGroupThreads() + eviction) {
            return;
        }
        ActorEntry[] actorEntryArr = (ActorEntry[]) this.actorEntries.values().toArray(new ActorEntry[size]);
        Arrays.sort(actorEntryArr, Comparator.comparingLong((v0) -> {
            return v0.getUsage();
        }));
        for (int i = 0; i < eviction; i++) {
            evict(actorEntryArr[i]);
        }
    }

    protected synchronized ActorEntry addActorEntry(ActorId actorId) {
        ActorEntry actorEntry = null;
        try {
            makeRoomInActorEntries();
            actorEntry = new ActorEntry(this.config, this, actorId);
            this.actorEntries.put(actorId, actorEntry);
            this.pool.execute(actorEntry);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return actorEntry;
    }

    @Override // net.sodacan.core.Scheduler
    public boolean addMessage(Message message, boolean z) {
        if (!z) {
            backpressureWait();
        }
        ActorId targetActorId = message.getTargetActorId();
        if (targetActorId == null) {
            throw new RuntimeException("In scheduler, Message has no targetActorId");
        }
        ActorEntry actorEntry = this.actorEntries.get(targetActorId);
        if (actorEntry == null) {
            actorEntry = addActorEntry(targetActorId);
        }
        actorEntry.queueMessage(message);
        int queueSize = actorEntry.getQueueSize();
        if (queueSize <= this.maxThreadQueueDepth.get()) {
            return true;
        }
        this.maxThreadQueueDepth.set(queueSize);
        return true;
    }

    public void auditActorEntries() {
        for (Map.Entry<ActorId, ActorEntry> entry : this.actorEntries.entrySet()) {
            int queueSize = entry.getValue().getQueueSize();
            if (queueSize > 0) {
                System.out.println(String.valueOf(entry.getKey()) + " queue size=" + queueSize);
                this.pool.execute(entry.getValue());
            }
        }
    }

    @Override // net.sodacan.core.Scheduler
    public void evictAll() {
        for (ActorEntry actorEntry : (ActorEntry[]) this.actorEntries.values().toArray(new ActorEntry[this.actorEntries.size()])) {
            evict(actorEntry);
        }
    }

    @Override // net.sodacan.core.Scheduler
    public void waitForMessagesToFinish() {
        while (getMessageLoad() != 0) {
            try {
                System.out.println("Shutdown: Waiting for " + getMessageLoad() + " messages to finish...");
                Thread.sleep(this.config.getShutdownWaitMs());
            } catch (InterruptedException e) {
                e.printStackTrace();
                return;
            }
        }
    }

    @Override // net.sodacan.core.Scheduler
    public Config getConfig() {
        return this.config;
    }

    @Override // net.sodacan.core.Scheduler
    public void increaseMessageLoad() {
        int incrementAndGet = this.messageLoad.incrementAndGet();
        if (incrementAndGet > this.maxMessageLoad.get()) {
            this.maxMessageLoad.set(incrementAndGet);
        }
    }

    @Override // net.sodacan.core.Scheduler
    public void reduceMessageLoad() {
        this.messageLoad.decrementAndGet();
    }

    @Override // net.sodacan.core.Scheduler
    public int getMessageLoad() {
        return this.messageLoad.get();
    }

    @Override // net.sodacan.core.Scheduler
    public int getMaxMessageLoad() {
        return this.maxMessageLoad.get();
    }

    @Override // net.sodacan.core.Scheduler
    public void incrementSleepTime() {
        this.totalSleepTime.addAndGet(getConfig().getBackpressureWaitMs());
    }

    @Override // net.sodacan.core.Scheduler
    public void increaseMessageCount() {
        this.messageCount.incrementAndGet();
    }

    @Override // net.sodacan.core.Scheduler
    public int getMessageCount() {
        return this.messageCount.get();
    }

    @Override // net.sodacan.core.Scheduler
    public int getSleepTime() {
        return this.totalSleepTime.get();
    }

    @Override // net.sodacan.core.Scheduler
    public int getActorCount() {
        return this.actorEntries.size();
    }

    @Override // net.sodacan.core.Scheduler
    public int getMaxThreadQueueDepth() {
        return this.maxThreadQueueDepth.get();
    }

    @Override // net.sodacan.core.Scheduler
    public void increaseEvictionCount() {
        this.evictionCount.incrementAndGet();
    }

    @Override // net.sodacan.core.Scheduler
    public int getEvictionCount() {
        return this.evictionCount.get();
    }

    @Override // net.sodacan.core.Scheduler
    public int getThreadPoolSize() {
        return this.pool.getActiveCount();
    }

    @Override // net.sodacan.core.Scheduler, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        try {
            evictAll();
            waitForMessagesToFinish();
            this.pool.awaitTermination(1L, TimeUnit.SECONDS);
            this.pool.shutdown();
            System.out.println("Schduler orderly Shutdown complete for " + String.valueOf(getActorGroup()));
        } catch (Exception e) {
            throw new RuntimeException("Error closing Scheduler for ActorGroup ", e);
        }
    }
}
