package io.rsocket.transport.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.rsocket.RSocketErrorException;
import io.rsocket.frame.ErrorFrameCodec;
import io.rsocket.frame.FrameLengthCodec;
import io.rsocket.internal.BaseDuplexConnection;
import java.net.SocketAddress;
import java.util.Objects;
import java.util.function.Consumer;
import reactor.core.publisher.Flux;
import reactor.netty.Connection;

/* loaded from: input_file:io/rsocket/transport/netty/TcpDuplexConnection.class */
public final class TcpDuplexConnection extends BaseDuplexConnection {
    private final Connection connection;

    public TcpDuplexConnection(Connection connection) {
        this.connection = (Connection) Objects.requireNonNull(connection, "connection must not be null");
        connection.channel().closeFuture().addListener(future -> {
            if (isDisposed()) {
                return;
            }
            dispose();
        });
        connection.outbound().send(this.sender).then().subscribe();
    }

    public ByteBufAllocator alloc() {
        return this.connection.channel().alloc();
    }

    public SocketAddress remoteAddress() {
        return this.connection.channel().remoteAddress();
    }

    protected void doOnClose() {
        this.sender.dispose();
        this.connection.dispose();
    }

    public void sendErrorAndClose(RSocketErrorException rSocketErrorException) {
        ByteBuf encode = ErrorFrameCodec.encode(alloc(), 0, rSocketErrorException);
        this.connection.outbound().sendObject(FrameLengthCodec.encode(alloc(), encode.readableBytes(), encode)).then().subscribe((Consumer) null, th -> {
            this.onClose.onError(th);
        }, () -> {
            Throwable cause = rSocketErrorException.getCause();
            if (cause == null) {
                this.onClose.onComplete();
            } else {
                this.onClose.onError(cause);
            }
        });
    }

    public Flux<ByteBuf> receive() {
        return this.connection.inbound().receive().map(FrameLengthCodec::frame);
    }

    public void sendFrame(int i, ByteBuf byteBuf) {
        super.sendFrame(i, FrameLengthCodec.encode(alloc(), byteBuf.readableBytes(), byteBuf));
    }
}
