package com.cloudimpl.cluster4j.node;

import com.cloudimpl.cluster4j.common.CloudMessage;
import com.cloudimpl.cluster4j.common.EndpointListener;
import com.cloudimpl.cluster4j.core.CloudMsgHdr;
import com.cloudimpl.cluster4j.core.CloudServiceRegistry;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:com/cloudimpl/cluster4j/node/NodeServieListener.class */
public class NodeServieListener implements EndpointListener {
    private final CloudServiceRegistry registry;

    public NodeServieListener(CloudServiceRegistry cloudServiceRegistry) {
        this.registry = cloudServiceRegistry;
    }

    public Mono<Void> fireAndForget(Mono<CloudMessage> mono) {
        return mono.flatMap(cloudMessage -> {
            String attr = cloudMessage.attr(CloudMsgHdr.SERVICE_ID);
            return attr == null ? Mono.error(new ServiceException("service id not found to route")) : this.registry.findLocal(attr).send(cloudMessage);
        });
    }

    public Mono<CloudMessage> requestResponse(Mono<CloudMessage> mono) {
        return mono.flatMap(cloudMessage -> {
            String attr = cloudMessage.attr(CloudMsgHdr.SERVICE_ID);
            return attr == null ? Mono.error(new ServiceException("service id not found to route")) : this.registry.findLocal(attr).requestReply(cloudMessage);
        });
    }

    public Flux<CloudMessage> requestStream(Mono<CloudMessage> mono) {
        return mono.flatMapMany(cloudMessage -> {
            String attr = cloudMessage.attr(CloudMsgHdr.SERVICE_ID);
            return attr == null ? Mono.error(new ServiceException("service id not found to route")) : this.registry.findLocal(attr).requestStream(cloudMessage);
        });
    }
}
