package io.scalecube.transport.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.DecoderException;
import io.netty.handler.codec.EncoderException;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.scalecube.cluster.transport.api.Message;
import io.scalecube.cluster.transport.api.MessageCodec;
import io.scalecube.cluster.transport.api.Transport;
import io.scalecube.cluster.transport.api.TransportConfig;
import io.scalecube.net.Address;
import java.net.InetAddress;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.Exceptions;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.netty.Connection;
import reactor.netty.ConnectionObserver;
import reactor.netty.DisposableServer;
import reactor.netty.NettyInbound;
import reactor.netty.NettyOutbound;
import reactor.netty.channel.BootstrapHandlers;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.resources.LoopResources;
import reactor.netty.tcp.TcpClient;
import reactor.netty.tcp.TcpServer;

/* loaded from: input_file:io/scalecube/transport/netty/TransportImpl.class */
public final class TransportImpl implements Transport {
    private static final Logger LOGGER = LoggerFactory.getLogger(Transport.class);
    private final TransportConfig config;
    private final LoopResources loopResources;
    private final DirectProcessor<Message> messagesSubject;
    private final FluxSink<Message> messageSink;
    private final Map<Address, Mono<? extends Connection>> connections;
    private final ExceptionHandler exceptionHandler;
    private final TransportChannelInitializer channelInitializer;
    private final MonoProcessor<Void> stop;
    private final MonoProcessor<Void> onStop;
    private final Address address;
    private final DisposableServer server;
    private final MessageCodec messageCodec;

    /* loaded from: input_file:io/scalecube/transport/netty/TransportImpl$TransportChannelInitializer.class */
    /**
     * Channel initializer registered on both the server ("inbound") and client ("outbound")
     * bootstraps of the enclosing transport. It installs length-prefixed framing followed by the
     * transport's exception handler.
     */
    private final class TransportChannelInitializer implements BiConsumer<ConnectionObserver, Channel> {
        /** Size in bytes of the length prefix written before, and stripped from, every frame. */
        private static final int LENGTH_FIELD_LENGTH = 4;

        private TransportChannelInitializer() {
        }

        /**
         * Appends the framing handlers and the exception handler to the end of the channel pipeline.
         *
         * @param connectionObserver connection observer supplied by the bootstrap (not used here)
         * @param channel channel whose pipeline is being configured
         */
        @Override // java.util.function.BiConsumer
        public void accept(ConnectionObserver connectionObserver, Channel channel) {
            channel.pipeline()
                // outbound: prepend a LENGTH_FIELD_LENGTH-byte length header to each message
                .addLast(new LengthFieldPrepender(LENGTH_FIELD_LENGTH))
                // inbound: length field at offset 0, no adjustment, header stripped from the frame
                .addLast(new LengthFieldBasedFrameDecoder(
                    config.maxFrameLength(), 0, LENGTH_FIELD_LENGTH, 0, LENGTH_FIELD_LENGTH))
                .addLast(exceptionHandler);
        }
    }

    /**
     * Creates an unbound transport holding the shared state (loop resources, message subject,
     * connection cache, stop signals). The instance has no server and no address; {@link #bind0()}
     * produces the bound instance.
     *
     * @param transportConfig transport settings; must not be {@code null}
     * @throws NullPointerException if {@code transportConfig} is {@code null}
     */
    public TransportImpl(TransportConfig transportConfig) {
        // Fail fast, before any loop resources or processors are created.
        this.config = Objects.requireNonNull(transportConfig, "transportConfig");
        this.loopResources = LoopResources.create("sc-cluster-io", 1, true);
        this.messagesSubject = DirectProcessor.create();
        this.messageSink = this.messagesSubject.sink();
        this.connections = new ConcurrentHashMap<>();
        this.exceptionHandler = new ExceptionHandler();
        this.channelInitializer = new TransportChannelInitializer();
        this.stop = MonoProcessor.create();
        this.onStop = MonoProcessor.create();
        this.messageCodec = transportConfig.messageCodec();
        // Not bound yet: no server and therefore no address.
        this.address = null;
        this.server = null;
    }

    /**
     * Creates the bound view of a transport: it shares every piece of state with the given unbound
     * instance and adds the running server and the address derived from it. It also wires the stop
     * sequence: once {@code stop} completes, {@link #doStop()} runs and {@code onStop} is completed
     * afterwards regardless of the outcome.
     *
     * @param disposableServer server returned by a successful bind
     * @param transportImpl unbound instance whose state is shared
     */
    private TransportImpl(DisposableServer disposableServer, TransportImpl transportImpl) {
        this.server = disposableServer;
        this.address = prepareAddress(disposableServer);

        // Everything below is shared with (not copied from) the unbound instance.
        this.config = transportImpl.config;
        this.messageCodec = transportImpl.messageCodec;
        this.loopResources = transportImpl.loopResources;
        this.messagesSubject = transportImpl.messagesSubject;
        this.messageSink = transportImpl.messageSink;
        this.connections = transportImpl.connections;
        this.exceptionHandler = transportImpl.exceptionHandler;
        this.channelInitializer = transportImpl.channelInitializer;
        this.stop = transportImpl.stop;
        this.onStop = transportImpl.onStop;

        this.stop
            .then(doStop())
            .doFinally(signal -> this.onStop.onComplete())
            .subscribe(
                null,
                error -> LOGGER.warn("[{}][doStop] Exception occurred: {}", this.address, error.toString()));
    }

    /**
     * Builds the transport address from the server's bound socket address. When the server is bound
     * to the wildcard ("any local") address, the host part is taken from
     * {@code Address.getLocalIpAddress()} instead.
     *
     * @param disposableServer bound server
     * @return address made of the resolved host and the bound port
     */
    private static Address prepareAddress(DisposableServer disposableServer) {
        InetAddress boundInetAddress = disposableServer.address().getAddress();
        int boundPort = disposableServer.address().getPort();
        if (boundInetAddress.isAnyLocalAddress()) {
            return Address.create(Address.getLocalIpAddress().getHostAddress(), boundPort);
        }
        return Address.create(boundInetAddress.getHostAddress(), boundPort);
    }

    /**
     * Binds a transport using {@code TransportConfig.defaultConfig()} and blocks until it is bound.
     *
     * @return the bound transport
     */
    public static Transport bindAwait() {
        TransportConfig defaults = TransportConfig.defaultConfig();
        return bindAwait(defaults);
    }

    /**
     * Binds a transport with the given config and blocks until the bind completes.
     *
     * <p>Any exception raised while binding is rethrown through {@code Exceptions.propagate},
     * using the exception's cause when it has one.
     *
     * @param transportConfig transport settings
     * @return the bound transport
     */
    public static Transport bindAwait(TransportConfig transportConfig) {
        try {
            return bind(transportConfig).block();
        } catch (Exception e) {
            Throwable cause = e.getCause();
            throw Exceptions.propagate(cause == null ? e : cause);
        }
    }

    /**
     * Binds a transport using {@code TransportConfig.defaultConfig()}.
     *
     * @return mono emitting the bound transport
     */
    public static Mono<Transport> bind() {
        TransportConfig defaults = TransportConfig.defaultConfig();
        return bind(defaults);
    }

    /**
     * Creates an unbound transport from the given config and binds it.
     *
     * @param transportConfig transport settings
     * @return mono emitting the bound transport
     */
    public static Mono<Transport> bind(TransportConfig transportConfig) {
        TransportImpl unbound = new TransportImpl(transportConfig);
        return unbound.bind0();
    }

    /**
     * Starts the TCP server with {@link #onMessage} as its handler and, on success, wraps the
     * resulting server in a new bound {@code TransportImpl} that shares this instance's state.
     * Success is logged at info level and failure at error level.
     *
     * @return mono emitting the bound transport
     */
    public Mono<Transport> bind0() {
        return newTcpServer()
            .handle(this::onMessage)
            .bind()
            .doOnSuccess(bound -> LOGGER.info("[bind0][{}] Bound cluster transport", bound.address()))
            .doOnError(error ->
                LOGGER.error("[bind0][{}] Exception occurred: {}", config.port(), error.toString()))
            .map(bound -> new TransportImpl(bound, this))
            .cast(Transport.class);
    }

    /**
     * Returns the address of this transport.
     *
     * @return the address computed when the transport was bound; {@code null} for an instance
     *     created through the public constructor, which never binds a server
     */
    public Address address() {
        return address;
    }

    /**
     * Reports whether the {@code onStop} processor is disposed.
     *
     * @return result of {@code onStop.isDisposed()}
     */
    public boolean isStopped() {
        return onStop.isDisposed();
    }

    /**
     * Requests shutdown. On subscription the {@code stop} signal is completed, and the returned
     * mono is the {@code onStop} processor, which the bound-instance constructor completes after
     * {@link #doStop()} has finished.
     *
     * @return mono backed by {@code onStop}
     */
    public final Mono<Void> stop() {
        final MonoProcessor<Void> stopSignal = this.stop;
        final MonoProcessor<Void> stopped = this.onStop;
        return Mono.defer(() -> {
            stopSignal.onComplete();
            return stopped;
        });
    }

    /**
     * Performs the actual shutdown: completes the message sink, then closes the server and shuts
     * down the loop resources one after the other, with errors delayed until both have run.
     * The connection cache is cleared when the sequence terminates.
     *
     * @return mono completing when shutdown has finished
     */
    private Mono<Void> doStop() {
        return Mono.defer(() -> {
            LOGGER.info("[{}][doStop] Stopping", address);
            messageSink.complete();
            Mono<Void> shutdown =
                Flux.concatDelayError(closeServer(), shutdownLoopResources()).then();
            return shutdown
                .doFinally(signal -> connections.clear())
                .doOnSuccess(ignored -> LOGGER.info("[{}][doStop] Stopped", address));
        });
    }

    /**
     * Returns the stream of received messages, with a backpressure buffer applied on top of the
     * shared message subject.
     *
     * @return flux of incoming messages
     */
    public final Flux<Message> listen() {
        return messagesSubject.onBackpressureBuffer();
    }

    /**
     * Sends a message to the given address. The connection is looked up in the connection cache
     * and created through {@link #connect0} if absent. The message is serialized with
     * {@link #toByteBuf} and written with an always-true flush predicate.
     *
     * @param address destination address
     * @param message message to send
     * @return mono completing when the send has completed, or failing with the connect/send error
     */
    public Mono<Void> send(Address address, Message message) {
        Mono<? extends Connection> connection = connections.computeIfAbsent(address, this::connect0);
        return connection
            .map(Connection::outbound)
            .flatMap(outbound ->
                outbound.send(Mono.just(message).map(this::toByteBuf), buf -> true).then())
            .then();
    }

    /**
     * Sends a request and waits for the first incoming message carrying the same correlation id.
     *
     * <p>The response subscription on {@link #listen()} is set up before the request is sent. If
     * the incoming stream completes without a match, the returned mono completes empty. A send
     * failure cancels the response subscription and fails the returned mono. Both subscriptions
     * are disposed when the returned mono is disposed.
     *
     * @param address destination address
     * @param message request; must be non-null and carry a non-null correlation id (both are
     *     checked inside the mono's creation callback, not at call time)
     * @return mono emitting the matching response
     */
    public Mono<Message> requestResponse(Address address, Message message) {
        return Mono.create(sink -> {
            Objects.requireNonNull(message, "request must be not null");
            Objects.requireNonNull(message.correlationId(), "correlationId must be not null");

            Flux<Message> response = listen()
                .filter(candidate -> candidate.correlationId() != null)
                .filter(candidate -> candidate.correlationId().equals(message.correlationId()))
                .take(1L);

            // onNext -> success(value), onError -> error, onComplete -> success() (empty)
            Disposable receive = response.subscribe(sink::success, sink::error, sink::success);

            Disposable sending = send(address, message).subscribe(null, error -> {
                receive.dispose();
                sink.error(error);
            });

            sink.onDispose(Disposables.composite(sending, receive));
        });
    }

    /**
     * Server-side connection handler: decodes each received (and retained) buffer into a
     * {@code Message} via {@link #toMessage} and pushes it into the shared message sink.
     *
     * @param nettyInbound inbound side of the accepted connection
     * @param nettyOutbound outbound side of the accepted connection (not used here)
     * @return mono terminating when the inbound stream terminates
     */
    private Mono<Void> onMessage(NettyInbound nettyInbound, NettyOutbound nettyOutbound) {
        return nettyInbound
            .receive()
            // retain so the buffer stays valid for toMessage, which closes its stream over it
            .retain()
            .map(this::toMessage)
            .doOnNext(messageSink::next)
            .then();
    }

    /**
     * Deserializes a message from the given buffer using the configured codec.
     *
     * <p>The buffer is wrapped in a {@code ByteBufInputStream} created with its second constructor
     * argument set to {@code true} (release-on-close in Netty's API). The stream is closed on both
     * the success and the failure path.
     *
     * @param byteBuf buffer holding one serialized message
     * @return the decoded message
     * @throws DecoderException wrapping any exception raised while decoding or closing the stream
     */
    private Message toMessage(ByteBuf byteBuf) {
        try (ByteBufInputStream stream = new ByteBufInputStream(byteBuf, true)) {
            return this.messageCodec.deserialize(stream);
        } catch (Exception e) {
            LOGGER.warn("[{}][toMessage] Exception occurred: {}", this.address, e.toString());
            throw new DecoderException(e);
        }
    }

    /**
     * Serializes a message into a freshly allocated buffer using the configured codec.
     *
     * @param message message to serialize
     * @return buffer containing the serialized message
     * @throws EncoderException wrapping any exception raised during serialization; the buffer is
     *     released before the exception is thrown
     */
    private ByteBuf toByteBuf(Message message) {
        final ByteBuf payload = ByteBufAllocator.DEFAULT.buffer();
        try {
            final ByteBufOutputStream out = new ByteBufOutputStream(payload);
            messageCodec.serialize(message, out);
        } catch (Exception e) {
            // nothing will consume the buffer on failure, so give it back here
            payload.release();
            LOGGER.warn("[{}][toByteBuf] Exception occurred: {}", address, e.toString());
            throw new EncoderException(e);
        }
        return payload;
    }

    /**
     * Builds a cached connect mono for the given address. The address is removed from the
     * connection cache when the connection is disconnected or when connecting fails. Connect,
     * disconnect, and failure events are logged at debug level.
     *
     * @param address remote address to connect to
     * @return cached mono emitting the connection
     */
    private Mono<? extends Connection> connect0(Address address) {
        final TcpClient client = newTcpClient(address)
            .doOnDisconnected(closed -> {
                LOGGER.debug("[{}][disconnected][{}] Channel: {}", this.address, address, closed.channel());
                connections.remove(address);
            })
            .doOnConnected(opened ->
                LOGGER.debug("[{}][connected][{}] Channel: {}", this.address, address, opened.channel()));

        return client
            .connect()
            .doOnError(error -> {
                LOGGER.debug("[{}][connect0][{}] Exception occurred: {}", this.address, address, error.toString());
                connections.remove(address);
            })
            .cache();
    }

    /**
     * Disposes the server, if there is one, and waits for its {@code onDispose()} signal.
     * Completes immediately when this instance has no server.
     *
     * @return mono completing once the server has been disposed
     */
    private Mono<Void> closeServer() {
        return Mono.defer(() -> {
            final DisposableServer boundServer = server;
            if (boundServer != null) {
                LOGGER.info("[{}][closeServer] Closing server channel", address);
                return Mono.fromRunnable(boundServer::dispose)
                    .then(boundServer.onDispose())
                    .doOnSuccess(ignored -> LOGGER.info("[{}][closeServer] Closed server channel", address))
                    .doOnError(error ->
                        LOGGER.warn("[{}][closeServer] Exception occurred: {}", address, error.toString()));
            }
            return Mono.empty();
        });
    }

    /**
     * Shuts down the loop resources: calls {@code dispose()} on subscription, then continues with
     * the mono returned by {@code disposeLater()}.
     *
     * @return mono completing when the {@code disposeLater()} mono completes
     */
    private Mono<Void> shutdownLoopResources() {
        final Mono<Void> disposeNow = Mono.fromRunnable(loopResources::dispose);
        return disposeNow.then(loopResources.disposeLater());
    }

    /**
     * Builds the TCP server definition: runs on the shared loop resources, enables TCP_NODELAY,
     * SO_KEEPALIVE and SO_REUSEADDR, uses the configured port (and host, when one is configured),
     * and registers the channel initializer under the name "inbound".
     *
     * @return configured, not yet bound, TCP server
     */
    private TcpServer newTcpServer() {
        TcpServer tcpServer = TcpServer.create()
            .runOn(loopResources)
            .option(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.SO_KEEPALIVE, true)
            .option(ChannelOption.SO_REUSEADDR, true)
            .port(config.port());

        // host is optional; it is only applied when the config provides one
        if (config.host() != null) {
            tcpServer = tcpServer.host(config.host());
        }

        return tcpServer.bootstrap(bootstrap ->
            BootstrapHandlers.updateConfiguration(bootstrap, "inbound", channelInitializer));
    }

    /**
     * Builds a TCP client for the given remote address: uses a new-connection provider, runs on
     * the shared loop resources, enables TCP_NODELAY, SO_KEEPALIVE and SO_REUSEADDR, applies the
     * configured connect timeout, and registers the channel initializer under the name "outbound".
     *
     * @param address remote address to connect to
     * @return configured, not yet connected, TCP client
     */
    private TcpClient newTcpClient(Address address) {
        return TcpClient.create(ConnectionProvider.newConnection())
            .runOn(loopResources)
            .host(address.host())
            .port(address.port())
            .option(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.SO_KEEPALIVE, true)
            .option(ChannelOption.SO_REUSEADDR, true)
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Integer.valueOf(config.connectTimeout()))
            .bootstrap(bootstrap ->
                BootstrapHandlers.updateConfiguration(bootstrap, "outbound", channelInitializer));
    }
}
