package io.reacted.flow;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.reacted.core.config.reactors.ReActorServiceConfig;
import io.reacted.core.mailboxes.BackpressuringMbox;
import io.reacted.core.messages.reactors.ReActorInit;
import io.reacted.core.messages.reactors.ReActorStop;
import io.reacted.core.reactors.ReActions;
import io.reacted.core.reactors.ReActiveEntity;
import io.reacted.core.reactorsystem.ReActorContext;
import io.reacted.core.reactorsystem.ReActorRef;
import io.reacted.core.reactorsystem.ReActorSystem;
import io.reacted.flow.operators.FlowOperatorConfig;
import io.reacted.flow.operators.messages.OperatorInitComplete;
import io.reacted.patterns.AsyncUtils;
import io.reacted.patterns.ObjectUtils;
import io.reacted.patterns.Try;
import io.reacted.patterns.UnChecked;
import java.io.Serializable;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;

/* loaded from: input_file:io/reacted/flow/GraphController.class */
class GraphController implements ReActiveEntity {
    private final Map<String, ? extends FlowOperatorConfig<? extends FlowOperatorConfig.Builder<?, ?>, ? extends FlowOperatorConfig<?, ?>>> operatorsCfgsByName;
    private final Map<String, ReActorRef> operatorNameToOperator;
    private final String flowName;
    private final CompletableFuture<Try<Void>> completeOnInitComplete;
    private boolean inputStreamsHaveBeenInited = false;
    private final Map<String, Integer> operatorToInitedRoutees = new HashMap();
    private final ReActions reActions = ReActions.newBuilder().reAct(ReActorInit.class, (reActorContext, reActorInit) -> {
        onInit(reActorContext);
    }).reAct(InitInputStreams.class, (reActorContext2, initInputStreams) -> {
        onInitInputStreams(reActorContext2);
    }).reAct(ReActorStop.class, (reActorContext3, reActorStop) -> {
        onStop();
    }).reAct(OperatorInitComplete.class, this::onOperatorInitComplete).build();
    private final List<ExecutorService> inputStreamProcessors = new LinkedList();

    /* loaded from: input_file:io/reacted/flow/GraphController$InitInputStreams.class */
    private static final class InitInputStreams extends Record implements Serializable {
        private InitInputStreams() {
        }

        @Override // java.lang.Record
        public String toString() {
            return "InitInputStreams{}";
        }

        @Override // java.lang.Record
        public final int hashCode() {
            return (int) ObjectMethods.bootstrap(MethodHandles.lookup(), "hashCode", MethodType.methodType(Integer.TYPE, InitInputStreams.class), InitInputStreams.class, "").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final boolean equals(Object obj) {
            return (boolean) ObjectMethods.bootstrap(MethodHandles.lookup(), "equals", MethodType.methodType(Boolean.TYPE, InitInputStreams.class, Object.class), InitInputStreams.class, "").dynamicInvoker().invoke(this, obj) /* invoke-custom */;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public GraphController(String str, Collection<? extends FlowOperatorConfig<? extends FlowOperatorConfig.Builder<?, ?>, ? extends FlowOperatorConfig<?, ?>>> collection, CompletableFuture<Try<Void>> completableFuture) {
        this.completeOnInitComplete = completableFuture;
        this.operatorsCfgsByName = (Map) ((Collection) ObjectUtils.requiredCondition((Collection) Objects.requireNonNull(collection), collection2 -> {
            return !collection2.isEmpty();
        }, () -> {
            return new IllegalArgumentException("Operators cannot be empty");
        })).stream().collect(Collectors.toUnmodifiableMap((v0) -> {
            return v0.getReActorName();
        }, Function.identity()));
        this.operatorNameToOperator = new ConcurrentHashMap(collection.size(), 0.5f);
        this.flowName = str;
    }

    @Nonnull
    public ReActions getReActions() {
        return this.reActions;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Map<String, ReActorRef> getOperatorsByName() {
        return this.operatorNameToOperator;
    }

    private void onOperatorInitComplete(ReActorContext reActorContext, OperatorInitComplete operatorInitComplete) {
        if (Objects.equals(this.flowName, operatorInitComplete.getFlowName())) {
            this.operatorToInitedRoutees.compute(operatorInitComplete.getOperatorName(), (str, num) -> {
                return Integer.valueOf(num == null ? 1 : 1 + num.intValue());
            });
            if (!this.operatorsCfgsByName.entrySet().stream().allMatch(entry -> {
                return this.operatorToInitedRoutees.getOrDefault(entry.getKey(), 0).intValue() >= ((FlowOperatorConfig) entry.getValue()).getRouteesNum();
            }) || this.inputStreamsHaveBeenInited) {
                return;
            }
            this.inputStreamsHaveBeenInited = true;
            reActorContext.selfPublish(new InitInputStreams());
        }
    }

    private void onInit(ReActorContext reActorContext) {
        BackpressuringMbox.toBackpressuringMailbox(reActorContext.getMbox()).ifPresent(backpressuringMbox -> {
            backpressuringMbox.addNonDelayableTypes(Set.of(OperatorInitComplete.class));
        });
        for (Map.Entry<String, ? extends FlowOperatorConfig<? extends FlowOperatorConfig.Builder<?, ?>, ? extends FlowOperatorConfig<?, ?>>> entry : this.operatorsCfgsByName.entrySet()) {
            this.operatorNameToOperator.put(entry.getKey(), (ReActorRef) spawnOperator(reActorContext.getReActorSystem(), entry.getValue(), reActorContext.getSelf()).orElseSneakyThrow());
        }
    }

    private void onInitInputStreams(ReActorContext reActorContext) {
        for (Map.Entry<String, ? extends FlowOperatorConfig<? extends FlowOperatorConfig.Builder<?, ?>, ? extends FlowOperatorConfig<?, ?>>> entry : this.operatorsCfgsByName.entrySet()) {
            Iterator<Stream<? extends Serializable>> it = entry.getValue().getInputStreams().iterator();
            while (it.hasNext()) {
                spawnNewStreamConsumer(this.operatorNameToOperator.get(entry.getKey()), it.next(), reActorContext.getReActorSystem(), reActorContext.getSelf().getReActorId().getReActorName(), entry.getValue());
            }
        }
        this.completeOnInitComplete.complete(Try.VOID);
    }

    private void onStop() {
        this.inputStreamProcessors.forEach((v0) -> {
            v0.shutdownNow();
        });
    }

    private void spawnNewStreamConsumer(ReActorRef reActorRef, Stream<? extends Serializable> stream, ReActorSystem reActorSystem, String str, FlowOperatorConfig<?, ?> flowOperatorConfig) {
        ExecutorService spawnNewInputStreamExecutor = spawnNewInputStreamExecutor(reActorSystem, str, flowOperatorConfig.getReActorName());
        this.inputStreamProcessors.add(spawnNewInputStreamExecutor);
        UnChecked.TriConsumer<ReActorSystem, ?, ? super Throwable> inputStreamErrorHandler = flowOperatorConfig.getInputStreamErrorHandler();
        Objects.requireNonNull(reActorRef);
        AsyncUtils.asyncForeach(reActorRef::apublish, stream.iterator(), th -> {
            inputStreamErrorHandler.accept(reActorSystem, flowOperatorConfig, th);
        }, spawnNewInputStreamExecutor).thenAccept(r3 -> {
            spawnNewInputStreamExecutor.shutdownNow();
        });
    }

    private static <CfgBuilderT extends FlowOperatorConfig.Builder<CfgBuilderT, CfgT>, CfgT extends FlowOperatorConfig<CfgBuilderT, CfgT>> Try<ReActorRef> spawnOperator(ReActorSystem reActorSystem, ReActorServiceConfig<? extends ReActorServiceConfig.Builder<?, ?>, ? extends ReActorServiceConfig<?, ?>> reActorServiceConfig, ReActorRef reActorRef) {
        return Try.of(() -> {
            return reActorSystem.spawnService((FlowOperatorConfig) reActorServiceConfig, reActorRef);
        }).flatMap(Try::identity);
    }

    private static ExecutorService spawnNewInputStreamExecutor(ReActorSystem reActorSystem, String str, String str2) {
        ThreadFactoryBuilder threadFactoryBuilder = new ThreadFactoryBuilder();
        threadFactoryBuilder.setNameFormat(String.format("InputStreamExecutor-Flow[%s]-Stage[%s]-", str, str2)).setUncaughtExceptionHandler((thread, th) -> {
            reActorSystem.logError("Uncaught exception in {}", new Serializable[]{thread.getName(), th});
        });
        return Executors.newSingleThreadExecutor(threadFactoryBuilder.build());
    }
}
