package io.rsocket.transport.netty;

import io.rsocket.Frame;
import io.rsocket.internal.BaseDuplexConnection;
import java.util.Objects;
import org.reactivestreams.Publisher;
import reactor.core.Fuseable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;

/* loaded from: input_file:io/rsocket/transport/netty/TcpDuplexConnection.class */
/**
 * A {@link BaseDuplexConnection} that exchanges {@link Frame}s over a Reactor Netty
 * {@link Connection}.
 *
 * <p>Inbound buffers are converted to frames in {@link #receive()}; outbound frames are written to
 * the connection's channel through a {@code SendPublisher} in {@link #send(Publisher)}.
 */
public final class TcpDuplexConnection extends BaseDuplexConnection {
    private final Connection connection;

    /**
     * Creates a duplex connection on top of the given Reactor Netty connection.
     *
     * <p>A listener is registered on the channel's close future so that this object is disposed
     * when the underlying channel closes, unless it has already been disposed.
     *
     * @param connection the Reactor Netty connection to wrap
     * @throws NullPointerException if {@code connection} is {@code null}
     */
    public TcpDuplexConnection(Connection connection) {
        this.connection = Objects.requireNonNull(connection, "connection must not be null");
        connection.channel().closeFuture().addListener(future -> {
            // Skip the dispose call when this connection has already been disposed.
            if (!isDisposed()) {
                dispose();
            }
        });
    }

    /**
     * Disposes the wrapped Reactor Netty connection unless it is already disposed.
     *
     * <p>NOTE(review): presumably invoked by {@code BaseDuplexConnection} as part of its close
     * handling - confirm against the superclass.
     */
    protected void doOnClose() {
        if (!this.connection.isDisposed()) {
            this.connection.dispose();
        }
    }

    /**
     * Returns the stream of frames read from the wrapped connection.
     *
     * <p>Each inbound buffer has {@code retain()} called on it before being handed to
     * {@link Frame#from}. NOTE(review): presumably the inbound pipeline releases the buffer once
     * the mapping step returns, so the frame needs its own reference - confirm.
     *
     * @return a {@link Flux} emitting one {@link Frame} per inbound buffer
     */
    public Flux<Frame> receive() {
        return this.connection.inbound().receive().map(buffer -> Frame.from(buffer.retain()));
    }

    /**
     * Writes the given frames to the wrapped connection's channel.
     *
     * <p>The frames are routed through a {@code SendPublisher}, which is given the retained content
     * of each frame and a function reporting that content's readable byte count. When the source
     * flux is itself a {@link Fuseable.QueueSubscription}, {@link Fuseable#ASYNC} fusion is
     * requested and the subscription is also passed to the {@code SendPublisher} as its first
     * argument; otherwise the shorter constructor is used.
     *
     * @param publisher the frames to send
     * @return a {@link Mono} that terminates when the resulting send sequence terminates
     */
    public Mono<Void> send(Publisher<Frame> publisher) {
        // SendPublisher is declared outside this file; it is kept raw here because its type
        // parameters are not visible from this class.
        return Flux.from(publisher).transform(frames -> {
            if (frames instanceof Fuseable.QueueSubscription) {
                @SuppressWarnings("unchecked") // guarded by the instanceof check above
                Fuseable.QueueSubscription<Frame> fusedQueue = (Fuseable.QueueSubscription<Frame>) frames;
                // Named constant instead of the literal 2 so the requested fusion mode is explicit.
                fusedQueue.requestFusion(Fuseable.ASYNC);
                return new SendPublisher(
                        fusedQueue,
                        frames,
                        this.connection.channel(),
                        frame -> frame.content().retain(),
                        content -> content.readableBytes());
            }
            return new SendPublisher(
                    frames,
                    this.connection.channel(),
                    frame -> frame.content().retain(),
                    content -> content.readableBytes());
        }).then();
    }
}
