package io.reacted.core.drivers.system;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalCause;
import com.google.common.cache.RemovalNotification;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.reacted.core.config.ChannelId;
import io.reacted.core.config.drivers.ChannelDriverConfig;
import io.reacted.core.drivers.DriverCtx;
import io.reacted.core.messages.AckingPolicy;
import io.reacted.core.messages.reactors.DeliveryStatus;
import io.reacted.core.messages.reactors.DeliveryStatusUpdate;
import io.reacted.core.reactors.ReActorId;
import io.reacted.core.reactorsystem.NullReActorSystemRef;
import io.reacted.core.reactorsystem.ReActorContext;
import io.reacted.core.reactorsystem.ReActorRef;
import io.reacted.core.reactorsystem.ReActorSystem;
import io.reacted.core.reactorsystem.ReActorSystemId;
import io.reacted.core.reactorsystem.ReActorSystemRef;
import io.reacted.patterns.Try;
import io.reacted.patterns.UnChecked;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
import org.apache.commons.lang3.NotImplementedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/reacted/core/drivers/system/ReActorSystemDriver.class */
/**
 * Base class for the drivers that move messages through a channel of a {@link ReActorSystem}.
 *
 * <p>This class owns the plumbing that is common to every driver visible in this file:
 * <ul>
 *   <li>a single-threaded executor on which the driver is initialized and its loop is run;</li>
 *   <li>a time-bounded cache of "pending ack triggers", i.e. futures keyed by a {@code long} id
 *       that are failed with a {@link TimeoutException} when their entry leaves the cache for any
 *       reason other than an explicit removal;</li>
 *   <li>a per-thread {@link DriverCtx} published through {@link #REACTOR_SYSTEM_CTX}.</li>
 * </ul>
 *
 * <p>Concrete drivers supply the channel specific behavior through the abstract methods.
 *
 * @param <ConfigT> concrete configuration type of the driver
 */
public abstract class ReActorSystemDriver<ConfigT extends ChannelDriverConfig<?, ConfigT>> {

    /**
     * Value to pass where a propagation callback is accepted but none is wanted.
     * It is deliberately {@code null}.
     */
    @Nullable
    public static final UnChecked.TriConsumer<ReActorId, Serializable, ReActorRef> DO_NOT_PROPAGATE = null;

    /**
     * Driver context bound to the current thread. It is set on the driver thread by
     * {@link #initDriverCtx(ReActorSystem)}; being inheritable, threads created by that thread
     * start with the same value.
     */
    public static final ThreadLocal<DriverCtx> REACTOR_SYSTEM_CTX = new InheritableThreadLocal<>();

    protected static final Logger LOGGER = LoggerFactory.getLogger(ReActorSystemDriver.class);

    /**
     * One already-completed stage per {@link DeliveryStatus} constant, stored in declaration order
     * so that an entry can be looked up with {@link DeliveryStatus#ordinal()}.
     */
    // Generic arrays cannot be created directly: the raw array only ever holds
    // CompletionStage<DeliveryStatus> instances produced by the stream below.
    @SuppressWarnings("unchecked")
    protected static final CompletionStage<DeliveryStatus>[] DELIVERY_RESULT_CACHE =
        (CompletionStage<DeliveryStatus>[]) Arrays.stream(DeliveryStatus.values())
                                                  .map(CompletableFuture::completedStage)
                                                  .toArray(CompletionStage[]::new);

    private final ConfigT driverConfig;

    /** Futures waiting for an ack, keyed by the id they were registered with. */
    private final Cache<Long, CompletableFuture<DeliveryStatus>> pendingAcksTriggers;

    /** Periodic cleanup of {@link #pendingAcksTriggers}; {@code null} until the driver is initialized. */
    @Nullable
    private ScheduledFuture<?> cacheMaintenanceTask;

    /** Reactor system this driver is attached to; {@code null} until the driver is initialized. */
    @Nullable
    private ReActorSystem localReActorSystem;

    /** Single-threaded executor running the driver; {@code null} until the driver is initialized. */
    @Nullable
    private ExecutorService driverThread;

    /**
     * Stores the configuration and builds the pending-ack cache from it.
     *
     * <p>Entries expire after the configured automatic failure timeout; every removal is reported to
     * {@link #expireOnTimeout(RemovalNotification)}.
     *
     * @param configt driver configuration
     * @throws NullPointerException if {@code configt} is {@code null}
     */
    /* JADX INFO: Access modifiers changed from: protected */
    public ReActorSystemDriver(ConfigT configt) {
        this.driverConfig = Objects.requireNonNull(configt, "Driver config cannot be null");
        this.pendingAcksTriggers = CacheBuilder.newBuilder()
                                               .expireAfterWrite(configt.getApublishAutomaticFailureTimeout().toMillis(), TimeUnit.MILLISECONDS)
                                               .initialCapacity(configt.getAckCacheSize())
                                               .removalListener(ReActorSystemDriver::expireOnTimeout)
                                               .build();
    }

    /**
     * Driver specific initialization. {@link #initDriverCtx(ReActorSystem)} invokes it on the driver
     * thread before the driver loop is started.
     *
     * @param reActorSystem reactor system the driver is being attached to
     * @throws Exception if the driver cannot be initialized
     */
    public abstract void initDriverLoop(ReActorSystem reActorSystem) throws Exception;

    /**
     * Driver specific cleanup. {@link #stopDriverCtx(ReActorSystem)} returns its result.
     *
     * @return a stage carrying the outcome of the cleanup
     */
    public abstract CompletionStage<Try<Void>> cleanDriverLoop();

    /**
     * @return the body of the driver; {@link #initDriverCtx(ReActorSystem)} runs it on the driver
     *         thread once initialization has succeeded
     */
    public abstract UnChecked.CheckedRunnable getDriverLoop();

    /** @return the id of the channel served by this driver */
    public abstract ChannelId getChannelId();

    /** @return the properties of the channel served by this driver */
    public abstract Properties getChannelProperties();

    /**
     * Sends a message through this driver.
     *
     * <p>NOTE(review): parameter names come from decompilation, so the role of each
     * {@link ReActorRef} (source/destination) and of {@code j} (presumably a sequence number)
     * must be confirmed against the implementations.
     *
     * @return the delivery status of the operation
     */
    public abstract <PayloadT extends Serializable> DeliveryStatus sendMessage(ReActorRef reActorRef, ReActorContext reActorContext, ReActorRef reActorRef2, long j, ReActorSystemId reActorSystemId, AckingPolicy ackingPolicy, PayloadT payloadt);

    /**
     * Default asynchronous send: performs a {@link #sendMessage} with the same arguments and wraps
     * its result into the matching already-completed stage of {@link #DELIVERY_RESULT_CACHE}.
     *
     * @return a completed stage carrying the status returned by {@link #sendMessage}
     */
    public <PayloadT extends Serializable> CompletionStage<DeliveryStatus> sendAsyncMessage(ReActorRef reActorRef, ReActorContext reActorContext, ReActorRef reActorRef2, long j, ReActorSystemId reActorSystemId, AckingPolicy ackingPolicy, PayloadT payloadt) {
        DeliveryStatus syncResult = sendMessage(reActorRef, reActorContext, reActorRef2, j, reActorSystemId, ackingPolicy, payloadt);
        return DELIVERY_RESULT_CACHE[syncResult.ordinal()];
    }

    /** @return the configuration this driver was created with */
    public ConfigT getDriverConfig() {
        return this.driverConfig;
    }

    /** Publishes a message; semantics are defined by the concrete driver. */
    public abstract <PayloadT extends Serializable> DeliveryStatus publish(ReActorRef reActorRef, ReActorRef reActorRef2, PayloadT payloadt);

    /**
     * Publishes a message, optionally with a propagation callback; semantics are defined by the
     * concrete driver.
     *
     * @param triConsumer callback, or {@code null} (see {@link #DO_NOT_PROPAGATE})
     */
    public abstract <PayloadT extends Serializable> DeliveryStatus publish(ReActorRef reActorRef, ReActorRef reActorRef2, @Nullable UnChecked.TriConsumer<ReActorId, Serializable, ReActorRef> triConsumer, PayloadT payloadt);

    /** Tells a message; semantics are defined by the concrete driver. */
    public abstract <PayloadT extends Serializable> DeliveryStatus tell(ReActorRef reActorRef, ReActorRef reActorRef2, PayloadT payloadt);

    /** Asynchronous tell with the given acking policy; semantics are defined by the concrete driver. */
    public abstract <PayloadT extends Serializable> CompletionStage<DeliveryStatus> atell(ReActorRef reActorRef, ReActorRef reActorRef2, AckingPolicy ackingPolicy, PayloadT payloadt);

    /** Asynchronous publish with the given acking policy; semantics are defined by the concrete driver. */
    public abstract <PayloadT extends Serializable> CompletionStage<DeliveryStatus> apublish(ReActorRef reActorRef, ReActorRef reActorRef2, AckingPolicy ackingPolicy, PayloadT payloadt);

    /**
     * Asynchronous publish with the given acking policy and a propagation callback; semantics are
     * defined by the concrete driver.
     */
    public abstract <PayloadT extends Serializable> CompletionStage<DeliveryStatus> apublish(ReActorRef reActorRef, ReActorRef reActorRef2, AckingPolicy ackingPolicy, UnChecked.TriConsumer<ReActorId, Serializable, ReActorRef> triConsumer, PayloadT payloadt);

    /**
     * Default implementation for drivers that do not accept offered messages: it only logs an error
     * on the local reactor system, attaching a {@link NotImplementedException}.
     *
     * @throws NullPointerException if the driver has not been initialized yet
     */
    /* JADX INFO: Access modifiers changed from: protected */
    public <PayloadT extends Serializable> void offerMessage(ReActorRef reActorRef, ReActorRef reActorRef2, long j, ReActorSystemId reActorSystemId, AckingPolicy ackingPolicy, PayloadT payloadt) {
        getLocalReActorSystem().logError("Invalid message offering {}", payloadt, new NotImplementedException());
    }

    /**
     * Removes and returns the pending ack trigger registered under {@code j}.
     *
     * <p>Lookup and removal are performed as a single atomic map operation, so concurrent callers
     * asking for the same id cannot both obtain the trigger. The removal is an explicit one, hence
     * the returned future is not failed by the cache removal listener.
     *
     * @param j id the trigger was registered with
     * @return the removed trigger, or {@code null} if none is registered (or it already expired)
     */
    @Nullable
    public CompletionStage<DeliveryStatus> removePendingAckTrigger(long j) {
        return this.pendingAcksTriggers.asMap().remove(Long.valueOf(j));
    }

    /**
     * Registers a new pending ack trigger under {@code j}.
     *
     * @param j id to register the trigger with
     * @return the newly created, not yet completed, trigger
     */
    public CompletionStage<DeliveryStatus> newPendingAckTrigger(long j) {
        CompletableFuture<DeliveryStatus> ackTrigger = new CompletableFuture<>();
        this.pendingAcksTriggers.put(Long.valueOf(j), ackTrigger);
        return ackTrigger;
    }

    /**
     * Attaches this driver to {@code reActorSystem} and starts it.
     *
     * <p>In order: creates the single driver thread, schedules the periodic cleanup of the pending
     * ack cache on the system scheduler, publishes the {@link DriverCtx} on the driver thread, runs
     * {@link #initDriverLoop(ReActorSystem)} on that thread and waits for its outcome. On success
     * the driver loop is submitted to the driver thread; if the loop later fails the failure is
     * logged and the driver is stopped. On init failure the error is logged and the driver is
     * stopped.
     *
     * @param reActorSystem reactor system this driver belongs to
     * @return the outcome of {@link #initDriverLoop(ReActorSystem)}
     */
    public Try<Void> initDriverCtx(ReActorSystem reActorSystem) {
        this.localReActorSystem = reActorSystem;

        String threadNameFormat = reActorSystem.getLocalReActorSystemId().getReActorSystemName()
                                  + "-" + getChannelId()
                                  + "-" + getClass().getSimpleName() + "-driver-%d";
        ExecutorService driverExecutor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder()
            .setNameFormat(threadNameFormat)
            .setUncaughtExceptionHandler((thread, error) ->
                reActorSystem.logError("Uncaught error in driver thread {} ", thread.getName(), error))
            .build());
        this.driverThread = driverExecutor;

        long cleanupIntervalMillis = getDriverConfig().getAckCacheCleanupInterval().toMillis();
        ScheduledExecutorService systemScheduler = reActorSystem.getSystemSchedulingService();
        this.cacheMaintenanceTask = systemScheduler.scheduleWithFixedDelay(this.pendingAcksTriggers::cleanUp,
                                                                           cleanupIntervalMillis,
                                                                           cleanupIntervalMillis,
                                                                           TimeUnit.MILLISECONDS);

        // Both steps run on the single driver thread, so the context is in place before init starts.
        Try<Void> initResult = CompletableFuture
            .runAsync(() -> REACTOR_SYSTEM_CTX.set(new DriverCtx(reActorSystem, this)), driverExecutor)
            .thenApplyAsync(ignored -> Try.ofRunnable(() -> initDriverLoop(reActorSystem)), driverExecutor)
            .join();

        initResult.ifSuccessOrElse(ignored -> {
            // Fire and forget: the loop keeps the driver thread busy until it terminates.
            CompletableFuture.supplyAsync(() -> {
                return Try.ofRunnable(getDriverLoop()).peekFailure(loopError -> {
                    LOGGER.error("Driver body failed:", loopError);
                }).ifError(loopError -> {
                    stopDriverCtx(reActorSystem);
                });
            }, driverExecutor);
        }, initError -> {
            LOGGER.error("Driver {} init failed", getClass().getSimpleName(), initError);
            stopDriverCtx(reActorSystem);
        });
        return initResult;
    }

    /**
     * Stops the driver: cancels the cache maintenance task, shuts the driver thread down and then
     * delegates to {@link #cleanDriverLoop()}.
     *
     * @param reActorSystem reactor system the driver belongs to (not used by this implementation)
     * @return the stage returned by {@link #cleanDriverLoop()}
     * @throws NullPointerException if {@link #initDriverCtx(ReActorSystem)} was never called
     */
    public CompletionStage<Try<Void>> stopDriverCtx(ReActorSystem reActorSystem) {
        Objects.requireNonNull(this.cacheMaintenanceTask, "Driver has not been initialized").cancel(true);
        Objects.requireNonNull(this.driverThread, "Driver has not been initialized").shutdownNow();
        return cleanDriverLoop();
    }

    /** @return the driver context bound to the calling thread, or {@code null} if there is none */
    @Nullable
    public static DriverCtx getDriverCtx() {
        return REACTOR_SYSTEM_CTX.get();
    }

    /**
     * @return the reactor system this driver is attached to
     * @throws NullPointerException if {@link #initDriverCtx(ReActorSystem)} was never called
     */
    public ReActorSystem getLocalReActorSystem() {
        return Objects.requireNonNull(this.localReActorSystem);
    }

    /**
     * @return {@code true} if the two reactor system ids are equal
     * @throws NullPointerException if {@code reActorSystemId} is {@code null}
     */
    public static boolean isLocalReActorSystem(ReActorSystemId reActorSystemId, ReActorSystemId reActorSystemId2) {
        return reActorSystemId.equals(reActorSystemId2);
    }

    /**
     * @return {@code true} if the two reactor system ids are equal
     * @throws NullPointerException if {@code reActorSystemId} is {@code null}
     */
    /* JADX INFO: Access modifiers changed from: protected */
    public static boolean isMessageComingFromLocalReActorSystem(ReActorSystemId reActorSystemId, ReActorSystemId reActorSystemId2) {
        return reActorSystemId.equals(reActorSystemId2);
    }

    /**
     * Sends a {@link DeliveryStatusUpdate} for the id {@code j} towards {@code reActorSystemId}.
     *
     * <p>The update is told through the gate that {@code reActorSystem} resolves for the given
     * reactor system id and channel; when no gate is found the null reactor system reference is
     * used instead.
     *
     * @param reActorSystem   local reactor system, used to build the update and to look the gate up
     * @param channelId       channel the update refers to and is routed through
     * @param deliveryStatus  status to communicate
     * @param j               id the status refers to
     * @param reActorSystemId reactor system the update is addressed to
     * @return the delivery status of the tell carrying the update
     */
    /* JADX INFO: Access modifiers changed from: protected */
    public static DeliveryStatus sendDeliveryAck(ReActorSystem reActorSystem, ChannelId channelId, DeliveryStatus deliveryStatus, long j, ReActorSystemId reActorSystemId) {
        DeliveryStatusUpdate statusUpdate = new DeliveryStatusUpdate(j, deliveryStatus, reActorSystem.getLocalReActorSystemId(), channelId);
        ReActorSystemRef gate = reActorSystem.findGate(reActorSystemId, channelId);
        if (gate == null) {
            gate = NullReActorSystemRef.NULL_REACTOR_SYSTEM_REF;
        }
        return new ReActorRef(ReActorId.NO_REACTOR_ID, gate).tell(ReActorRef.NO_REACTOR_REF, statusUpdate);
    }

    /**
     * Removal listener of the pending-ack cache: fails the removed trigger with a
     * {@link TimeoutException} unless the entry was removed explicitly.
     */
    private static void expireOnTimeout(RemovalNotification<Long, CompletableFuture<DeliveryStatus>> removalNotification) {
        if (removalNotification.getCause() != RemovalCause.EXPLICIT) {
            removalNotification.getValue().completeExceptionally(new TimeoutException());
        }
    }
}
