package com.cloudimpl.cluster4j.node;

import com.cloudimpl.cluster4j.common.CloudMessage;
import com.cloudimpl.cluster4j.common.EndpointListener;
import com.cloudimpl.cluster4j.common.GsonCodec;
import com.cloudimpl.cluster4j.common.RouteEndpoint;
import com.cloudimpl.cluster4j.common.TransportManager;
import com.cloudimpl.cluster4j.core.CloudEngine;
import com.cloudimpl.cluster4j.core.CloudEngineImpl;
import com.cloudimpl.cluster4j.core.CloudFunction;
import com.cloudimpl.cluster4j.core.CloudMsgHdr;
import com.cloudimpl.cluster4j.core.CloudServiceDescriptor;
import com.cloudimpl.cluster4j.core.CloudUtil;
import com.cloudimpl.cluster4j.core.FluxStream;
import com.cloudimpl.cluster4j.core.Injector;
import com.cloudimpl.cluster4j.core.LocalCloudService;
import com.cloudimpl.cluster4j.core.RemoteCloudService;
import com.cloudimpl.cluster4j.core.logger.ILogger;
import com.cloudimpl.cluster4j.logger.Logger;
import com.fasterxml.jackson.databind.util.ByteBufferBackedInputStream;
import io.scalecube.cluster.Cluster;
import io.scalecube.cluster.ClusterImpl;
import io.scalecube.cluster.ClusterMessageHandler;
import io.scalecube.cluster.membership.IdGenerator;
import io.scalecube.cluster.membership.MembershipEvent;
import io.scalecube.cluster.metadata.MetadataDecoder;
import io.scalecube.cluster.metadata.MetadataEncoder;
import io.scalecube.cluster.transport.api.Message;
import io.scalecube.cluster.transport.api.MessageCodec;
import io.scalecube.net.Address;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:com/cloudimpl/cluster4j/node/CloudNode.class */
public class CloudNode {
    private final CloudEngine engine;
    private final NodeConfig config;
    private Cluster gossipCluster;
    private final Injector injector;
    private final TransportManager transportManager;
    private final ILogger logger;
    private final Map<String, String> serviceCache = new ConcurrentHashMap();
    private MetadataEncoder metadataEncoder = MetadataEncoder.INSTANCE;
    private MetadataDecoder metadataDecoder = MetadataDecoder.INSTANCE;
    private final String id = IdGenerator.generateId(10);

    /* loaded from: input_file:com/cloudimpl/cluster4j/node/CloudNode$MessageCodecImpl.class */
    private static class MessageCodecImpl implements MessageCodec {
        private MessageCodecImpl() {
        }

        public Message deserialize(InputStream inputStream) throws Exception {
            return (Message) DefaultObjectMapper.OBJECT_MAPPER.readValue(inputStream, Message.class);
        }

        public void serialize(Message message, OutputStream outputStream) throws Exception {
            DefaultObjectMapper.OBJECT_MAPPER.writeValue(outputStream, message);
        }
    }

    public CloudNode(Injector injector, NodeConfig nodeConfig) {
        this.injector = injector;
        injector.bind("@host").to(CloudUtil.getHostIpAddr());
        injector.bind("@nodeId").to(this.id);
        this.transportManager = new TransportManager(com.cloudimpl.cluster4j.core.MessageCodecImpl.instance());
        this.engine = new CloudEngineImpl(this.id, injector, nodeConfig);
        this.config = nodeConfig;
        this.logger = ((Logger) injector.inject(Logger.class)).createSubLogger(CloudNode.class);
    }

    public <T> Mono<T> requestReply(String str, Object obj) {
        return this.engine.requestReply(str, obj);
    }

    public <T> Flux<T> requestStream(String str, Object obj) {
        return this.engine.requestStream(str, obj);
    }

    public Mono<Void> send(String str, Object obj) {
        return this.engine.send(str, obj);
    }

    public void registerService(String str, CloudFunction cloudFunction) {
        this.engine.registerService(str, cloudFunction);
    }

    public NodeConfig getConfig() {
        return this.config;
    }

    public void start() {
        this.gossipCluster = new ClusterImpl().membership(membershipConfig -> {
            return membershipConfig.seedMembers(this.config.getSeeds());
        }).config(clusterConfig -> {
            return clusterConfig.metadataDecoder(this::decode).metadataEncoder(this::encode).transport(transportConfig -> {
                return transportConfig.port(this.config.getGossipPort()).messageCodec(new MessageCodecImpl());
            });
        }).handler(cluster -> {
            this.gossipCluster = cluster;
            return new ClusterMessageHandler() { // from class: com.cloudimpl.cluster4j.node.CloudNode.1
                public void onMembershipEvent(MembershipEvent membershipEvent) {
                    if (membershipEvent.isUpdated() || membershipEvent.isAdded()) {
                        CloudNode.this.onMemberEvent(membershipEvent);
                    } else if (membershipEvent.isRemoved()) {
                        CloudNode.this.engine.getServiceRegistry().unregisterByNodeId(membershipEvent.member().id());
                    }
                }
            };
        }).startAwait();
        publishServices();
        startServices();
    }

    private void startServices() {
        this.transportManager.createEndpoint(CloudUtil.getHostIpAddr(), this.config.getNodePort(), new EndpointListener() { // from class: com.cloudimpl.cluster4j.node.CloudNode.2
            public Mono<Void> fireAndForget(Mono<CloudMessage> mono) {
                return mono.flatMap(cloudMessage -> {
                    return CloudNode.this.engine.getServiceRegistry().findLocal(cloudMessage.attr(CloudMsgHdr.SERVICE_ID)).send(cloudMessage);
                });
            }

            public Mono<CloudMessage> requestResponse(Mono<CloudMessage> mono) {
                return mono.flatMap(cloudMessage -> {
                    return CloudNode.this.engine.getServiceRegistry().findLocal(cloudMessage.attr(CloudMsgHdr.SERVICE_ID)).requestReply(cloudMessage);
                }).map(obj -> {
                    return CloudMessage.builder().withData(obj).build();
                });
            }

            public Flux<CloudMessage> requestStream(Mono<CloudMessage> mono) {
                return mono.flatMapMany(cloudMessage -> {
                    return CloudNode.this.engine.getServiceRegistry().findLocal(cloudMessage.attr(CloudMsgHdr.SERVICE_ID)).requestStream(cloudMessage);
                }).map(obj -> {
                    return CloudMessage.builder().withData(obj).build();
                });
            }
        });
    }

    private void publishServices() {
        this.engine.getServiceRegistry().localFlux().filter(event -> {
            return event.getType() == FluxStream.Event.Type.ADD || event.getType() == FluxStream.Event.Type.UPDATE;
        }).map(event2 -> {
            return ((LocalCloudService) event2.getValue()).getDescriptor();
        }).doOnNext(cloudServiceDescriptor -> {
            this.logger.info("local service update : {0}", new Object[]{cloudServiceDescriptor});
        }).doOnNext(cloudServiceDescriptor2 -> {
            this.serviceCache.put("srv_" + cloudServiceDescriptor2.getServiceId(), cloudServiceDescriptor2.toString());
        }).doOnNext(cloudServiceDescriptor3 -> {
            this.gossipCluster.updateMetadata(this.serviceCache).subscribe();
        }).doOnError(th -> {
            this.logger.exception(th, "error updating membership service", new Object[0]);
        }).subscribe();
        this.engine.getServiceRegistry().localFlux().filter(event3 -> {
            return event3.getType() == FluxStream.Event.Type.REMOVE;
        }).doOnNext(event4 -> {
            this.serviceCache.remove("srv_" + ((LocalCloudService) event4.getValue()).id());
        }).doOnNext(event5 -> {
            this.logger.info("local service removed : {0}", new Object[]{((LocalCloudService) event5.getValue()).getDescriptor()});
        }).doOnNext(event6 -> {
            this.gossipCluster.updateMetadata(this.serviceCache).subscribe();
        }).subscribe();
    }

    private void onMemberEvent(MembershipEvent membershipEvent) {
        try {
            this.logger.info("member event {0} received", new Object[]{membershipEvent});
            String id = membershipEvent.member().id();
            Optional metadata = this.gossipCluster.metadata(membershipEvent.member());
            if (metadata.isPresent()) {
                Map map = (Map) metadata.get();
                HashSet hashSet = new HashSet(map.keySet());
                this.engine.getServiceRegistry().services().filter(cloudService -> {
                    return cloudService.nodeId().equals(id);
                }).forEach(cloudService2 -> {
                    if (map.containsKey("srv_" + cloudService2.id())) {
                        hashSet.remove(cloudService2.id());
                    } else {
                        this.engine.getServiceRegistry().unregister(cloudService2.id());
                        this.logger.info("remote service removed : {0}", new Object[]{cloudService2.getDescriptor()});
                    }
                });
                hashSet.stream().filter(str -> {
                    return str.startsWith("srv_");
                }).map(str2 -> {
                    return (String) map.get(str2);
                }).map(str3 -> {
                    return (CloudServiceDescriptor) GsonCodec.decode(CloudServiceDescriptor.class, str3);
                }).forEach(cloudServiceDescriptor -> {
                    updateRegistry(id, cloudServiceDescriptor);
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private void updateRegistry(String str, CloudServiceDescriptor cloudServiceDescriptor) {
        this.logger.info("remote service registered. {0}", new Object[]{cloudServiceDescriptor});
        Address create = Address.create(cloudServiceDescriptor.getHostAddr(), cloudServiceDescriptor.getServicePort());
        this.engine.getServiceRegistry().register(new RemoteCloudService(str, () -> {
            return this.transportManager.get(RouteEndpoint.create(create.host(), create.port()));
        }, cloudServiceDescriptor));
    }

    private Object decode(ByteBuffer byteBuffer) {
        try {
            return DefaultObjectMapper.OBJECT_MAPPER.readValue(new ByteBufferBackedInputStream(byteBuffer), Map.class);
        } catch (Exception e) {
            this.logger.exception(e, "Failed to read metadata: ", new Object[0]);
            return null;
        }
    }

    private ByteBuffer encode(Object obj) {
        try {
            return ByteBuffer.wrap(DefaultObjectMapper.OBJECT_MAPPER.writeValueAsString((Map) obj).getBytes(StandardCharsets.UTF_8));
        } catch (Exception e) {
            this.logger.exception(e, "Failed to write metadata: ", new Object[0]);
            throw Exceptions.propagate(e);
        }
    }
}
