package io.reactivesocket.transport.tcp;

import io.reactivesocket.DuplexConnection;
import io.reactivesocket.Frame;
import io.reactivex.netty.channel.Connection;
import org.reactivestreams.Publisher;
import rx.RxReactiveStreams;

/* loaded from: input_file:io/reactivesocket/transport/tcp/TcpDuplexConnection.class */
/**
 * {@link DuplexConnection} implementation that delegates to an RxNetty
 * {@link Connection} reading and writing {@link Frame}s.
 *
 * <p>Each operation forwards to the wrapped connection and uses
 * {@link RxReactiveStreams} to convert between the connection's Rx types and
 * Reactive Streams {@link Publisher}s.
 */
public class TcpDuplexConnection implements DuplexConnection {

    /** The wrapped connection every operation is forwarded to. */
    private final Connection<Frame, Frame> connection;

    /** Publisher view of {@code connection.closeListener()}, created once at construction. */
    private final Publisher<Void> closeNotification;

    /** Publisher view of {@code connection.close()}, created once at construction. */
    private final Publisher<Void> closePublisher;

    /**
     * Wraps the given connection and eagerly builds the publishers later
     * returned by {@link #onClose()} and {@link #close()}.
     *
     * @param connection the connection to delegate to
     */
    public TcpDuplexConnection(Connection<Frame, Frame> connection) {
        this.connection = connection;

        Publisher<Void> listenerPublisher =
                RxReactiveStreams.toPublisher(connection.closeListener());
        this.closeNotification = listenerPublisher;

        Publisher<Void> closingPublisher =
                RxReactiveStreams.toPublisher(connection.close());
        this.closePublisher = closingPublisher;
    }

    /**
     * Hands the given frames to the wrapped connection's
     * {@code writeAndFlushOnEach} and exposes the outcome as a publisher.
     *
     * @param publisher source of the frames to write
     * @return publisher converted from the result of {@code writeAndFlushOnEach}
     */
    public Publisher<Void> send(Publisher<Frame> publisher) {
        return RxReactiveStreams.toPublisher(
                connection.writeAndFlushOnEach(
                        RxReactiveStreams.toObservable(publisher)));
    }

    /**
     * Exposes the wrapped connection's input as a publisher of frames.
     *
     * @return publisher converted from {@code connection.getInput()}
     */
    public Publisher<Frame> receive() {
        Publisher<Frame> inbound = RxReactiveStreams.toPublisher(connection.getInput());
        return inbound;
    }

    /**
     * Reports availability based on the state of the underlying Netty channel.
     *
     * @return {@code 1.0} when the channel reports itself active, otherwise {@code 0.0}
     */
    public double availability() {
        boolean channelActive = connection.unsafeNettyChannel().isActive();
        if (channelActive) {
            return 1.0d;
        }
        return 0.0d;
    }

    /**
     * Returns the publisher built in the constructor from {@code connection.close()}.
     * The same instance is returned on every call.
     *
     * @return the cached close publisher
     */
    public Publisher<Void> close() {
        return closePublisher;
    }

    /**
     * Returns the publisher built in the constructor from
     * {@code connection.closeListener()}. The same instance is returned on every call.
     *
     * @return the cached close-listener publisher
     */
    public Publisher<Void> onClose() {
        return closeNotification;
    }

    /**
     * Uses the string form of the underlying Netty channel.
     *
     * @return {@code toString()} of the wrapped connection's channel
     */
    public String toString() {
        String channelDescription = connection.unsafeNettyChannel().toString();
        return channelDescription;
    }
}
