/*
 * Decompiled with CFR 0.152.
 */
package com.cloudimpl.cluster4j.common;

import com.cloudimpl.cluster4j.common.CloudMessage;
import com.cloudimpl.cluster4j.common.EndpointListener;
import com.cloudimpl.cluster4j.common.MessageCodec;
import com.cloudimpl.cluster4j.common.RouteEndpoint;
import io.rsocket.AbstractRSocket;
import io.rsocket.ConnectionSetupPayload;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.SocketAcceptor;
import io.rsocket.transport.ClientTransport;
import io.rsocket.transport.ServerTransport;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.transport.netty.server.TcpServerTransport;
import io.rsocket.util.DefaultPayload;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * Manages RSocket TCP transports: keeps one outbound connection per
 * {@link RouteEndpoint} in a pool and starts TCP server endpoints that
 * forward incoming interactions to an {@link EndpointListener}.
 *
 * <p>Outbound connections are stored as {@code Mono<RSocket>} in a
 * {@link ConcurrentHashMap}. An entry is removed when the connect attempt
 * fails or when the connected socket terminates, so the next {@code get}
 * for that endpoint creates a fresh connection.
 */
public class TransportManager {
    /** Pool of outbound connections, keyed by remote endpoint. */
    private final Map<RouteEndpoint, Mono<RSocket>> mapConnections = new ConcurrentHashMap<>();
    /** Codec used by the overloads that do not take an explicit codec. */
    private final MessageCodec defaultCodec;

    /**
     * Creates a manager.
     *
     * @param defaultCodec codec used when a caller does not supply one
     */
    public TransportManager(MessageCodec defaultCodec) {
        this.defaultCodec = defaultCodec;
    }

    /**
     * Returns the pooled connection to {@code endpoint}, creating it on first use.
     *
     * @param endpoint remote endpoint to connect to
     * @return the pooled connection for that endpoint
     */
    public Mono<RSocket> get(RouteEndpoint endpoint) {
        return this.get(endpoint, this.defaultCodec);
    }

    /**
     * Returns the pooled connection to {@code endpoint}, creating it on first use.
     *
     * @param endpoint remote endpoint to connect to
     * @param codec    currently unused: the outbound connection is created without
     *                 any codec; the parameter is kept for interface compatibility
     * @return the pooled connection for that endpoint
     */
    public Mono<RSocket> get(RouteEndpoint endpoint, MessageCodec codec) {
        return this.mapConnections.computeIfAbsent(endpoint, this::connect);
    }

    /**
     * Starts a TCP server endpoint that uses the default codec.
     *
     * @param host     host name or address to bind to
     * @param port     port to bind to
     * @param listener receiver of the decoded incoming interactions
     */
    public void createEndpoint(String host, int port, EndpointListener listener) {
        this.createEndpoint(host, port, this.defaultCodec, listener);
    }

    /**
     * Starts a TCP server endpoint. The server start is subscribed immediately and
     * this method returns without waiting for the bind to complete.
     *
     * @param host     host name or address to bind to
     * @param port     port to bind to
     * @param codec    codec used to decode requests and encode responses
     * @param listener receiver of the decoded incoming interactions
     */
    public void createEndpoint(String host, int port, MessageCodec codec, EndpointListener listener) {
        RSocketFactory.receive()
                .acceptor(new SocketAcceptorImpl(codec, listener))
                .transport(TcpServerTransport.create(host, port))
                .start()
                .subscribe();
    }

    /** Mapping function for the pool: builds the connection for a missing endpoint. */
    private Mono<RSocket> connect(RouteEndpoint endpoint) {
        return this.connectRemote(endpoint);
    }

    /**
     * Adds pool maintenance to a connection and caches its result.
     *
     * @param mono     the raw connect attempt
     * @param endpoint pool key that the connection belongs to
     * @return the connection with logging, pool eviction and caching applied
     */
    private Mono<RSocket> handleErrors(Mono<RSocket> mono, RouteEndpoint endpoint) {
        return mono.doOnSuccess(rsocket -> {
            System.out.println("Connected successfully on " + endpoint);
            // Evict the pool entry as soon as the socket terminates.
            rsocket.onClose().doOnTerminate(() -> {
                this.mapConnections.remove(endpoint);
                System.out.println("Connection closed on " + endpoint + " and removed from the pool");
            }).subscribe();
        }).doOnError(throwable -> {
            System.out.println("Connect failed on " + endpoint + ", cause: " + throwable);
            // Evict the failed attempt so a later get() can retry.
            this.mapConnections.remove(endpoint);
        }).cache();
    }

    /** Builds the TCP client connection to {@code endpoint} and wires it into the pool. */
    private Mono<RSocket> connectRemote(RouteEndpoint endpoint) {
        Mono<RSocket> rsocketMono = RSocketFactory.connect()
                .transport(TcpClientTransport.create(endpoint.getHost(), endpoint.getPort()))
                .start();
        return this.handleErrors(rsocketMono, endpoint);
    }

    /**
     * Server-side acceptor: for every incoming connection it returns a responder
     * that decodes each payload into a {@link CloudMessage}, delegates to the
     * listener, and encodes the listener's replies back into payloads.
     */
    private static class SocketAcceptorImpl
    implements SocketAcceptor {
        private final MessageCodec codec;
        private final EndpointListener listener;

        public SocketAcceptorImpl(MessageCodec codec, EndpointListener listener) {
            this.codec = codec;
            this.listener = listener;
        }

        @Override
        public Mono<RSocket> accept(ConnectionSetupPayload setupPayload, RSocket reactiveSocket) {
            System.out.println("socket connected...");
            RSocket responder = new AbstractRSocket() {

                @Override
                public Mono<Void> fireAndForget(Payload payload) {
                    return listener.fireAndForget(Mono.just(payload).map(this::decode));
                }

                @Override
                public Mono<Payload> requestResponse(Payload payload) {
                    return listener.requestResponse(Mono.just(payload).map(this::decode)).map(this::encode);
                }

                @Override
                public Flux<Payload> requestStream(Payload payload) {
                    return listener.requestStream(Mono.just(payload).map(this::decode)).map(this::encode);
                }

                @Override
                public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
                    return listener.requestChannel(Flux.from(payloads).map(this::decode)).map(this::encode);
                }

                /** Serializes an outgoing message into a payload. */
                private Payload encode(CloudMessage msg) {
                    // The cast pins the ByteBuffer overload of DefaultPayload.create.
                    return DefaultPayload.create((ByteBuffer) codec.encode(msg));
                }

                /** Deserializes the data section of an incoming payload. */
                private CloudMessage decode(Payload payload) {
                    return codec.decode(CloudMessage.class, payload.sliceData());
                }
            };
            return Mono.just(responder);
        }
    }
}

