package reactor.io.net.impl.netty;

import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.Environment;
import reactor.core.Dispatcher;
import reactor.core.processor.CancelException;
import reactor.fn.Consumer;
import reactor.io.buffer.Buffer;
import reactor.io.codec.Codec;
import reactor.io.net.ChannelStream;
import reactor.io.net.ReactorChannel;
import reactor.io.net.impl.netty.NettyChannelHandlerBridge;
import reactor.rx.Streams;
import reactor.rx.broadcast.Broadcaster;
import reactor.rx.subscription.PushSubscription;

/* loaded from: input_file:reactor/io/net/impl/netty/NettyChannelStream.class */
/**
 * {@link ChannelStream} implementation backed by a Netty {@link Channel}.
 *
 * <p>Inbound subscribers are handed to the channel pipeline as a user event, outbound
 * publishers are written to the channel on its event loop, and lifecycle / idle
 * callbacks are installed as additional pipeline handlers through {@link #on()}.
 *
 * @param <IN>  type of the decoded inbound values
 * @param <OUT> type of the outbound values before encoding
 */
public class NettyChannelStream<IN, OUT> extends ChannelStream<IN, OUT> {
    /** The Netty channel all operations of this stream are delegated to. */
    private final Channel ioChannel;

    /**
     * {@link ReactorChannel.ConsumerSpec} that registers each callback by adding a
     * dedicated handler to the pipeline of the enclosing stream's channel.
     */
    private class NettyConsumerSpec implements ReactorChannel.ConsumerSpec {
        private NettyConsumerSpec() {
        }

        /**
         * Appends a handler to the pipeline that invokes {@code consumer} (with
         * {@code null}) when the channel becomes inactive, then forwards the event.
         *
         * @param consumer callback to run on channel inactivity
         * @return this spec, for chaining
         */
        @Override // reactor.io.net.ReactorChannel.ConsumerSpec
        public ReactorChannel.ConsumerSpec close(final Consumer<Void> consumer) {
            NettyChannelStream.this.ioChannel.pipeline().addLast(new ChannelDuplexHandler() {
                @Override
                public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                    consumer.accept(null);
                    super.channelInactive(ctx);
                }
            });
            return this;
        }

        /**
         * Installs, at the head of the pipeline, an {@link IdleStateHandler} that invokes
         * {@code consumer} (with {@code null}) on every {@link IdleState#READER_IDLE} event.
         * The idle event is still forwarded to the default handling afterwards.
         *
         * @param j        reader idle timeout, in milliseconds
         * @param consumer callback to run when the channel has been read-idle
         * @return this spec, for chaining
         */
        @Override // reactor.io.net.ReactorChannel.ConsumerSpec
        public ReactorChannel.ConsumerSpec readIdle(long j, final Consumer<Void> consumer) {
            NettyChannelStream.this.ioChannel.pipeline().addFirst(new IdleStateHandler(j, 0L, 0L, TimeUnit.MILLISECONDS) {
                @Override
                protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent event) throws Exception {
                    if (event.state() == IdleState.READER_IDLE) {
                        consumer.accept(null);
                    }
                    super.channelIdle(ctx, event);
                }
            });
            return this;
        }

        /**
         * Installs, at the tail of the pipeline, an {@link IdleStateHandler} that invokes
         * {@code consumer} (with {@code null}) on every {@link IdleState#WRITER_IDLE} event.
         * The idle event is still forwarded to the default handling afterwards.
         *
         * @param j        writer idle timeout, in milliseconds
         * @param consumer callback to run when the channel has been write-idle
         * @return this spec, for chaining
         */
        @Override // reactor.io.net.ReactorChannel.ConsumerSpec
        public ReactorChannel.ConsumerSpec writeIdle(long j, final Consumer<Void> consumer) {
            NettyChannelStream.this.ioChannel.pipeline().addLast(new IdleStateHandler(0L, j, 0L, TimeUnit.MILLISECONDS) {
                @Override
                protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent event) throws Exception {
                    if (event.state() == IdleState.WRITER_IDLE) {
                        consumer.accept(null);
                    }
                    super.channelIdle(ctx, event);
                }
            });
            return this;
        }
    }

    /**
     * Creates a stream bound to the given Netty channel.
     *
     * @param environment passed through to {@link ChannelStream}
     * @param codec       passed through to {@link ChannelStream}
     * @param j           passed through to {@link ChannelStream}
     *                    (NOTE(review): meaning is defined by the superclass - confirm there)
     * @param dispatcher  passed through to {@link ChannelStream}
     * @param channel     the Netty channel this stream reads from and writes to
     */
    public NettyChannelStream(Environment environment, Codec<Buffer, IN, OUT> codec, long j, Dispatcher dispatcher, Channel channel) {
        super(environment, codec, j, dispatcher);
        this.ioChannel = channel;
    }

    /**
     * Registers an inbound subscriber by firing a
     * {@link NettyChannelHandlerBridge.ChannelInputSubscriberEvent} carrying it through
     * the channel pipeline as a user event. The subscriber is not signalled directly here;
     * presumably a {@link NettyChannelHandlerBridge} in the pipeline reacts to the event.
     *
     * @param subscriber the subscriber interested in decoded inbound data
     */
    @Override
    public void subscribe(Subscriber<? super IN> subscriber) {
        this.ioChannel.pipeline().fireUserEventTriggered(new NettyChannelHandlerBridge.ChannelInputSubscriberEvent(subscriber));
    }

    /**
     * Writes the given publisher to the channel and reports the outcome of the write to
     * {@code subscriber}.
     *
     * <p>If an encoder is configured, the publisher is first mapped through it. The write
     * is always issued from the channel's event loop: directly when the caller is already
     * on it, otherwise by scheduling a task on it.
     *
     * @param publisher  source of outbound values
     * @param subscriber notified once the channel write future completes
     */
    @Override // reactor.io.net.ChannelStream
    public void doSubscribeWriter(Publisher<? extends OUT> publisher, final Subscriber<? super Void> subscriber) {
        final Publisher<? extends OUT> encoded =
                getEncoder() != null ? Streams.wrap(publisher).map(getEncoder()) : publisher;
        if (this.ioChannel.eventLoop().inEventLoop()) {
            writeAndNotify(encoded, subscriber);
        } else {
            this.ioChannel.eventLoop().execute(new Runnable() {
                @Override // java.lang.Runnable
                public void run() {
                    writeAndNotify(encoded, subscriber);
                }
            });
        }
    }

    /**
     * Hands {@code encoded} to the channel and, when the resulting write future completes,
     * signals {@code subscriber}: first {@code onSubscribe} with
     * {@link Broadcaster#HOT_SUBSCRIPTION}, then {@code onComplete} on success or
     * {@code onError} with the future's cause on failure.
     */
    private void writeAndNotify(Publisher<? extends OUT> encoded, final Subscriber<? super Void> subscriber) {
        this.ioChannel.write(encoded).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                // onSubscribe must precede any terminal signal, so it is emitted here
                // right before the outcome of the write is reported.
                subscriber.onSubscribe(Broadcaster.HOT_SUBSCRIPTION);
                if (future.isSuccess()) {
                    subscriber.onComplete();
                } else {
                    subscriber.onError(future.cause());
                }
            }
        });
    }

    /**
     * @return the channel's remote address, cast to {@link InetSocketAddress}
     */
    @Override // reactor.io.net.ReactorChannel
    public InetSocketAddress remoteAddress() {
        return (InetSocketAddress) this.ioChannel.remoteAddress();
    }

    /**
     * @return a new spec through which close / idle callbacks can be attached to the channel
     */
    @Override // reactor.io.net.ReactorChannel
    public ReactorChannel.ConsumerSpec on() {
        return new NettyConsumerSpec();
    }

    /**
     * @return the underlying Netty channel
     */
    @Override // reactor.io.net.ChannelStream
    public Channel delegate() {
        return this.ioChannel;
    }

    /**
     * Pushes a decoded inbound value to the subscription currently held by the
     * {@link NettyChannelHandlerBridge} found in the channel pipeline. Does nothing when
     * the pipeline has no bridge or the bridge has no subscription.
     *
     * @param in the decoded value
     */
    @Override // reactor.io.net.ChannelStream
    public void doDecoded(IN in) {
        NettyChannelHandlerBridge bridge = this.ioChannel.pipeline().get(NettyChannelHandlerBridge.class);
        PushSubscription<IN> subscription = bridge == null ? null : bridge.subscription();
        if (subscription == null) {
            return;
        }
        try {
            subscription.onNext(in);
        } catch (CancelException ignored) {
            // Deliberately best-effort: a CancelException raised while delivering the value
            // is dropped rather than propagated to the caller.
            // NOTE(review): presumably this signals that the downstream cancelled - confirm.
        }
    }

    @Override
    public String toString() {
        return getClass().getName() + " {channel=" + this.ioChannel + '}';
    }
}
