package io.reacted.core.runtime;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.reacted.core.config.dispatchers.DispatcherConfig;
import io.reacted.core.messages.Message;
import io.reacted.core.messages.reactors.EventExecutionAttempt;
import io.reacted.core.reactorsystem.ReActorContext;
import io.reacted.core.reactorsystem.ReActorRef;
import io.reacted.patterns.NonNullByDefault;
import io.reacted.patterns.Try;
import java.util.Arrays;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Schedules {@link ReActorContext}s for execution on a fixed set of single-threaded workers.
 *
 * <p>Each worker owns one blocking queue. {@link #dispatch(ReActorContext)} appends a context to
 * one of the queues (chosen round-robin through an atomic counter) and the matching worker,
 * started by {@link #initDispatcher(ReActorRef, boolean, Function)}, drains the mailbox of the
 * contexts it takes from its queue in batches of {@code getBatchSize()} messages.
 */
@NonNullByDefault
public class Dispatcher {
    private static final String UNCAUGHT_EXCEPTION_IN_DISPATCHER = "Uncaught exception in thread [%s] : ";
    private static final String REACTIONS_EXECUTION_ERROR = "Error for ReActor {} processing message type {} with seq num {} and value {} ";
    private static final Logger LOGGER = LoggerFactory.getLogger(Dispatcher.class);

    private final DispatcherConfig dispatcherConfig;
    /** Monotonic counter used to pick the queue for the next dispatched context. */
    private final AtomicLong nextDispatchIdx = new AtomicLong(0);
    /** One queue per dispatcher thread; slot {@code i} is consumed by worker {@code i}. */
    private final BlockingDeque<ReActorContext>[] scheduledQueues;

    /** Pool running the termination callback of stopped contexts; set by {@code initDispatcher}. */
    @Nullable
    private ExecutorService dispatcherLifeCyclePool;

    /** Single-threaded executors running the dispatcher loops; set by {@code initDispatcher}. */
    @Nullable
    private ExecutorService[] dispatcherPool;

    /**
     * Creates a dispatcher with one scheduling queue per configured dispatcher thread.
     *
     * @param dispatcherConfig configuration providing name, thread count and batch size
     * @throws NullPointerException if {@code dispatcherConfig} is {@code null}
     */
    public Dispatcher(DispatcherConfig dispatcherConfig) {
        this.dispatcherConfig = Objects.requireNonNull(dispatcherConfig, "dispatcherConfig");
        // The queues must be built here and not in a field initializer: field initializers run
        // before the constructor body, when dispatcherConfig is still null.
        this.scheduledQueues = newScheduledQueues(dispatcherConfig.getDispatcherThreadsNum());
    }

    /**
     * @return the dispatcher name taken from the configuration
     */
    public String getName() {
        return this.dispatcherConfig.getDispatcherName();
    }

    /**
     * Creates the worker and lifecycle thread pools and starts one dispatcher loop per worker.
     *
     * @param reActorRef destination of the {@link EventExecutionAttempt} notifications sent
     *                   when {@code z} is {@code true}
     * @param z          when {@code true}, an {@link EventExecutionAttempt} is told to
     *                   {@code reActorRef} before each message is processed
     * @param function   callback applied, on the lifecycle pool, to every context found stopped
     *                   at the end of a batch
     */
    public void initDispatcher(ReActorRef reActorRef, boolean z, Function<ReActorContext, Optional<CompletionStage<Void>>> function) {
        ThreadFactory workerThreadFactory =
                newThreadFactory("ReActed-Dispatcher-Thread-" + getName() + "-%d");
        ThreadFactory lifeCycleThreadFactory =
                newThreadFactory("ReActed-Dispatcher-LifeCycle-Thread-" + getName() + "-%d");
        int threadsNum = getDispatcherConfig().getDispatcherThreadsNum();

        ExecutorService[] workers = Stream.generate(() -> Executors.newFixedThreadPool(1, workerThreadFactory))
                                          .limit(threadsNum)
                                          .toArray(ExecutorService[]::new);
        ExecutorService lifeCyclePool =
                Executors.newFixedThreadPool(Integer.max(2, threadsNum >> 2), lifeCycleThreadFactory);
        this.dispatcherPool = workers;
        this.dispatcherLifeCyclePool = lifeCyclePool;

        int batchSize = this.dispatcherConfig.getBatchSize();
        for (int workerIdx = 0; workerIdx < workers.length; workerIdx++) {
            BlockingDeque<ReActorContext> workerQueue = this.scheduledQueues[workerIdx];
            workers[workerIdx].submit(() -> {
                return Try.ofRunnable(() -> {
                    dispatcherLoop(workerQueue, batchSize, lifeCyclePool, z, reActorRef, function);
                }).ifError(error -> {
                    LOGGER.error("Error running dispatcher: ", error);
                });
            });
        }
    }

    /**
     * Interrupts the dispatcher workers and shuts the lifecycle pool down.
     *
     * @throws NullPointerException if the dispatcher has not been initialized
     */
    public void stopDispatcher() {
        Arrays.stream(Objects.requireNonNull(this.dispatcherPool))
              .forEachOrdered(ExecutorService::shutdownNow);
        getDispatcherLifeCyclePool().shutdown();
    }

    /**
     * @return the configuration this dispatcher was created with
     */
    public DispatcherConfig getDispatcherConfig() {
        return this.dispatcherConfig;
    }

    /**
     * Enqueues the context on one of the worker queues if its scheduling can be acquired.
     *
     * @param reActorContext context to schedule; ignored when {@code acquireScheduling()}
     *                       returns {@code false}
     */
    public void dispatch(ReActorContext reActorContext) {
        if (reActorContext.acquireScheduling()) {
            // floorMod keeps the index non-negative even if the counter ever wraps around
            int queueIdx = (int) Math.floorMod(this.nextDispatchIdx.getAndIncrement(),
                                               (long) this.scheduledQueues.length);
            this.scheduledQueues[queueIdx].addLast(reActorContext);
        }
    }

    private ExecutorService getDispatcherLifeCyclePool() {
        return Objects.requireNonNull(this.dispatcherLifeCyclePool);
    }

    /** Builds {@code queuesNum} empty scheduling queues. */
    private static BlockingDeque<ReActorContext>[] newScheduledQueues(int queuesNum) {
        @SuppressWarnings("unchecked") // generic array creation; only BlockingDeque<ReActorContext> is stored
        BlockingDeque<ReActorContext>[] queues = new BlockingDeque[queuesNum];
        for (int queueIdx = 0; queueIdx < queues.length; queueIdx++) {
            queues[queueIdx] = new LinkedBlockingDeque<>();
        }
        return queues;
    }

    /** Builds a thread factory with the given name format that logs uncaught exceptions. */
    private static ThreadFactory newThreadFactory(String nameFormat) {
        return new ThreadFactoryBuilder()
                .setNameFormat(nameFormat)
                .setUncaughtExceptionHandler((thread, error) -> {
                    LOGGER.error(String.format(UNCAUGHT_EXCEPTION_IN_DISPATCHER, thread.getName()), error);
                })
                .build();
    }

    /**
     * Worker loop: takes contexts from {@code scheduledList} until the thread is interrupted and
     * processes up to {@code batchSize} messages of each one.
     *
     * @param scheduledList       queue this worker consumes
     * @param batchSize           maximum number of messages processed per scheduling
     * @param lifeCycleExecutor   executor running {@code onStopped} for stopped contexts
     * @param notifyExecution     whether an {@link EventExecutionAttempt} is sent per message
     * @param executionListener   receiver of the {@link EventExecutionAttempt} notifications
     * @param onStopped           callback applied to contexts found stopped after a batch
     */
    private void dispatcherLoop(BlockingDeque<ReActorContext> scheduledList, int batchSize,
                                ExecutorService lifeCycleExecutor, boolean notifyExecution,
                                ReActorRef executionListener,
                                Function<ReActorContext, Optional<CompletionStage<Void>>> onStopped) {
        int processed = 0;
        while (!Thread.currentThread().isInterrupted()) {
            try {
                ReActorContext scheduled = scheduledList.takeFirst();
                scheduled.acquireCoherence();
                for (int msgNum = 0;
                     msgNum < batchSize && !scheduled.getMbox().isEmpty() && !scheduled.isStop();
                     msgNum++) {
                    Message message = scheduled.getMbox().getNextMessage();
                    if (notifyExecution) {
                        executionListener.tell(scheduled.getSelf(),
                                               new EventExecutionAttempt(scheduled.getSelf().getReActorId(),
                                                                         scheduled.getNextMsgExecutionId(),
                                                                         message.getSequenceNumber()));
                    }
                    try {
                        scheduled.reAct(message);
                    } catch (Exception reactionError) {
                        // NOTE(review): the exception is passed before the format arguments;
                        // confirm against logError's signature that placeholders line up.
                        scheduled.logError(REACTIONS_EXECUTION_ERROR, reactionError, scheduled.getSelf().getReActorId(), message.getPayload().getClass(), Long.valueOf(message.getSequenceNumber()), message.toString());
                        // A reactor whose reaction failed is stopped
                        scheduled.stop();
                    }
                    processed++;
                }
                scheduled.releaseCoherence();
                scheduled.releaseScheduling();
                if (scheduled.isStop()) {
                    lifeCycleExecutor.submit(() -> {
                        return onStopped.apply(scheduled);
                    });
                } else if (!scheduled.getMbox().isEmpty()) {
                    // Messages left after the batch: queue the context again
                    dispatch(scheduled);
                }
            } catch (InterruptedException interrupted) {
                // Restore the flag so the loop condition terminates the worker
                Thread.currentThread().interrupt();
            }
        }
        LOGGER.debug("Dispatcher Thread {} is terminating. Processed: {}", Thread.currentThread().getName(), processed);
    }
}
