package io.reacted.flow.operators;

import io.reacted.core.config.reactors.ReActorConfig;
import io.reacted.core.config.reactors.ServiceConfig;
import io.reacted.core.mailboxes.BackpressuringMbox;
import io.reacted.core.messages.reactors.DeliveryStatus;
import io.reacted.core.messages.reactors.ReActorInit;
import io.reacted.core.messages.reactors.ReActorStop;
import io.reacted.core.reactors.ReActions;
import io.reacted.core.reactors.ReActor;
import io.reacted.core.reactorsystem.ReActorContext;
import io.reacted.core.reactorsystem.ReActorRef;
import io.reacted.core.reactorsystem.ReActorSystem;
import io.reacted.core.services.GateSelectorPolicies;
import io.reacted.core.utils.ReActedUtils;
import io.reacted.flow.operators.FlowOperatorConfig;
import io.reacted.flow.operators.FlowOperatorConfig.Builder;
import io.reacted.flow.operators.messages.OperatorInitComplete;
import io.reacted.patterns.Try;
import java.io.Serializable;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;

/* loaded from: input_file:io/reacted/flow/operators/FlowOperator.class */
public abstract class FlowOperator<CfgBuilderT extends FlowOperatorConfig.Builder<CfgBuilderT, CfgT>, CfgT extends FlowOperatorConfig<CfgBuilderT, CfgT>> implements ReActor {
    public static final Collection<? extends Serializable> NO_OUTPUT = List.of();
    private final CfgT operatorCfg;
    private final ReActorConfig routeeCfg;
    private ScheduledFuture<?> operatorsRefreshTask;
    private boolean shallAwakeInputStreams = true;
    private Collection<ReActorRef> ifPredicateOutputOperatorsRefs = List.of();
    private Collection<ReActorRef> thenElseOutputOperatorsRefs = List.of();
    private final ReActions operatorReactions = ReActions.newBuilder().reAct(RefreshOperatorRequest.class, (reActorContext, refreshOperatorRequest) -> {
        onRefreshOperatorRequest(reActorContext);
    }).reAct(ReActorInit.class, this::onInit).reAct(ReActorStop.class, this::onStop).reAct(OperatorOutputGatesUpdate.class, this::onServiceGatesUpdate).reAct(this::onNext).build();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/reacted/flow/operators/FlowOperator$OperatorOutputGatesUpdate.class */
    public static final class OperatorOutputGatesUpdate extends Record implements Serializable {
        private final Collection<ReActorRef> ifPredicateServices;
        private final Collection<ReActorRef> thenElseServices;

        private OperatorOutputGatesUpdate(Collection<ReActorRef> collection, Collection<ReActorRef> collection2) {
            this.ifPredicateServices = collection;
            this.thenElseServices = collection2;
        }

        @Override // java.lang.Record
        public String toString() {
            return "OperatorOutputGatesUpdate{ifPredicateServices=" + String.valueOf(this.ifPredicateServices) + ", thenElseServices=" + String.valueOf(this.thenElseServices) + "}";
        }

        @Override // java.lang.Record
        public final int hashCode() {
            return (int) ObjectMethods.bootstrap(MethodHandles.lookup(), "hashCode", MethodType.methodType(Integer.TYPE, OperatorOutputGatesUpdate.class), OperatorOutputGatesUpdate.class, "ifPredicateServices;thenElseServices", "FIELD:Lio/reacted/flow/operators/FlowOperator$OperatorOutputGatesUpdate;->ifPredicateServices:Ljava/util/Collection;", "FIELD:Lio/reacted/flow/operators/FlowOperator$OperatorOutputGatesUpdate;->thenElseServices:Ljava/util/Collection;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final boolean equals(Object obj) {
            return (boolean) ObjectMethods.bootstrap(MethodHandles.lookup(), "equals", MethodType.methodType(Boolean.TYPE, OperatorOutputGatesUpdate.class, Object.class), OperatorOutputGatesUpdate.class, "ifPredicateServices;thenElseServices", "FIELD:Lio/reacted/flow/operators/FlowOperator$OperatorOutputGatesUpdate;->ifPredicateServices:Ljava/util/Collection;", "FIELD:Lio/reacted/flow/operators/FlowOperator$OperatorOutputGatesUpdate;->thenElseServices:Ljava/util/Collection;").dynamicInvoker().invoke(this, obj) /* invoke-custom */;
        }

        public Collection<ReActorRef> ifPredicateServices() {
            return this.ifPredicateServices;
        }

        public Collection<ReActorRef> thenElseServices() {
            return this.thenElseServices;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/reacted/flow/operators/FlowOperator$RefreshOperatorRequest.class */
    public static class RefreshOperatorRequest implements Serializable {
        private RefreshOperatorRequest() {
        }

        public String toString() {
            return "RefreshOperatorRequest{}";
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public FlowOperator(CfgT cfgt) {
        this.operatorCfg = (CfgT) Objects.requireNonNull(cfgt, "Operator Config cannot be null");
        this.routeeCfg = (ReActorConfig) Objects.requireNonNull(cfgt.getRouteeConfig());
    }

    @Nonnull
    /* renamed from: getConfig, reason: merged with bridge method [inline-methods] */
    public ReActorConfig m2getConfig() {
        return this.routeeCfg;
    }

    @Nonnull
    public ReActions getReActions() {
        return this.operatorReactions;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean isShallAwakeInputStreams() {
        return this.shallAwakeInputStreams;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public CfgT getOperatorCfg() {
        return this.operatorCfg;
    }

    protected static Try<ReActorRef> of(ReActorSystem reActorSystem, ServiceConfig serviceConfig) {
        return reActorSystem.spawnService(serviceConfig);
    }

    protected abstract CompletionStage<Collection<? extends Serializable>> onNext(Serializable serializable, ReActorContext reActorContext);

    protected void onLinkError(Throwable th, ReActorContext reActorContext, Serializable serializable) {
        reActorContext.logError("Unable to pass {} to the next stage", new Serializable[]{serializable, th});
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void onInit(ReActorContext reActorContext, ReActorInit reActorInit) {
        BackpressuringMbox.toBackpressuringMailbox(reActorContext.getMbox()).map(backpressuringMbox -> {
            return backpressuringMbox.addNonDelayableTypes(Set.of(RefreshOperatorRequest.class, OperatorOutputGatesUpdate.class));
        }).filter(backpressuringMbox2 -> {
            return backpressuringMbox2.isDelayable(ReActorInit.class);
        }).ifPresent(backpressuringMbox3 -> {
            backpressuringMbox3.request(1L);
        });
        this.operatorsRefreshTask = reActorContext.getReActorSystem().getSystemSchedulingService().scheduleWithFixedDelay(() -> {
            if (reActorContext.selfTell(new RefreshOperatorRequest()).isSent()) {
                return;
            }
            reActorContext.logError("Unable to request refresh of operator outputs", new Serializable[0]);
        }, 0L, this.operatorCfg.getOutputOperatorsRefreshPeriod().toNanos(), TimeUnit.NANOSECONDS);
    }

    protected void onServiceGatesUpdate(ReActorContext reActorContext, OperatorOutputGatesUpdate operatorOutputGatesUpdate) {
        this.ifPredicateOutputOperatorsRefs = operatorOutputGatesUpdate.ifPredicateServices;
        this.thenElseOutputOperatorsRefs = operatorOutputGatesUpdate.thenElseServices;
        if (isShallAwakeInputStreams()) {
            this.shallAwakeInputStreams = false;
            broadcastOperatorInitializationComplete(reActorContext);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void broadcastOperatorInitializationComplete(ReActorContext reActorContext) {
        reActorContext.getReActorSystem().broadcastToLocalSubscribers(reActorContext.getSelf(), new OperatorInitComplete(this.operatorCfg.getFlowName(), this.operatorCfg.getReActorName(), reActorContext.getSelf().getReActorId().getReActorName()));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void onStop(ReActorContext reActorContext, ReActorStop reActorStop) {
        this.operatorsRefreshTask.cancel(true);
    }

    protected void onRefreshOperatorRequest(ReActorContext reActorContext) {
        ReActedUtils.resolveServices(this.operatorCfg.getIfPredicateOutputOperators(), reActorContext.getReActorSystem(), GateSelectorPolicies.RANDOM_GATE, reActorContext.getSelf().getReActorId().toString()).thenCombine(ReActedUtils.resolveServices(this.operatorCfg.getThenElseOutputOperators(), reActorContext.getReActorSystem(), GateSelectorPolicies.RANDOM_GATE, reActorContext.getSelf().getReActorId().toString()), (v1, v2) -> {
            return new OperatorOutputGatesUpdate(v1, v2);
        }).thenAccept(operatorOutputGatesUpdate -> {
            if (isUpdateMatchingRequest(this.operatorCfg.getIfPredicateOutputOperators().size(), this.operatorCfg.getThenElseOutputOperators().size(), operatorOutputGatesUpdate)) {
                reActorContext.selfTell(operatorOutputGatesUpdate);
            }
        });
    }

    private void onNext(ReActorContext reActorContext, Serializable serializable) {
        backpressuredPropagation(onNext(serializable, reActorContext), serializable, reActorContext);
    }

    protected CompletionStage<Void> backpressuredPropagation(CompletionStage<Collection<? extends Serializable>> completionStage, Serializable serializable, ReActorContext reActorContext) {
        return propagate(completionStage, serializable, reActorContext).thenAccept(deliveryStatus -> {
            reActorContext.getMbox().request(1L);
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public CompletionStage<DeliveryStatus> propagate(CompletionStage<Collection<? extends Serializable>> completionStage, Serializable serializable, ReActorContext reActorContext) {
        Consumer consumer = th -> {
            onFailedDelivery(th, reActorContext, serializable);
        };
        return completionStage.thenCompose(collection -> {
            return (CompletionStage) routeOutputMessageAfterFiltering(collection).entrySet().stream().map(entry -> {
                return forwardToOperators(consumer, (Collection) entry.getValue(), reActorContext, (Collection) entry.getKey());
            }).reduce((completionStage2, completionStage3) -> {
                return ReActedUtils.composeDeliveries(completionStage2, completionStage3, consumer);
            }).orElse(CompletableFuture.completedStage(DeliveryStatus.DELIVERED));
        });
    }

    protected Map<Collection<ReActorRef>, ? extends Collection<? extends Serializable>> routeOutputMessageAfterFiltering(Collection<? extends Serializable> collection) {
        return (Map) collection.stream().collect(Collectors.groupingBy(serializable -> {
            return this.operatorCfg.getIfPredicate().test(serializable) ? this.ifPredicateOutputOperatorsRefs : this.thenElseOutputOperatorsRefs;
        }));
    }

    protected CompletionStage<DeliveryStatus> forwardToOperators(Consumer<Throwable> consumer, Collection<? extends Serializable> collection, ReActorContext reActorContext, Collection<ReActorRef> collection2) {
        return (CompletionStage) collection.stream().flatMap(serializable -> {
            return collection2.stream().map(reActorRef -> {
                return reActorRef.atell(reActorContext.getSelf(), serializable);
            });
        }).reduce((completionStage, completionStage2) -> {
            return ReActedUtils.composeDeliveries(completionStage, completionStage2, consumer);
        }).orElseGet(() -> {
            return CompletableFuture.completedFuture(DeliveryStatus.DELIVERED);
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public <InputT extends Serializable> DeliveryStatus onFailedDelivery(Throwable th, ReActorContext reActorContext, InputT inputt) {
        onLinkError(th, reActorContext, inputt);
        return DeliveryStatus.NOT_DELIVERED;
    }

    private static boolean isUpdateMatchingRequest(int i, int i2, OperatorOutputGatesUpdate operatorOutputGatesUpdate) {
        return operatorOutputGatesUpdate.ifPredicateServices.size() == i && operatorOutputGatesUpdate.thenElseServices.size() == i2;
    }
}
