package com.cloudimpl.cluster4j.common;

import io.rsocket.AbstractRSocket;
import io.rsocket.ConnectionSetupPayload;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.SocketAcceptor;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.transport.netty.server.TcpServerTransport;
import io.rsocket.util.DefaultPayload;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:com/cloudimpl/cluster4j/common/TransportManager.class */
public class TransportManager {
    private final Map<RouteEndpoint, Mono<RSocket>> mapConnections = new ConcurrentHashMap();
    private final MessageCodec defaultCodec;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/cloudimpl/cluster4j/common/TransportManager$SocketAcceptorImpl.class */
    public static class SocketAcceptorImpl implements SocketAcceptor {
        private final MessageCodec codec;
        private final EndpointListener listener;

        public SocketAcceptorImpl(MessageCodec messageCodec, EndpointListener endpointListener) {
            this.codec = messageCodec;
            this.listener = endpointListener;
        }

        public Mono<RSocket> accept(ConnectionSetupPayload connectionSetupPayload, RSocket rSocket) {
            System.out.println("socket connected...");
            return Mono.just(new AbstractRSocket() { // from class: com.cloudimpl.cluster4j.common.TransportManager.SocketAcceptorImpl.1
                public Mono<Void> fireAndForget(Payload payload) {
                    return SocketAcceptorImpl.this.listener.fireAndForget(Mono.just(payload).map(this::decode));
                }

                public Mono<Payload> requestResponse(Payload payload) {
                    return SocketAcceptorImpl.this.listener.requestResponse(Mono.just(payload).map(this::decode)).map(this::encode);
                }

                public Flux<Payload> requestStream(Payload payload) {
                    return SocketAcceptorImpl.this.listener.requestStream(Mono.just(payload).map(this::decode)).map(this::encode);
                }

                public Flux<Payload> requestChannel(Publisher<Payload> publisher) {
                    return SocketAcceptorImpl.this.listener.requestChannel(Flux.from(publisher).map(this::decode)).map(this::encode);
                }

                private Payload encode(CloudMessage cloudMessage) {
                    return DefaultPayload.create(SocketAcceptorImpl.this.codec.encode(cloudMessage));
                }

                private CloudMessage decode(Payload payload) {
                    return (CloudMessage) SocketAcceptorImpl.this.codec.decode(CloudMessage.class, payload.sliceData());
                }
            });
        }
    }

    public TransportManager(MessageCodec messageCodec) {
        this.defaultCodec = messageCodec;
    }

    public Mono<RSocket> get(RouteEndpoint routeEndpoint) {
        return get(routeEndpoint, this.defaultCodec);
    }

    public Mono<RSocket> get(RouteEndpoint routeEndpoint, MessageCodec messageCodec) {
        return this.mapConnections.computeIfAbsent(routeEndpoint, this::connect);
    }

    public void createEndpoint(String str, int i, EndpointListener endpointListener) {
        createEndpoint(str, i, this.defaultCodec, endpointListener);
    }

    public void createEndpoint(String str, int i, MessageCodec messageCodec, EndpointListener endpointListener) {
        RSocketFactory.receive().acceptor(new SocketAcceptorImpl(messageCodec, endpointListener)).transport(TcpServerTransport.create(str, i)).start().subscribe();
    }

    private Mono<RSocket> connect(RouteEndpoint routeEndpoint) {
        return connectRemote(routeEndpoint);
    }

    private Mono<RSocket> handleErrors(Mono<RSocket> mono, RouteEndpoint routeEndpoint) {
        return mono.doOnSuccess(rSocket -> {
            System.out.println("Connected successfully on " + routeEndpoint);
            rSocket.onClose().doOnTerminate(() -> {
                this.mapConnections.remove(routeEndpoint);
                System.out.println("Connection closed on {} and removed from the pool " + routeEndpoint);
            }).subscribe();
        }).doOnError(th -> {
            System.out.println("Connect failed on {}, cause: " + routeEndpoint + " " + th);
            this.mapConnections.remove(routeEndpoint);
        }).cache();
    }

    private Mono<RSocket> connectRemote(RouteEndpoint routeEndpoint) {
        return handleErrors(RSocketFactory.connect().transport(TcpClientTransport.create(routeEndpoint.getHost(), routeEndpoint.getPort())).start(), routeEndpoint);
    }
}
