package io.reacted.core.services;

import io.reacted.core.config.reactors.ReActorConfig;
import io.reacted.core.config.reactors.ServiceConfig;
import io.reacted.core.mailboxes.BackpressuringMbox;
import io.reacted.core.messages.reactors.DeliveryStatus;
import io.reacted.core.messages.reactors.ReActorInit;
import io.reacted.core.messages.reactors.ReActorStop;
import io.reacted.core.messages.reactors.SystemMonitorReport;
import io.reacted.core.messages.serviceregistry.ServiceCancellationRequest;
import io.reacted.core.messages.serviceregistry.ServicePublicationRequest;
import io.reacted.core.messages.serviceregistry.ServicePublicationRequestError;
import io.reacted.core.messages.serviceregistry.ServiceRegistryNotAvailable;
import io.reacted.core.messages.services.ServiceDiscoveryReply;
import io.reacted.core.messages.services.ServiceDiscoveryRequest;
import io.reacted.core.messages.services.ServiceDiscoverySearchFilter;
import io.reacted.core.reactors.ReActions;
import io.reacted.core.reactors.ReActiveEntity;
import io.reacted.core.reactors.ReActor;
import io.reacted.core.reactorsystem.ReActorContext;
import io.reacted.core.reactorsystem.ReActorRef;
import io.reacted.core.reactorsystem.ReActorSystem;
import io.reacted.core.typedsubscriptions.TypedSubscription;
import io.reacted.core.utils.ReActedUtils;
import io.reacted.patterns.NonNullByDefault;
import io.reacted.patterns.Try;
import java.io.Serializable;
import java.util.Comparator;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;

@NonNullByDefault
/* loaded from: input_file:io/reacted/core/services/Service.class */
public class Service implements ReActiveEntity {
    // Logged when the routee provider cannot supply a new routee during a respawn
    private static final String ROUTEE_REACTIONS_RETRIEVAL_ERROR = "Unable to get routee reactions from specified provider";
    // Logged when a routee cannot be spawned (at init time or during a respawn)
    private static final String ROUTEE_SPAWN_ERROR = "Unable to spawn routee";
    // Logged when the load balancing policy returns no routee; the placeholder receives the service name
    private static final String NO_ROUTEE_FOR_SPECIFIED_ROUTER = "No routee found for router {}";
    // Routee name layout: [<service name>-<routee configured name>-<routee index>]
    private static final String REACTOR_SERVICE_NAME_FORMAT = "[%s-%s-%d]";
    // Configuration of this service: routees number, routee provider, balancing policy, remote flag...
    private final ServiceConfig serviceConfig;
    // Properties describing this service: matched against discovery filters and sent with publication requests
    private final Properties serviceInfo = new Properties();
    // Counter incremented for every routed message and handed to the load balancing policy
    private long msgReceived = 1;

    /* loaded from: input_file:io/reacted/core/services/Service$LoadBalancingPolicy.class */
    /**
     * Strategies used by a {@link Service} to choose, among its children, the routee
     * that should receive a message.
     */
    public enum LoadBalancingPolicy {
        /**
         * Picks the routee whose index is derived from the message counter, so that
         * consecutive counter values cycle through the children.
         */
        ROUND_ROBIN {
            @Override
            public Optional<ReActorRef> selectRoutee(ReActorContext reActorContext, long j) {
                int routeesNum = reActorContext.getChildren().size();
                if (routeesNum == 0) {
                    // Without this guard the modulo below would throw ArithmeticException
                    // instead of reporting that no routee is available
                    return Optional.empty();
                }
                int routeeIdx = (int) ((j % Integer.MAX_VALUE) % routeesNum);
                // The children are read again for the lookup: Try turns a failing
                // lookup (e.g. index no longer valid) into an empty Optional
                return Try.of(() -> reActorContext.getChildren().get(routeeIdx)).toOptional();
            }
        },
        /**
         * Picks the child whose mailbox reports the lowest {@code getMsgNum()} value.
         * Children for which the reactor system returns no context are ignored.
         */
        LOWEST_LOAD {
            @Override
            public Optional<ReActorRef> selectRoutee(ReActorContext reActorContext, long j) {
                ReActorSystem reActorSystem = Objects.requireNonNull(reActorContext.getReActorSystem());
                return reActorContext.getChildren().stream()
                        .map(routee -> routee.getReActorId())
                        .map(reActorSystem::getReActor)
                        .flatMap(routeeCtx -> routeeCtx.stream())
                        .min(Comparator.comparingLong(routeeCtx -> routeeCtx.getMbox().getMsgNum()))
                        .map(routeeCtx -> routeeCtx.getSelf());
            }
        };

        /**
         * Selects the routee that should handle the next message.
         *
         * @param reActorContext context of the service whose children are the candidate routees
         * @param j              message counter supplied by the service
         * @return the selected routee, or an empty {@link Optional} if none could be selected
         */
        abstract Optional<ReActorRef> selectRoutee(ReActorContext reActorContext, long j);
    }

    /* loaded from: input_file:io/reacted/core/services/Service$RouteeReSpawnRequest.class */
    /**
     * Message a {@link Service} sends to itself to ask for a routee to be spawned again
     * with the configuration carried by this request.
     */
    public static class RouteeReSpawnRequest implements Serializable {
        /** Configuration to use for the routee that has to be spawned. */
        final ReActorConfig routeeConfig;

        /**
         * @param reActorConfig configuration of the routee to respawn
         */
        public RouteeReSpawnRequest(final ReActorConfig reActorConfig) {
            routeeConfig = reActorConfig;
        }
    }

    public Service(ServiceConfig serviceConfig) {
        this.serviceConfig = (ServiceConfig) Objects.requireNonNull(serviceConfig);
        this.serviceInfo.put(ServiceDiscoverySearchFilter.FIELD_NAME_SERVICE_NAME, serviceConfig.getReActorName());
    }

    /**
     * Builds the reactions of the service. Every handler is wrapped by
     * {@code requestNextMessage}; messages without a dedicated reaction are routed
     * to a routee.
     *
     * @return the reactions of this service
     */
    @Override
    @Nonnull
    public ReActions getReActions() {
        return ReActions.newBuilder()
                .reAct((ctx, payload) ->
                        requestNextMessage(ctx, payload, this::routeMessage))
                .reAct(ServiceRegistryNotAvailable.class, (ctx, registryNotAvailable) ->
                        requestNextMessage(ctx, registryNotAvailable, this::onServiceRegistryNotAvailable))
                .reAct(ServiceDiscoveryRequest.class, (ctx, discoveryRequest) ->
                        requestNextMessage(ctx, discoveryRequest, this::serviceDiscovery))
                .reAct(RouteeReSpawnRequest.class, (ctx, reSpawnRequest) ->
                        requestNextMessage(ctx, reSpawnRequest, this::respawnRoutee))
                .reAct(ReActorInit.class, (ctx, init) ->
                        requestNextMessage(ctx, init, this::initService))
                .reAct(ReActorStop.class, (ctx, stop) ->
                        requestNextMessage(ctx, stop, this::stopService))
                .reAct(ServicePublicationRequestError.class, (ctx, publicationError) ->
                        requestNextMessage(ctx, publicationError, this::onServicePublicationError))
                .reAct(SystemMonitorReport.class, (ctx, monitorReport) ->
                        requestNextMessage(ctx, monitorReport, this::onSystemInfoReport))
                .build();
    }

    /**
     * Reacts to the unavailability of the service registry by subscribing the service
     * to local {@link ServiceDiscoveryRequest} messages.
     *
     * @param reActorContext              context of this service
     * @param serviceRegistryNotAvailable the received notification (not inspected)
     */
    private void onServiceRegistryNotAvailable(ReActorContext reActorContext, ServiceRegistryNotAvailable serviceRegistryNotAvailable) {
        String serviceName = this.serviceInfo.getProperty(ServiceDiscoverySearchFilter.FIELD_NAME_SERVICE_NAME);
        reActorContext.logInfo("{} makes itself discoverable", serviceName);
        reActorContext.addTypedSubscriptions(TypedSubscription.LOCAL.forType(ServiceDiscoveryRequest.class));
    }

    /**
     * Handles a failed publication of the service. For remote services a new publication
     * request is scheduled after the configured reattempt delay; if the scheduling itself
     * fails, the error is logged and the same error message is sent again to this service.
     * Nothing is done for non remote services.
     *
     * @param reActorContext                 context of this service
     * @param servicePublicationRequestError the publication error that has been received
     */
    public void onServicePublicationError(ReActorContext reActorContext, ServicePublicationRequestError servicePublicationRequestError) {
        if (!this.serviceConfig.isRemoteService()) {
            return;
        }
        Try.of(() -> {
            return reActorContext.getReActorSystem().getSystemSchedulingService().schedule(() -> {
                return sendPublicationRequest(reActorContext, this.serviceInfo);
            }, this.serviceConfig.getServiceRepublishReattemptDelayOnError().toMillis(), TimeUnit.MILLISECONDS);
        }).peekFailure(schedulingError -> reActorContext.logError("Unable to reschedule service publication", schedulingError))
          .ifError(schedulingError -> reActorContext.getSelf().tell(reActorContext.getSender(), servicePublicationRequestError));
    }

    /**
     * Stores the CPU load and the free memory size carried by the report among the
     * service properties, then refreshes the service registry with them.
     *
     * @param reActorContext      context of this service
     * @param systemMonitorReport report providing the system statistics
     */
    private void onSystemInfoReport(ReActorContext reActorContext, SystemMonitorReport systemMonitorReport) {
        Double cpuLoad = Double.valueOf(systemMonitorReport.getCpuLoad());
        Long freeMemorySize = Long.valueOf(systemMonitorReport.getFreeMemorySize());
        this.serviceInfo.put(ServiceDiscoverySearchFilter.FIELD_NAME_CPU_LOAD, cpuLoad);
        this.serviceInfo.put(ServiceDiscoverySearchFilter.FIELD_NAME_FREE_MEMORY_SIZE, freeMemorySize);
        updateServiceRegistry(reActorContext, this.serviceInfo);
    }

    /**
     * On stop, sends to the system remoting root a cancellation request for this service,
     * identified by the local reactor system id and the service name.
     *
     * @param reActorContext context of this service
     * @param reActorStop    the stop message (not inspected)
     */
    private void stopService(ReActorContext reActorContext, ReActorStop reActorStop) {
        ReActorSystem reActorSystem = reActorContext.getReActorSystem();
        ServiceCancellationRequest cancellationRequest =
                new ServiceCancellationRequest(reActorSystem.getLocalReActorSystemId(), this.serviceConfig.getReActorName());
        reActorSystem.getSystemRemotingRoot().tell(reActorContext.getSelf(), cancellationRequest);
    }

    /**
     * Initializes the service: subscribes to local {@link SystemMonitorReport} messages,
     * extends the set of not delayed message types when the mailbox is a
     * {@link BackpressuringMbox}, spawns the configured number of routees and, for remote
     * services, sends the publication request.
     *
     * @param reActorContext context of this service
     * @param reActorInit    the init message (not inspected)
     */
    private void initService(ReActorContext reActorContext, ReActorInit reActorInit) {
        reActorContext.addTypedSubscriptions(TypedSubscription.LOCAL.forType(SystemMonitorReport.class));

        if (BackpressuringMbox.class.isAssignableFrom(reActorContext.getMbox().getClass())) {
            BackpressuringMbox serviceMbox = (BackpressuringMbox) reActorContext.getMbox();
            Set<Class<? extends Serializable>> notDelayedTypes = getNonDelayedMessageTypes(serviceMbox.getNotDelayedMessageTypes());
            serviceMbox.setNotDelayedMessageTypes(notDelayedTypes);
        }

        for (int routeeIdx = 0; routeeIdx < this.serviceConfig.getRouteesNum(); routeeIdx++) {
            // A failure on a single routee is logged and does not prevent spawning the others
            try {
                ReActor routee = Objects.requireNonNull((ReActor) this.serviceConfig.getRouteeProvider().get());
                ReActorConfig providedConfig = routee.getConfig();
                ReActions routeeReActions = routee.getReActions();
                String routeeName = String.format(REACTOR_SERVICE_NAME_FORMAT, this.serviceConfig.getReActorName(),
                                                  providedConfig.getReActorName(), routeeIdx);
                ReActorConfig routeeConfig = ReActorConfig.fromConfig(providedConfig).setReActorName(routeeName).build();
                spawnRoutee(reActorContext, routeeReActions, routeeConfig);
            } catch (Throwable spawnError) {
                reActorContext.logError(ROUTEE_SPAWN_ERROR, spawnError);
            }
        }

        if (this.serviceConfig.isRemoteService()) {
            sendPublicationRequest(reActorContext, this.serviceInfo);
        }
    }

    /**
     * Answers a discovery request when its search filter matches this service. The reply
     * carries the service itself if the filter asks for the {@code ROUTED} selection type,
     * otherwise a routee chosen by the load balancing policy. No reply is sent when the
     * filter does not match or no routee can be selected.
     *
     * @param reActorContext          context of this service
     * @param serviceDiscoveryRequest the discovery request to evaluate
     */
    private void serviceDiscovery(ReActorContext reActorContext, ServiceDiscoveryRequest serviceDiscoveryRequest) {
        if (!serviceDiscoveryRequest.getSearchFilter().matches(this.serviceInfo, reActorContext.getSelf())) {
            return;
        }
        Optional<ReActorRef> discoveredRef;
        if (serviceDiscoveryRequest.getSearchFilter().getSelectionType() == SelectionType.ROUTED) {
            discoveredRef = Optional.of(reActorContext.getSelf());
        } else {
            discoveredRef = selectRoutee(reActorContext, this.msgReceived);
        }
        discoveredRef.map(ServiceDiscoveryReply::new)
                     .ifPresent(reply -> reActorContext.getSender().tell(reActorContext.getSelf(), reply));
    }

    /**
     * Forwards a message to the routee chosen by the load balancing policy, keeping the
     * original sender. The message counter is incremented before the selection; if no
     * routee is available an error is logged.
     *
     * @param reActorContext context of this service
     * @param serializable   the message to route
     */
    private void routeMessage(ReActorContext reActorContext, Serializable serializable) {
        this.msgReceived++;
        Optional<ReActorRef> selectedRoutee = selectRoutee(reActorContext, this.msgReceived);
        selectedRoutee.ifPresentOrElse(
                routee -> routee.tell(reActorContext.getSender(), serializable),
                () -> reActorContext.logError(NO_ROUTEE_FOR_SPECIFIED_ROUTER, this.serviceConfig.getReActorName(),
                                              new IllegalStateException()));
    }

    /**
     * Delegates the choice of the routee to the load balancing policy of the service.
     *
     * @param reActorContext context of this service
     * @param j              message counter passed to the policy
     * @return the routee selected by the policy, if any
     */
    private Optional<ReActorRef> selectRoutee(ReActorContext reActorContext, long j) {
        LoadBalancingPolicy balancingPolicy = this.serviceConfig.getLoadBalancingPolicy();
        return balancingPolicy.selectRoutee(reActorContext, j);
    }

    /**
     * Spawns a new routee, obtained from the routee provider, using the configuration
     * carried by the request. Failures are logged.
     *
     * @param reActorContext       context of this service
     * @param routeeReSpawnRequest request carrying the configuration of the routee to respawn
     */
    private void respawnRoutee(ReActorContext reActorContext, RouteeReSpawnRequest routeeReSpawnRequest) {
        Try.of(() -> Objects.requireNonNull((ReActor) this.serviceConfig.getRouteeProvider().get()))
           .peekFailure(providerError -> reActorContext.logError(ROUTEE_REACTIONS_RETRIEVAL_ERROR, providerError))
           .ifSuccess(routee -> spawnRoutee(reActorContext, routee.getReActions(), routeeReSpawnRequest.routeeConfig))
           .ifError(spawnError -> reActorContext.logError(ROUTEE_SPAWN_ERROR, spawnError));
    }

    /**
     * Spawns a routee as child of the service and registers a callback on the termination
     * of its hierarchy: unless the routee context reports {@code isStop()}, the service
     * sends itself a {@link RouteeReSpawnRequest} with the same configuration.
     *
     * @param reActorContext context of this service
     * @param reActions      reactions of the routee
     * @param reActorConfig  configuration of the routee
     */
    private void spawnRoutee(ReActorContext reActorContext, ReActions reActions, ReActorConfig reActorConfig) {
        ReActorRef routee = (ReActorRef) reActorContext.spawnChild(reActions, reActorConfig).orElseSneakyThrow();
        ReActorContext routeeCtx = reActorContext.getReActorSystem().getReActor(routee.getReActorId()).orElseThrow();
        routeeCtx.getHierarchyTermination().thenAccept(terminated -> {
            if (!routeeCtx.isStop()) {
                reActorContext.getSelf().tell(ReActorRef.NO_REACTOR_REF, new RouteeReSpawnRequest(reActorConfig));
            }
        });
    }

    /**
     * For remote services, publishes again the given service properties and logs an error
     * if the publication request is not delivered. Does nothing for non remote services.
     *
     * @param reActorContext context of this service
     * @param properties     service properties to publish
     */
    private void updateServiceRegistry(ReActorContext reActorContext, Properties properties) {
        if (!this.serviceConfig.isRemoteService()) {
            return;
        }
        CompletionStage<Try<DeliveryStatus>> publication = sendPublicationRequest(reActorContext, properties);
        ReActedUtils.ifNotDelivered(publication, deliveryError ->
                reActorContext.logError("Unable to refresh service info {}",
                                        properties.getProperty(ServiceDiscoverySearchFilter.FIELD_NAME_SERVICE_NAME),
                                        deliveryError));
    }

    /**
     * Sends to the system remoting root a publication request for the service, carrying
     * the service reference and the given properties.
     *
     * @param reActorContext context of the service to publish
     * @param properties     service properties attached to the request
     * @return the stage returned by the send operation
     */
    private static CompletionStage<Try<DeliveryStatus>> sendPublicationRequest(ReActorContext reActorContext, Properties properties) {
        ReActorSystem reActorSystem = reActorContext.getReActorSystem();
        return reActorSystem.getSystemRemotingRoot()
                            .tell(reActorContext.getSelf(),
                                  new ServicePublicationRequest(reActorContext.getSelf(), properties));
    }

    /**
     * Requests one more message from the mailbox of the reactor, then hands the current
     * payload to the supplied handler.
     *
     * @param reActorContext context of the reactor processing the message
     * @param payloadt       the message being processed
     * @param biConsumer     handler invoked with the context and the message
     * @param <PayloadT>     type of the message
     */
    private static <PayloadT extends Serializable> void requestNextMessage(ReActorContext reActorContext, PayloadT payloadt, BiConsumer<ReActorContext, PayloadT> biConsumer) {
        final long messagesToRequest = 1L;
        reActorContext.getMbox().request(messagesToRequest);
        biConsumer.accept(reActorContext, payloadt);
    }

    /**
     * Returns the given message types extended with the control message types handled
     * by the service itself.
     *
     * @param set message types already marked as not delayed
     * @return an unmodifiable set containing the types in {@code set} plus
     *         {@link ReActorInit}, {@link ServiceRegistryNotAvailable},
     *         {@link ServiceDiscoveryRequest}, {@link RouteeReSpawnRequest},
     *         {@link ServicePublicationRequestError} and {@link SystemMonitorReport}
     */
    private static Set<Class<? extends Serializable>> getNonDelayedMessageTypes(Set<Class<? extends Serializable>> set) {
        // Explicit element type: keeps the pipeline type safe without raw types or unchecked casts
        Stream<Class<? extends Serializable>> serviceControlTypes =
                Stream.<Class<? extends Serializable>>of(ReActorInit.class,
                                                         ServiceRegistryNotAvailable.class,
                                                         ServiceDiscoveryRequest.class,
                                                         RouteeReSpawnRequest.class,
                                                         ServicePublicationRequestError.class,
                                                         SystemMonitorReport.class);
        return Stream.concat(set.stream(), serviceControlTypes)
                     .collect(Collectors.toUnmodifiableSet());
    }
}
