package io.scalecube.transport.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.handler.codec.DecoderException;
import io.netty.handler.codec.EncoderException;
import io.scalecube.cluster.transport.api.Message;
import io.scalecube.cluster.transport.api.MessageCodec;
import io.scalecube.cluster.transport.api.Transport;
import io.scalecube.cluster.transport.api.TransportConfig;
import io.scalecube.cluster.transport.api.TransportFactory;
import io.scalecube.net.Address;
import io.scalecube.transport.netty.tcp.TcpTransportFactory;
import java.net.InetAddress;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.Exceptions;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.netty.Connection;
import reactor.netty.DisposableServer;
import reactor.netty.resources.LoopResources;

/* loaded from: input_file:io/scalecube/transport/netty/TransportImpl.class */
public final class TransportImpl implements Transport {
    /** Logger shared by all instances; registered under the {@link Transport} interface. */
    private static final Logger LOGGER = LoggerFactory.getLogger(Transport.class);

    /** Codec used by {@code toMessage} and {@code toByteBuf} to (de)serialize messages. */
    private final MessageCodec messageCodec;

    /** Address this transport is reachable at; assigned in {@code init} once the server is bound. */
    private Address address;

    /** Bound server handle; assigned in {@code init} and stays {@code null} until then. */
    private DisposableServer server;

    /** Inbound side: binds the server when {@code start()} is subscribed. */
    private final Receiver receiver;

    /** Outbound side: opens connections and writes messages. */
    private final Sender sender;

    /** Hot stream of decoded inbound messages, exposed through {@code listen()}. */
    private final DirectProcessor<Message> subject = DirectProcessor.create();

    /** Sink feeding {@link #subject}; completed by {@code doStop}. */
    private final FluxSink<Message> sink = this.subject.sink();

    /** Completed by {@code stop()} to trigger the shutdown sequence wired up in {@code init}. */
    private final MonoProcessor<Void> stop = MonoProcessor.create();

    /** Completed once the shutdown sequence has finished, whether it succeeded or failed. */
    private final MonoProcessor<Void> onStop = MonoProcessor.create();

    /** Cached outbound connections keyed by destination address; filled lazily by {@code send}. */
    private final Map<Address, Mono<? extends Connection>> connections = new ConcurrentHashMap<>();

    /** Event-loop resources created with prefix "sc-cluster-io", a worker count of 1 and the flag {@code true}. */
    private final LoopResources loopResources = LoopResources.create("sc-cluster-io", 1, true);

    /**
     * Immutable bundle of the collaborators needed on the receiving side: the event-loop
     * resources, a decoder turning a {@link ByteBuf} into a {@link Message}, and a callback
     * to which decoded messages are handed.
     *
     * <p>{@code TransportImpl.start()} stores an instance in the Reactor subscriber context
     * under the key {@code ReceiverContext.class}.
     */
    public static final class ReceiverContext {
        private final LoopResources loopResources;
        private final Function<ByteBuf, Message> messageDecoder;
        private final Consumer<Message> messageConsumer;

        private ReceiverContext(
                LoopResources loopResources,
                Function<ByteBuf, Message> messageDecoder,
                Consumer<Message> messageConsumer) {
            this.loopResources = loopResources;
            this.messageDecoder = messageDecoder;
            this.messageConsumer = messageConsumer;
        }

        /** Returns the event-loop resources supplied at construction. */
        public LoopResources loopResources() {
            return loopResources;
        }

        /** Returns the function that decodes a raw buffer into a {@link Message}. */
        public Function<ByteBuf, Message> messageDecoder() {
            return messageDecoder;
        }

        /**
         * Forwards a decoded message to the consumer supplied at construction.
         *
         * @param message the decoded message
         */
        public void onMessage(Message message) {
            messageConsumer.accept(message);
        }
    }

    /**
     * Immutable bundle of the collaborators needed on the sending side: the event-loop
     * resources and an encoder turning a {@link Message} into a {@link ByteBuf}.
     *
     * <p>{@code TransportImpl.send(...)} stores an instance in the Reactor subscriber context
     * under the key {@code SenderContext.class}.
     */
    public static final class SenderContext {
        private final LoopResources loopResources;
        private final Function<Message, ByteBuf> messageEncoder;

        private SenderContext(
                LoopResources loopResources, Function<Message, ByteBuf> messageEncoder) {
            this.loopResources = loopResources;
            this.messageEncoder = messageEncoder;
        }

        /** Returns the event-loop resources supplied at construction. */
        public LoopResources loopResources() {
            return loopResources;
        }

        /** Returns the function that encodes a {@link Message} into a buffer. */
        public Function<Message, ByteBuf> messageEncoder() {
            return messageEncoder;
        }
    }

    /**
     * Creates a transport that is not yet bound; call {@code start()} to bind it.
     *
     * @param messageCodec codec used to serialize outgoing and deserialize incoming messages
     * @param receiver inbound side, asked to bind when the transport starts
     * @param sender outbound side, used to connect and to write messages
     */
    public TransportImpl(MessageCodec messageCodec, Receiver receiver, Sender sender) {
        this.sender = sender;
        this.receiver = receiver;
        this.messageCodec = messageCodec;
    }

    /**
     * Derives the advertised {@link Address} from a bound server.
     *
     * <p>When the server is bound to the wildcard ("any local") address, the host part is
     * taken from {@code Address.getLocalIpAddress()} instead; the port is always the bound port.
     *
     * @param disposableServer the bound server
     * @return the address to advertise for this transport
     */
    private static Address prepareAddress(DisposableServer disposableServer) {
        InetAddress boundAddress = disposableServer.address().getAddress();
        int boundPort = disposableServer.address().getPort();
        if (boundAddress.isAnyLocalAddress()) {
            return Address.create(Address.getLocalIpAddress().getHostAddress(), boundPort);
        }
        return Address.create(boundAddress.getHostAddress(), boundPort);
    }

    /**
     * Records the bound server and its address, then wires up the shutdown sequence.
     *
     * <p>Once {@code stop} completes, {@code doStop()} runs; {@code onStop} is completed when
     * that sequence terminates for any reason. A shutdown failure is only logged.
     *
     * @param disposableServer the freshly bound server
     */
    private void init(DisposableServer disposableServer) {
        this.server = disposableServer;
        this.address = prepareAddress(disposableServer);
        this.stop
            .then(doStop())
            .doFinally(signalType -> this.onStop.onComplete())
            .subscribe(
                null, // there is no value to consume from a Mono<Void>
                th ->
                    LOGGER.warn(
                        "[{}][doStop] Exception occurred: {}", this.address, th.toString()));
    }

    /**
     * Binds a transport using {@code TransportConfig.defaultConfig()} and blocks until it is bound.
     *
     * @return the bound transport
     */
    public static Transport bindAwait() {
        final TransportConfig defaults = TransportConfig.defaultConfig();
        return bindAwait(defaults);
    }

    /**
     * Binds a transport with the given configuration and blocks until it is bound.
     *
     * <p>If binding fails, the cause of the caught exception is rethrown when there is one,
     * otherwise the exception itself; either way it goes through {@code Exceptions.propagate}.
     *
     * @param transportConfig transport configuration
     * @return the bound transport
     */
    public static Transport bindAwait(TransportConfig transportConfig) {
        try {
            return bind(transportConfig).block();
        } catch (Exception e) {
            final Throwable cause = e.getCause();
            throw Exceptions.propagate(cause != null ? cause : e);
        }
    }

    /**
     * Binds a transport using {@code TransportConfig.defaultConfig()}.
     *
     * @return a mono that emits the bound transport
     */
    public static Mono<Transport> bind() {
        final TransportConfig defaults = TransportConfig.defaultConfig();
        return bind(defaults);
    }

    /**
     * Creates a transport with the given configuration and starts it.
     *
     * <p>The factory is chosen in this order: the factory set on {@code transportConfig},
     * then {@code TransportFactory.INSTANCE}, then a new {@link TcpTransportFactory}. The
     * TCP fallback is only instantiated when both earlier candidates are {@code null}.
     *
     * @param transportConfig transport configuration
     * @return a mono that emits the bound transport
     */
    public static Mono<Transport> bind(TransportConfig transportConfig) {
        TransportFactory factory = transportConfig.transportFactory();
        if (factory == null) {
            factory = TransportFactory.INSTANCE;
        }
        if (factory == null) {
            // Last resort; created lazily so the common path allocates nothing.
            factory = new TcpTransportFactory();
        }
        return factory.createTransport(transportConfig).start();
    }

    /**
     * Binds the receiver and initializes this transport.
     *
     * <p>On subscription the receiver is asked to bind; the resulting server is passed to
     * {@code init}. A {@link ReceiverContext} is placed in the subscriber context under the
     * key {@code ReceiverContext.class}; it carries the loop resources, the decoder
     * ({@code toMessage}) and a consumer that pushes decoded messages into the inbound sink.
     *
     * @return a mono that emits this transport once the server is bound
     */
    public Mono<Transport> start() {
        return Mono.deferWithContext(context -> this.receiver.bind())
            .doOnNext(this::init)
            .doOnSuccess(
                boundServer ->
                    LOGGER.info("[bind0][{}] Bound cluster transport", boundServer.address()))
            .doOnError(
                th ->
                    LOGGER.error(
                        "[bind0][{}] Exception occurred: {}", this.address, th.toString()))
            .thenReturn(this)
            .cast(Transport.class)
            .subscriberContext(
                context ->
                    context.put(
                        ReceiverContext.class,
                        new ReceiverContext(
                            this.loopResources, this::toMessage, this.sink::next)));
    }

    /**
     * Returns the address of this transport.
     *
     * @return the address assigned in {@code init}, or {@code null} if the transport was never bound
     */
    public Address address() {
        return address;
    }

    /**
     * Reports whether the {@code onStop} processor is disposed; {@code init} completes that
     * processor when the shutdown sequence terminates.
     *
     * @return {@code true} if {@code onStop} reports itself as disposed
     */
    public boolean isStopped() {
        final MonoProcessor<Void> completion = this.onStop;
        return completion.isDisposed();
    }

    /**
     * Requests shutdown of this transport.
     *
     * <p>Nothing happens until the returned mono is subscribed. On subscription the
     * {@code stop} trigger is completed, and the subscriber is attached to {@code onStop}.
     *
     * @return a mono backed by the {@code onStop} processor
     */
    public final Mono<Void> stop() {
        final MonoProcessor<Void> trigger = this.stop;
        final MonoProcessor<Void> completion = this.onStop;
        return Mono.defer(
            () -> {
                trigger.onComplete();
                return completion;
            });
    }

    /**
     * Builds the shutdown sequence.
     *
     * <p>On subscription it completes the inbound sink, then closes the server and shuts down
     * the loop resources one after the other, with errors delayed until both have run. The
     * connection cache is cleared when the sequence terminates for any reason.
     *
     * @return a mono that completes when shutdown has finished
     */
    private Mono<Void> doStop() {
        return Mono.defer(
            () -> {
                LOGGER.info("[{}][doStop] Stopping", this.address);
                this.sink.complete();
                return Flux.concatDelayError(closeServer(), shutdownLoopResources())
                    .then()
                    .doFinally(signalType -> this.connections.clear())
                    .doOnSuccess(
                        ignored -> LOGGER.info("[{}][doStop] Stopped", this.address));
            });
    }

    /**
     * Returns the stream of inbound messages.
     *
     * @return the inbound subject with {@code onBackpressureBuffer()} applied
     */
    public final Flux<Message> listen() {
        final DirectProcessor<Message> inbound = this.subject;
        return inbound.onBackpressureBuffer();
    }

    /**
     * Sends a message to the given address.
     *
     * <p>The connection for {@code address} is taken from the cache, or created through
     * {@code connect0} on first use. The sender then runs with that {@link Connection} in the
     * subscriber context under {@code Connection.class}, and a {@link SenderContext} under
     * {@code SenderContext.class}.
     *
     * @param address destination address
     * @param message message to send
     * @return the mono produced by the sender for this message
     */
    public Mono<Void> send(Address address, Message message) {
        return Mono.deferWithContext(
                lookupCtx -> this.connections.computeIfAbsent(address, this::connect0))
            .flatMap(
                connection ->
                    Mono.deferWithContext(sendCtx -> this.sender.send(message))
                        .subscriberContext(
                            connectionCtx -> connectionCtx.put(Connection.class, connection)))
            .subscriberContext(
                senderCtx ->
                    senderCtx.put(
                        SenderContext.class,
                        new SenderContext(this.loopResources, this::toByteBuf)));
    }

    /**
     * Sends a request and waits for the first inbound message with the same correlation id.
     *
     * <p>The response listener is subscribed before the request is sent. If sending fails,
     * the listener is disposed and the error is propagated. If the inbound stream completes
     * before a matching message arrives, the returned mono completes empty. Disposing the
     * returned mono disposes both the send and the listener subscriptions.
     *
     * @param address destination address
     * @param message request; it and its correlation id must not be {@code null}
     * @return a mono emitting the correlated response
     */
    public Mono<Message> requestResponse(Address address, Message message) {
        return Mono.create(
            monoSink -> {
                Objects.requireNonNull(message, "request must be not null");
                Objects.requireNonNull(message.correlationId(), "correlationId must be not null");

                // Listen first so that a fast response cannot be missed.
                Disposable receive =
                    listen()
                        .filter(response -> response.correlationId() != null)
                        .filter(
                            response ->
                                response.correlationId().equals(message.correlationId()))
                        .take(1L)
                        .subscribe(monoSink::success, monoSink::error, monoSink::success);

                Disposable sending =
                    send(address, message)
                        .subscribe(
                            null,
                            th -> {
                                receive.dispose();
                                monoSink.error(th);
                            });

                monoSink.onDispose(Disposables.composite(sending, receive));
            });
    }

    /**
     * Decodes a buffer into a {@link Message} using the configured codec.
     *
     * <p>The buffer is wrapped in a {@link ByteBufInputStream} constructed with {@code true}
     * as its second argument (Netty's release-on-close flag). The stream is closed on both
     * the success and the failure path.
     *
     * @param byteBuf buffer holding one encoded message
     * @return the decoded message
     * @throws DecoderException if deserialization or closing the stream fails; wraps the cause
     */
    private Message toMessage(ByteBuf byteBuf) {
        try (ByteBufInputStream stream = new ByteBufInputStream(byteBuf, true)) {
            return this.messageCodec.deserialize(stream);
        } catch (Exception e) {
            LOGGER.warn("[{}][decodeMessage] Exception occurred: {}", this.address, e.toString());
            throw new DecoderException(e);
        }
    }

    /**
     * Encodes a {@link Message} into a newly allocated buffer using the configured codec.
     *
     * <p>If serialization fails, the buffer is released before the error is rethrown.
     *
     * @param message message to encode
     * @return buffer holding the encoded message
     * @throws EncoderException if serialization fails; wraps the cause
     */
    private ByteBuf toByteBuf(Message message) {
        final ByteBuf encoded = ByteBufAllocator.DEFAULT.buffer();
        try {
            final ByteBufOutputStream stream = new ByteBufOutputStream(encoded);
            this.messageCodec.serialize(message, stream);
        } catch (Exception e) {
            encoded.release();
            LOGGER.warn("[{}][encodeMessage] Exception occurred: {}", this.address, e.toString());
            throw new EncoderException(e);
        }
        return encoded;
    }

    /**
     * Builds the cached connection mono for a destination.
     *
     * <p>On success, a listener is registered that removes the cache entry when the
     * connection's {@code onDispose()} terminates. On failure, the cache entry is removed
     * straight away.
     *
     * @param address destination address
     * @return a cached mono emitting the connection
     */
    private Mono<? extends Connection> connect0(Address address) {
        return this.sender
            .connect(address)
            .doOnSuccess(
                connection -> {
                    connection
                        .onDispose()
                        .doOnTerminate(() -> this.connections.remove(address))
                        .subscribe();
                    LOGGER.debug(
                        "[{}][connected][{}] Channel: {}",
                        this.address,
                        address,
                        connection.channel());
                })
            .doOnError(
                th -> {
                    LOGGER.warn(
                        "[{}][connect0][{}] Exception occurred: {}",
                        this.address,
                        address,
                        th.toString());
                    this.connections.remove(address);
                })
            .cache();
    }

    /**
     * Builds a mono that disposes the bound server and waits for its disposal to finish.
     *
     * <p>Completes immediately when no server was ever bound.
     *
     * @return a mono that completes when the server channel is closed
     */
    private Mono<Void> closeServer() {
        return Mono.defer(
            () -> {
                final DisposableServer boundServer = this.server;
                if (boundServer == null) {
                    return Mono.empty();
                }
                LOGGER.info("[{}][closeServer] Closing server channel", this.address);
                return Mono.fromRunnable(boundServer::dispose)
                    .then(boundServer.onDispose())
                    .doOnSuccess(
                        ignored ->
                            LOGGER.info("[{}][closeServer] Closed server channel", this.address))
                    .doOnError(
                        th ->
                            LOGGER.warn(
                                "[{}][closeServer] Exception occurred: {}",
                                this.address,
                                th.toString()));
            });
    }

    /**
     * Builds a mono that calls {@code dispose()} on the loop resources and then continues
     * with the mono returned by {@code disposeLater()}.
     *
     * @return a mono that completes when {@code disposeLater()} completes
     */
    private Mono<Void> shutdownLoopResources() {
        final LoopResources resources = this.loopResources;
        final Mono<Void> disposal = resources.disposeLater();
        return Mono.fromRunnable(resources::dispose).then(disposal);
    }
}
