package com.cloudimpl.cluster4j.coreImpl;

import com.cloudimpl.cluster4j.common.CloudMessage;
import com.cloudimpl.cluster4j.core.CloudFunction;
import com.cloudimpl.cluster4j.core.CloudServiceDescriptor;
import com.cloudimpl.cluster4j.core.CloudUtil;
import com.cloudimpl.cluster4j.core.Injector;
import com.cloudimpl.cluster4j.logger.Logger;
import com.cloudimpl.cluster4j.node.NodeConfig;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:com/cloudimpl/cluster4j/coreImpl/CloudEngineImpl.class */
/**
 * {@link CloudEngine} implementation that wraps outgoing payloads in a
 * {@link CloudMessage}, asks the {@link CloudRouterRepository} for the router
 * registered under the topic, and forwards the message to the service that
 * router selects.
 *
 * <p>Services registered through {@link #registerService(String, CloudFunction)}
 * are added to this engine's {@link CloudServiceRegistry}; the router repository
 * is subscribed to that registry's flux in the constructor.
 */
public class CloudEngineImpl implements CloudEngine {
    private final CloudRouterRepository routerRepository;
    private final Logger rootLogger;
    private final Injector injector;
    private final CloudServiceRegistry serviceRegistry;
    private final String id;
    private final CorrelationIdGenerator idGen;
    private final NodeConfig config;

    /**
     * Wires the engine together using the supplied injector.
     *
     * <p>The logger is resolved from the injector first, then the service
     * registry, the logger and the injector itself are bound into the injector
     * before the router repository is resolved from it.
     *
     * @param str        identifier of this engine; returned by {@link #id()} and
     *                   passed to the correlation id generator
     * @param injector   injector used to resolve and bind collaborators
     * @param nodeConfig node configuration; its node port is used when
     *                   describing locally registered services
     */
    public CloudEngineImpl(String str, Injector injector, NodeConfig nodeConfig) {
        this.id = str;
        this.injector = injector;
        this.config = nodeConfig;
        this.idGen = new CorrelationIdGenerator(str);

        // The logger is looked up before any binding is made on the injector.
        Logger logger = (Logger) injector.inject(Logger.class);
        this.rootLogger = logger;

        CloudServiceRegistry registry = new CloudServiceRegistry(logger);
        this.serviceRegistry = registry;

        injector.bind(CloudServiceRegistry.class).to(registry);
        injector.bind(Logger.class).to(logger);
        injector.bind(Injector.class).to(injector);

        // Resolved only after the bindings above have been made.
        CloudRouterRepository routers =
                (CloudRouterRepository) injector.inject(CloudRouterRepository.class);
        this.routerRepository = routers;
        routers.subscribe(registry.flux());

        registerHandlers();
    }

    /**
     * @return the identifier this engine was constructed with
     */
    @Override
    public String id() {
        return this.id;
    }

    /**
     * Routes a message for the given topic and issues a request-reply call on
     * the selected service.
     *
     * @param str topic; stored on the message under {@code CloudMsgHdr.TOPIC}
     *            and used to look up the router
     * @param obj payload, or an existing {@link CloudMessage} to copy from
     * @return the reply from the selected service, or an error {@link Mono}
     *         if building or routing the message throws synchronously
     */
    @Override
    public <T> Mono<T> requestReply(String str, Object obj) {
        try {
            CloudMessage message = buildMsg(str, obj);
            return this.routerRepository
                    .router(str)
                    .route(message)
                    .flatMap(service ->
                            service.requestReply(message.withAttr(CloudMsgHdr.SERVICE_ID, service.id())));
        } catch (Exception e) {
            // Surface synchronous failures through the reactive pipeline.
            return Mono.error(e);
        }
    }

    /**
     * Routes a message for the given topic and opens a stream on the selected
     * service.
     *
     * @param str topic; stored on the message under {@code CloudMsgHdr.TOPIC}
     *            and used to look up the router
     * @param obj payload, or an existing {@link CloudMessage} to copy from
     * @return the stream produced by the selected service, or an error
     *         {@link Flux} if building or routing the message throws
     *         synchronously
     */
    @Override
    public <T> Flux<T> requestStream(String str, Object obj) {
        try {
            CloudMessage message = buildMsg(str, obj);
            return this.routerRepository
                    .router(str)
                    .route(message)
                    .flatMapMany(service ->
                            service.requestStream(message.withAttr(CloudMsgHdr.SERVICE_ID, service.id())));
        } catch (Exception e) {
            // Surface synchronous failures through the reactive pipeline.
            return Flux.error(e);
        }
    }

    /**
     * Routes a message for the given topic and calls {@code send} on the
     * selected service.
     *
     * @param str topic; stored on the message under {@code CloudMsgHdr.TOPIC}
     *            and used to look up the router
     * @param obj payload, or an existing {@link CloudMessage} to copy from
     * @return the {@link Mono} returned by the selected service's {@code send},
     *         or an error {@link Mono} if building or routing the message
     *         throws synchronously
     */
    @Override
    public Mono<Void> send(String str, Object obj) {
        try {
            CloudMessage message = buildMsg(str, obj);
            return this.routerRepository
                    .router(str)
                    .route(message)
                    .flatMap(service ->
                            service.send(message.withAttr(CloudMsgHdr.SERVICE_ID, service.id())));
        } catch (Exception e) {
            // Surface synchronous failures through the reactive pipeline.
            return Mono.error(e);
        }
    }

    /**
     * @return the service registry created by this engine
     */
    @Override
    public CloudServiceRegistry getServiceRegistry() {
        return this.serviceRegistry;
    }

    /**
     * Registers a function as a local service under the given name.
     *
     * <p>The service descriptor is assembled from the function's type, input
     * type and router descriptor, a freshly generated correlation id, the
     * configured node port and the host address reported by
     * {@link CloudUtil#getHostIpAddr()}.
     *
     * @param str           service name placed in the descriptor
     * @param cloudFunction function whose metadata describes the service
     */
    @Override
    public void registerService(String str, CloudFunction cloudFunction) {
        this.serviceRegistry.register(
                new LocalCloudService(
                        () -> this.id,
                        this.injector,
                        CloudServiceDescriptor.builder()
                                .withFunctionType(cloudFunction.getFunctionType())
                                .withInputType(cloudFunction.getInputType())
                                .withName(str)
                                .withRouterDescriptor(cloudFunction.getRouterDesc())
                                .withServiceId(this.idGen.nextCid())
                                .withServicePort(this.config.getNodePort())
                                .withHostAddress(CloudUtil.getHostIpAddr())
                                .build()));
    }

    /**
     * Builds the message to route for a topic.
     *
     * @param str topic written to the {@code CloudMsgHdr.TOPIC} attribute
     * @param obj either a {@link CloudMessage} used as the starting point of
     *            the new message, or any other object used as message data
     * @return a new message carrying the topic attribute
     */
    private CloudMessage buildMsg(String str, Object obj) {
        if (obj instanceof CloudMessage) {
            // Start from the existing message and set the topic attribute on the copy.
            return CloudMessage.builder()
                    .from((CloudMessage) obj)
                    .withAttr(CloudMsgHdr.TOPIC, str)
                    .build();
        }
        return CloudMessage.builder()
                .withData(obj)
                .withAttr(CloudMsgHdr.TOPIC, str)
                .build();
    }

    /**
     * Binds this engine's request-reply and request-stream entry points in the
     * injector under the names {@code "RRHnd"} and {@code "RSHnd"}.
     */
    private void registerHandlers() {
        this.injector.nameBind("RRHnd", this::requestReply);
        this.injector.nameBind("RSHnd", this::requestStream);
    }
}
