/*
 * Decompiled with CFR 0.152.
 */
package io.scalecube.transport.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.DecoderException;
import io.netty.handler.codec.EncoderException;
import io.netty.util.ReferenceCountUtil;
import io.scalecube.cluster.transport.api.DistinctErrors;
import io.scalecube.cluster.transport.api.Message;
import io.scalecube.cluster.transport.api.MessageCodec;
import io.scalecube.cluster.transport.api.Transport;
import io.scalecube.transport.netty.Receiver;
import io.scalecube.transport.netty.Sender;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.time.Duration;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.core.publisher.Sinks;
import reactor.netty.Connection;
import reactor.netty.DisposableServer;
import reactor.netty.resources.LoopResources;

public final class TransportImpl
implements Transport {
    private static final Logger LOGGER = LoggerFactory.getLogger(Transport.class);
    private static final DistinctErrors DISTINCT_ERRORS = new DistinctErrors(Duration.ofMinutes(1L));
    private final MessageCodec messageCodec;
    private final Sinks.Many<Message> sink = Sinks.many().multicast().directBestEffort();
    private final Sinks.One<Void> stop = Sinks.one();
    private final Sinks.One<Void> onStop = Sinks.one();
    private String address;
    private DisposableServer server;
    private final Map<String, Mono<? extends Connection>> connections = new ConcurrentHashMap<String, Mono<? extends Connection>>();
    private final LoopResources loopResources = LoopResources.create((String)"sc-cluster-io", (int)1, (boolean)true);
    private final Receiver receiver;
    private final Sender sender;
    private final Function<String, String> addressMapper;

    public TransportImpl(MessageCodec messageCodec, Receiver receiver, Sender sender, Function<String, String> addressMapper) {
        this.messageCodec = messageCodec;
        this.receiver = receiver;
        this.sender = sender;
        this.addressMapper = addressMapper;
    }

    private static String prepareAddress(DisposableServer server) {
        InetSocketAddress serverAddress = (InetSocketAddress)server.address();
        InetAddress inetAddress = serverAddress.getAddress();
        int port = serverAddress.getPort();
        if (inetAddress.isAnyLocalAddress()) {
            return TransportImpl.getLocalHostAddress() + ":" + port;
        }
        return inetAddress.getHostAddress() + ":" + port;
    }

    private static String getLocalHostAddress() {
        try {
            return InetAddress.getLocalHost().getHostAddress();
        }
        catch (UnknownHostException e) {
            throw new RuntimeException(e);
        }
    }

    private void init(DisposableServer server) {
        this.server = server;
        this.address = TransportImpl.prepareAddress(server);
        this.stop.asMono().then(this.doStop()).doFinally(s -> this.onStop.emitEmpty(Sinks.EmitFailureHandler.busyLooping((Duration)Duration.ofSeconds(3L)))).subscribe(null, ex -> LOGGER.warn("[{}][doStop] Exception occurred: {}", (Object)this.address, (Object)ex.toString()));
    }

    public Mono<Transport> start() {
        return Mono.deferContextual(context -> this.receiver.bind()).doOnNext(this::init).doOnSuccess(t -> LOGGER.info("[start][{}] Bound cluster transport", (Object)t.address())).doOnError(ex -> LOGGER.error("[start][{}] Exception occurred: {}", (Object)this.address, (Object)ex.toString())).thenReturn((Object)this).cast(Transport.class).contextWrite(context -> context.put(ReceiverContext.class, (Object)new ReceiverContext(this.address, this.sink, this.loopResources, this::decodeMessage)));
    }

    public String address() {
        return this.address;
    }

    public boolean isStopped() {
        return this.onStop.asMono().toFuture().isDone();
    }

    public Mono<Void> stop() {
        return Mono.defer(() -> {
            this.stop.emitEmpty(Sinks.EmitFailureHandler.busyLooping((Duration)Duration.ofSeconds(3L)));
            return this.onStop.asMono();
        });
    }

    private Mono<Void> doStop() {
        return Mono.defer(() -> {
            LOGGER.info("[{}][doStop] Stopping", (Object)this.address);
            this.sink.emitComplete(Sinks.EmitFailureHandler.busyLooping((Duration)Duration.ofSeconds(3L)));
            return Flux.concatDelayError((Publisher[])new Publisher[]{this.closeServer(), this.shutdownLoopResources()}).then().doFinally(s -> this.connections.clear()).doOnSuccess(avoid -> LOGGER.info("[{}][doStop] Stopped", (Object)this.address));
        });
    }

    public Flux<Message> listen() {
        return this.sink.asFlux().onBackpressureBuffer();
    }

    public Mono<Void> send(String address, Message message) {
        return Mono.deferContextual(context -> this.connections.computeIfAbsent(address, this::connect)).flatMap(connection -> Mono.deferContextual(context -> this.sender.send(message)).contextWrite(context -> context.put(Connection.class, connection))).contextWrite(context -> context.put(SenderContext.class, (Object)new SenderContext(this.loopResources, this::encodeMessage)));
    }

    public Mono<Message> requestResponse(String address, Message request) {
        return Mono.create(sink -> {
            Objects.requireNonNull(request, "request must be not null");
            Objects.requireNonNull(request.correlationId(), "correlationId must be not null");
            Disposable receive = this.listen().filter(resp -> resp.correlationId() != null).filter(resp -> resp.correlationId().equals(request.correlationId())).take(1L).subscribe(arg_0 -> ((MonoSink)sink).success(arg_0), arg_0 -> ((MonoSink)sink).error(arg_0), () -> ((MonoSink)sink).success());
            Disposable send = this.send(address, request).subscribe(null, ex -> {
                receive.dispose();
                sink.error(ex);
            });
            sink.onDispose((Disposable)Disposables.composite((Disposable[])new Disposable[]{send, receive}));
        });
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    private Message decodeMessage(ByteBuf byteBuf) {
        try (ByteBufInputStream stream = new ByteBufInputStream(byteBuf, true);){
            Message message = this.messageCodec.deserialize((InputStream)stream);
            return message;
        }
        catch (Exception e) {
            if (DISTINCT_ERRORS.contains((Throwable)e)) throw new DecoderException((Throwable)e);
            LOGGER.warn("[{}][decodeMessage] Exception occurred: {}", (Object)this.address, (Object)e.toString());
            throw new DecoderException((Throwable)e);
        }
    }

    private ByteBuf encodeMessage(Message message) {
        ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer();
        ByteBufOutputStream stream = new ByteBufOutputStream(byteBuf);
        try {
            this.messageCodec.serialize(message, (OutputStream)stream);
        }
        catch (Exception e) {
            byteBuf.release();
            if (!DISTINCT_ERRORS.contains((Throwable)e)) {
                LOGGER.warn("[{}][encodeMessage] Exception occurred: {}", (Object)this.address, (Object)e.toString());
            }
            throw new EncoderException((Throwable)e);
        }
        return byteBuf;
    }

    private Mono<? extends Connection> connect(String remoteAddress) {
        String mappedAddr = this.addressMapper.apply(remoteAddress);
        return this.sender.connect(mappedAddr).doOnSuccess(connection -> {
            connection.onDispose().doOnTerminate(() -> this.connections.remove(remoteAddress)).subscribe();
            LOGGER.debug("[{}][connect][success] remoteAddress: {}, channel: {}", new Object[]{this.address, remoteAddress, connection.channel()});
        }).doOnError(th -> {
            if (!DISTINCT_ERRORS.contains(th)) {
                LOGGER.warn("[{}][connect][error] remoteAddress: {}, cause: {}", new Object[]{this.address, remoteAddress, th.toString()});
            }
            this.connections.remove(remoteAddress);
        }).cache();
    }

    private Mono<Void> closeServer() {
        return Mono.defer(() -> {
            if (this.server == null) {
                return Mono.empty();
            }
            LOGGER.info("[{}][closeServer] Closing server channel", (Object)this.address);
            return Mono.fromRunnable(() -> ((DisposableServer)this.server).dispose()).then(this.server.onDispose()).doOnSuccess(avoid -> LOGGER.info("[{}][closeServer] Closed server channel", (Object)this.address)).doOnError(e -> LOGGER.warn("[{}][closeServer] Exception occurred: {}", (Object)this.address, (Object)e.toString()));
        });
    }

    private Mono<Void> shutdownLoopResources() {
        return Mono.fromRunnable(() -> ((LoopResources)this.loopResources).dispose()).then(this.loopResources.disposeLater());
    }

    public static final class SenderContext {
        private final LoopResources loopResources;
        private final Function<Message, ByteBuf> messageEncoder;

        private SenderContext(LoopResources loopResources, Function<Message, ByteBuf> messageEncoder) {
            this.loopResources = loopResources;
            this.messageEncoder = messageEncoder;
        }

        public LoopResources loopResources() {
            return this.loopResources;
        }

        public Function<Message, ByteBuf> messageEncoder() {
            return this.messageEncoder;
        }
    }

    public static final class ReceiverContext {
        private final String address;
        private final Sinks.Many<Message> sink;
        private final LoopResources loopResources;
        private final Function<ByteBuf, Message> messageDecoder;

        private ReceiverContext(String address, Sinks.Many<Message> sink, LoopResources loopResources, Function<ByteBuf, Message> messageDecoder) {
            this.address = address;
            this.sink = sink;
            this.loopResources = loopResources;
            this.messageDecoder = messageDecoder;
        }

        public LoopResources loopResources() {
            return this.loopResources;
        }

        public void onMessage(ByteBuf byteBuf) {
            try {
                if (byteBuf == Unpooled.EMPTY_BUFFER) {
                    return;
                }
                if (!byteBuf.isReadable()) {
                    ReferenceCountUtil.safeRelease((Object)byteBuf);
                    return;
                }
                Message message = this.messageDecoder.apply(byteBuf);
                this.sink.emitNext((Object)message, Sinks.EmitFailureHandler.busyLooping((Duration)Duration.ofSeconds(3L)));
            }
            catch (Exception e) {
                LOGGER.error("[{}][onMessage] Exception occurred:", (Object)this.address, (Object)e);
            }
        }
    }
}

