package io.reacted.core.runtime;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.reacted.core.config.dispatchers.DispatcherConfig;
import io.reacted.core.messages.Message;
import io.reacted.core.messages.reactors.EventExecutionAttempt;
import io.reacted.core.reactors.ReActorId;
import io.reacted.core.reactorsystem.ReActorContext;
import io.reacted.core.reactorsystem.ReActorRef;
import io.reacted.core.reactorsystem.ReActorSystem;
import io.reacted.patterns.Try;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.agrona.BitUtil;
import org.agrona.concurrent.BackoffIdleStrategy;
import org.agrona.concurrent.ControlledMessageHandler;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.concurrent.ringbuffer.ManyToOneRingBuffer;
import org.agrona.concurrent.ringbuffer.RingBuffer;
import org.agrona.concurrent.ringbuffer.RingBufferDescriptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/reacted/core/runtime/Dispatcher.class */
/**
 * Runs reactors on a fixed set of single-threaded workers, each one draining its own ring buffer
 * of scheduling requests.
 *
 * <p>{@link #dispatch(ReActorContext)} publishes the scheduling id of a reactor on one of the
 * queues, chosen round-robin. Every worker started by {@link #initDispatcher} reads ids from its
 * queue, resolves them through the {@link ReActorSystem} and hands the resulting context to
 * {@link #onMessage}, which processes a bounded batch of messages from the reactor's mailbox.
 *
 * <p>The executors are created by {@link #initDispatcher} and not by the constructor, so
 * {@link #stopDispatcher()} must only be called after {@link #initDispatcher}.
 */
public class Dispatcher {
    public static final String DEFAULT_DISPATCHER_NAME = "ReactorSystemDispatcher";
    public static final int DEFAULT_DISPATCHER_BATCH_SIZE = 10;
    public static final int DEFAULT_DISPATCHER_THREAD_NUM = 2;
    private static final String UNCAUGHT_EXCEPTION_IN_DISPATCHER = "Uncaught exception in thread [%s] : ";
    private static final String REACTIONS_EXECUTION_ERROR = "Error for ReActor {} processing message type {} with seq num {} and value {} ";
    /** Message type id used for every scheduling request written on the ring buffers. */
    private static final int SCHEDULE_REQUEST_MSG_TYPE_ID = 1;
    /** Maximum number of scheduling requests drained from a ring buffer in a single read. */
    private static final int SCHEDULED_IDS_PER_READ = 10;
    // Declared before NULL_DISPATCHER so that the logger is already set while that instance is built.
    private static final Logger LOGGER = LoggerFactory.getLogger(Dispatcher.class);
    public static final Dispatcher NULL_DISPATCHER = new Dispatcher(DispatcherConfig.NULL_DISPATCHER_CFG, ReActorSystem.NO_REACTOR_SYSTEM);

    private final DispatcherConfig dispatcherConfig;

    @Nullable
    private ExecutorService dispatcherLifeCyclePool;

    @Nullable
    private ExecutorService[] dispatcherPool;
    private final RingBuffer[] scheduledQueues;
    private final AtomicLong nextDispatchIdx = new AtomicLong(0);
    private final ReActorSystem reActorSystem;

    /**
     * Allocates one scheduling queue per configured dispatcher thread. No thread is started here.
     *
     * @param dispatcherConfig configuration providing name, batch size and number of threads
     * @param reActorSystem    system used to size the queues and, later, to resolve scheduled ids
     */
    public Dispatcher(DispatcherConfig dispatcherConfig, ReActorSystem reActorSystem) {
        this.reActorSystem = reActorSystem;
        int ringBufferBytes = RingBufferDescriptor.TRAILER_LENGTH
                              + BitUtil.findNextPositivePowerOfTwo(ReActorId.NO_REACTOR_ID.getRawIdSize()
                                                                   * reActorSystem.getSystemConfig().getMaximumReActorsNum());
        this.dispatcherConfig = dispatcherConfig;
        // The parameter is read directly: calling the overridable getter from a constructor would
        // run subclass code on a partially built instance.
        int threadsNum = dispatcherConfig.getDispatcherThreadsNum();
        RingBuffer[] queues = new RingBuffer[threadsNum];
        for (int queueIdx = 0; queueIdx < threadsNum; queueIdx++) {
            queues[queueIdx] = new ManyToOneRingBuffer(new UnsafeBuffer(ByteBuffer.allocateDirect(ringBufferBytes)));
        }
        this.scheduledQueues = queues;
    }

    /**
     * @return the dispatcher name taken from the configuration
     */
    public String getName() {
        return this.dispatcherConfig.getDispatcherName();
    }

    /**
     * Creates the worker and life-cycle executors and starts one dispatcher loop per scheduling queue.
     *
     * @param reActorRef destination of the {@link EventExecutionAttempt} notifications sent when
     *                   {@code z} is {@code true}
     * @param z          when {@code true}, an {@link EventExecutionAttempt} is sent to
     *                   {@code reActorRef} before each message is processed
     * @param function   callback submitted to the life-cycle pool for every reactor found stopped
     *                   after a batch
     */
    public void initDispatcher(ReActorRef reActorRef, boolean z, Function<ReActorContext, Optional<CompletionStage<Void>>> function) {
        ThreadFactory workerThreadFactory = newThreadFactory("ReActed-Dispatcher-Thread-");
        ThreadFactory lifeCycleThreadFactory = newThreadFactory("ReActed-Dispatcher-LifeCycle-Thread-");
        int threadsNum = getDispatcherConfig().getDispatcherThreadsNum();

        ExecutorService[] workers = new ExecutorService[threadsNum];
        for (int workerIdx = 0; workerIdx < threadsNum; workerIdx++) {
            workers[workerIdx] = Executors.newFixedThreadPool(1, workerThreadFactory);
        }
        ExecutorService lifeCyclePool = Executors.newFixedThreadPool(Integer.max(2, threadsNum >> 2), lifeCycleThreadFactory);
        this.dispatcherPool = workers;
        this.dispatcherLifeCyclePool = lifeCyclePool;

        int batchSize = this.dispatcherConfig.getBatchSize();
        for (int workerIdx = 0; workerIdx < threadsNum; workerIdx++) {
            RingBuffer scheduledQueue = this.scheduledQueues[workerIdx];
            // The lambda returns a value on purpose, so that it keeps binding to submit(Callable).
            workers[workerIdx].submit(() -> {
                return Try.ofRunnable(() -> dispatcherLoop(scheduledQueue, batchSize, lifeCyclePool, z,
                                                           this.reActorSystem, reActorRef, function))
                          .ifError(error -> LOGGER.error("Error running dispatcher: ", error));
            });
        }
    }

    /**
     * Interrupts every worker through {@code shutdownNow()} and shuts the life-cycle pool down.
     *
     * @throws NullPointerException if {@link #initDispatcher} has not been called yet
     */
    public void stopDispatcher() {
        Arrays.stream(Objects.requireNonNull(this.dispatcherPool, "Dispatcher has not been initialized"))
              .forEachOrdered(ExecutorService::shutdownNow);
        getDispatcherLifeCyclePool().shutdown();
    }

    /**
     * @return the configuration this dispatcher was built with
     */
    public DispatcherConfig getDispatcherConfig() {
        return this.dispatcherConfig;
    }

    /**
     * Requests the execution of a reactor by writing its scheduling id on one of the queues.
     *
     * @param reActorContext context of the reactor to schedule
     * @return {@code true} if the scheduling could not be acquired (presumably because the reactor
     *         is already scheduled - confirm against {@code ReActorContext}) or if the request has
     *         been written; {@code false} if the write on the ring buffer failed, in which case
     *         the scheduling is released again
     */
    public boolean dispatch(ReActorContext reActorContext) {
        if (!reActorContext.acquireScheduling()) {
            return true;
        }
        // floorMod keeps the index non-negative even if the counter ever wraps around.
        int queueIdx = (int) Math.floorMod(this.nextDispatchIdx.getAndIncrement(), (long) this.scheduledQueues.length);
        boolean written = this.scheduledQueues[queueIdx].write(SCHEDULE_REQUEST_MSG_TYPE_ID,
                                                               reActorContext.getSchedulationIdBuffer(),
                                                               0, Long.BYTES);
        if (!written) {
            reActorContext.releaseScheduling();
        }
        return written;
    }

    private ExecutorService getDispatcherLifeCyclePool() {
        return Objects.requireNonNull(this.dispatcherLifeCyclePool, "Dispatcher has not been initialized");
    }

    /** Builds a factory whose threads are named after this dispatcher and log uncaught exceptions. */
    private ThreadFactory newThreadFactory(String threadNamePrefix) {
        return new ThreadFactoryBuilder()
            .setNameFormat(threadNamePrefix + getName() + "-%d")
            .setUncaughtExceptionHandler((thread, error) ->
                LOGGER.error(String.format(UNCAUGHT_EXCEPTION_IN_DISPATCHER, thread.getName()), error))
            .build();
    }

    /**
     * Body of a worker: drains scheduling requests from the queue and processes the matching
     * reactors until the current thread is interrupted.
     */
    private void dispatcherLoop(RingBuffer scheduledQueue, int dispatcherBatchSize, ExecutorService lifeCyclePool,
                                boolean z, ReActorSystem reActorSystem, ReActorRef reActorRef,
                                Function<ReActorContext, Optional<CompletionStage<Void>>> function) {
        long[] scheduledIds = new long[SCHEDULED_IDS_PER_READ];
        // Single-slot holder: the handler below runs on this same thread, inside controlledRead.
        int[] pendingIds = {0};
        long processedMessages = 0;
        BackoffIdleStrategy idleStrategy = new BackoffIdleStrategy(1_000_000_000L, 100L, 1_000L, 1_000_000L);
        ControlledMessageHandler scheduledIdCollector = (msgTypeId, buffer, index, length) -> {
            // NOTE(review): assumes the schedulation id buffer holds the id big-endian - confirm
            // against ReActorContext.getSchedulationIdBuffer().
            scheduledIds[pendingIds[0]++] = buffer.getLong(index, ByteOrder.BIG_ENDIAN);
            return ControlledMessageHandler.Action.COMMIT;
        };
        while (!Thread.currentThread().isInterrupted()) {
            int readRequests = scheduledQueue.controlledRead(scheduledIdCollector, scheduledIds.length);
            int collectedIds = pendingIds[0];
            for (int idIdx = 0; idIdx < collectedIds; idIdx++) {
                // A reactor may no longer be registered by the time its request is consumed.
                ReActorContext scheduledCtx = reActorSystem.getReActorCtx(scheduledIds[idIdx]);
                if (scheduledCtx != null) {
                    processedMessages += onMessage(scheduledCtx, dispatcherBatchSize, lifeCyclePool, z,
                                                   reActorRef, function);
                }
            }
            pendingIds[0] = 0;
            idleStrategy.idle(readRequests);
        }
        LOGGER.info("Dispatcher Thread {} is terminating. Processed: {}", Thread.currentThread().getName(), processedMessages);
    }

    /**
     * Processes up to {@code i} messages from the mailbox of a reactor, then releases it and either
     * hands it to the termination callback or schedules it again if messages are still pending.
     *
     * @param reActorContext  context of the reactor to run
     * @param i               maximum number of messages processed in this call
     * @param executorService pool on which {@code function} is submitted when the reactor is stopped
     * @param z               when {@code true}, an {@link EventExecutionAttempt} is sent to
     *                        {@code reActorRef} before each message is processed
     * @param reActorRef      destination of the execution attempt notifications
     * @param function        callback applied to the context of a stopped reactor
     * @return the number of messages processed
     */
    public int onMessage(ReActorContext reActorContext, int i, ExecutorService executorService, boolean z, ReActorRef reActorRef, Function<ReActorContext, Optional<CompletionStage<Void>>> function) {
        int processed = 0;
        reActorContext.acquireCoherence();
        try {
            while (processed < i && !reActorContext.getMbox().isEmpty() && !reActorContext.isStop()) {
                Message nextMessage = reActorContext.getMbox().getNextMessage();
                if (z && reActorRef.tell(reActorContext.getSelf(),
                                         new EventExecutionAttempt(reActorContext.getSelf().getReActorId(),
                                                                   reActorContext.getNextMsgExecutionId(),
                                                                   nextMessage.getSequenceNumber()))
                                   .isNotSent()) {
                    LOGGER.error("CRITIC! Unable to send an Execution Attempt for message {} Replay will NOT be possible", nextMessage);
                }
                executeReactionForMessage(reActorContext, nextMessage);
                processed++;
            }
        } finally {
            // Released even when something other than an Exception escapes the batch, otherwise
            // the reactor would stay locked and could never be scheduled again.
            reActorContext.releaseCoherence();
            reActorContext.releaseScheduling();
        }
        if (reActorContext.isStop()) {
            executorService.submit(() -> {
                return function.apply(reActorContext);
            });
        } else if (!reActorContext.getMbox().isEmpty() && !dispatch(reActorContext)) {
            LOGGER.error("CRITIC! Dispatcher cannot reschedule reactor {} with still {} pending messages", reActorContext.getSelf().getReActorId(), reActorContext.getMbox().getMsgNum());
        }
        return processed;
    }

    /** Runs the reaction for a message; on any {@link Exception} logs it and stops the reactor. */
    private void executeReactionForMessage(ReActorContext reActorContext, Message message) {
        try {
            reActorContext.reAct(message);
        } catch (Exception e) {
            reActorContext.logError(REACTIONS_EXECUTION_ERROR, reActorContext.getSelf().getReActorId(), message.getPayload().getClass(), Long.valueOf(message.getSequenceNumber()), message.toString(), e);
            reActorContext.stop();
        }
    }
}
