package io.reacted.flow.operators.service;

import io.reacted.core.mailboxes.BackpressuringMbox;
import io.reacted.core.messages.reactors.ReActorInit;
import io.reacted.core.messages.reactors.ReActorStop;
import io.reacted.core.reactors.ReActions;
import io.reacted.core.reactorsystem.ReActorContext;
import io.reacted.core.reactorsystem.ReActorRef;
import io.reacted.core.utils.ReActedUtils;
import io.reacted.flow.operators.FlowOperator;
import io.reacted.flow.operators.service.ServiceOperatorConfig;
import io.reacted.patterns.AsyncUtils;
import java.io.Serializable;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

/* loaded from: input_file:io/reacted/flow/operators/service/ServiceOperator.class */
public class ServiceOperator extends FlowOperator<ServiceOperatorConfig.Builder, ServiceOperatorConfig> {
    private final ReActions reActions;
    private final ExecutorService executorService;
    private final boolean shallStopExecutorService;

    @Nullable
    private ScheduledFuture<?> serviceRefreshTask;
    private ReActorRef service;
    private boolean serviceInitializationMissing;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/reacted/flow/operators/service/ServiceOperator$RefreshServiceRequest.class */
    public static class RefreshServiceRequest implements Serializable {
        private RefreshServiceRequest() {
        }

        public String toString() {
            return "RefreshServiceRequest{}";
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/reacted/flow/operators/service/ServiceOperator$RefreshServiceUpdate.class */
    public static final class RefreshServiceUpdate extends Record implements Serializable {
        private final Collection<ReActorRef> serviceGates;

        private RefreshServiceUpdate(Collection<ReActorRef> collection) {
            this.serviceGates = collection;
        }

        @Override // java.lang.Record
        public final String toString() {
            return (String) ObjectMethods.bootstrap(MethodHandles.lookup(), "toString", MethodType.methodType(String.class, RefreshServiceUpdate.class), RefreshServiceUpdate.class, "serviceGates", "FIELD:Lio/reacted/flow/operators/service/ServiceOperator$RefreshServiceUpdate;->serviceGates:Ljava/util/Collection;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final int hashCode() {
            return (int) ObjectMethods.bootstrap(MethodHandles.lookup(), "hashCode", MethodType.methodType(Integer.TYPE, RefreshServiceUpdate.class), RefreshServiceUpdate.class, "serviceGates", "FIELD:Lio/reacted/flow/operators/service/ServiceOperator$RefreshServiceUpdate;->serviceGates:Ljava/util/Collection;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final boolean equals(Object obj) {
            return (boolean) ObjectMethods.bootstrap(MethodHandles.lookup(), "equals", MethodType.methodType(Boolean.TYPE, RefreshServiceUpdate.class, Object.class), RefreshServiceUpdate.class, "serviceGates", "FIELD:Lio/reacted/flow/operators/service/ServiceOperator$RefreshServiceUpdate;->serviceGates:Ljava/util/Collection;").dynamicInvoker().invoke(this, obj) /* invoke-custom */;
        }

        public Collection<ReActorRef> serviceGates() {
            return this.serviceGates;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public ServiceOperator(ServiceOperatorConfig serviceOperatorConfig) {
        super(serviceOperatorConfig);
        this.serviceInitializationMissing = true;
        this.executorService = serviceOperatorConfig.getExecutorService().orElseGet(Executors::newSingleThreadExecutor);
        this.shallStopExecutorService = serviceOperatorConfig.getExecutorService().isEmpty();
        this.reActions = ReActions.newBuilder().from(super.getReActions()).reAct(ReActorInit.class, this::onServiceOperatorInit).reAct(ReActorStop.class, this::onServiceOperatorStop).reAct(RefreshServiceRequest.class, (reActorContext, refreshServiceRequest) -> {
            onRefreshServiceRequest(reActorContext);
        }).reAct(RefreshServiceUpdate.class, this::onRefreshServiceUpdate).reAct(serviceOperatorConfig.getServiceReplyType(), this::onReply).build();
    }

    @Override // io.reacted.flow.operators.FlowOperator
    @Nonnull
    public ReActions getReActions() {
        return this.reActions;
    }

    @Override // io.reacted.flow.operators.FlowOperator
    protected final CompletionStage<Collection<? extends Serializable>> onNext(Serializable serializable, ReActorContext reActorContext) {
        return AsyncUtils.asyncForeach(serializable2 -> {
            return this.service.atell(reActorContext.getSelf(), serializable2);
        }, getOperatorCfg().getToServiceRequests().apply(serializable).iterator(), th -> {
            onFailedDelivery(th, reActorContext, serializable);
        }, this.executorService).thenAccept(r5 -> {
            reActorContext.getMbox().request(1L);
        }).thenApply(r2 -> {
            return FlowOperator.NO_OUTPUT;
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // io.reacted.flow.operators.FlowOperator
    public void broadcastOperatorInitializationComplete(ReActorContext reActorContext) {
        if (this.serviceInitializationMissing) {
            return;
        }
        super.broadcastOperatorInitializationComplete(reActorContext);
    }

    private void onServiceOperatorInit(ReActorContext reActorContext, ReActorInit reActorInit) {
        super.onInit(reActorContext, reActorInit);
        BackpressuringMbox.toBackpressuringMailbox(reActorContext.getMbox()).ifPresent(backpressuringMbox -> {
            backpressuringMbox.addNonDelayableTypes(Set.of(RefreshServiceRequest.class));
        });
        this.serviceRefreshTask = reActorContext.getReActorSystem().getSystemSchedulingService().scheduleWithFixedDelay(() -> {
            if (reActorContext.selfTell(new RefreshServiceRequest()).isSent()) {
                return;
            }
            reActorContext.logError("Unable to request refresh of service operators", new Serializable[0]);
        }, 0L, getOperatorCfg().getServiceRefreshPeriod().toNanos(), TimeUnit.NANOSECONDS);
    }

    private void onServiceOperatorStop(ReActorContext reActorContext, ReActorStop reActorStop) {
        super.onStop(reActorContext, reActorStop);
        if (this.serviceRefreshTask != null) {
            this.serviceRefreshTask.cancel(true);
        }
        if (this.shallStopExecutorService) {
            this.executorService.shutdownNow();
        }
    }

    private void onRefreshServiceRequest(ReActorContext reActorContext) {
        ReActedUtils.resolveServices(List.of(getOperatorCfg().getServiceSearchFilter()), reActorContext.getReActorSystem(), getOperatorCfg().getGateSelector(), reActorContext.getSelf().getReActorId().toString()).thenAccept(list -> {
            if (list.isEmpty()) {
                return;
            }
            reActorContext.selfTell(new RefreshServiceUpdate(list));
        });
    }

    private void onRefreshServiceUpdate(ReActorContext reActorContext, RefreshServiceUpdate refreshServiceUpdate) {
        this.service = refreshServiceUpdate.serviceGates.iterator().next();
        if (this.serviceInitializationMissing) {
            this.serviceInitializationMissing = false;
            if (isShallAwakeInputStreams()) {
                super.broadcastOperatorInitializationComplete(reActorContext);
            }
        }
    }

    private <PayloadT extends Serializable> void onReply(ReActorContext reActorContext, PayloadT payloadt) {
        propagate(CompletableFuture.supplyAsync(() -> {
            return getOperatorCfg().getFromServiceResponse().apply(payloadt);
        }, this.executorService), payloadt, reActorContext);
    }
}
