package org.apache.pulsar.broker.auth;

import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.ReferenceCountUtil;
import java.net.SocketAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/auth/PortForwarder.class */
public class PortForwarder implements AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(PortForwarder.class);
    private final SocketAddress targetAddress;
    private final Channel serverChannel;
    private final EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    private final EventLoopGroup workerGroup = new NioEventLoopGroup();

    /* loaded from: input_file:org/apache/pulsar/broker/auth/PortForwarder$BackendHandler.class */
    private class BackendHandler extends ChannelInboundHandlerAdapter {
        private final Channel inboundChannel;

        public BackendHandler(Channel channel) {
            this.inboundChannel = channel;
        }

        public void channelActive(ChannelHandlerContext channelHandlerContext) {
            channelHandlerContext.read();
        }

        public void channelRead(final ChannelHandlerContext channelHandlerContext, Object obj) {
            this.inboundChannel.writeAndFlush(obj).addListener(new ChannelFutureListener() { // from class: org.apache.pulsar.broker.auth.PortForwarder.BackendHandler.1
                public void operationComplete(ChannelFuture channelFuture) {
                    if (channelFuture.isSuccess()) {
                        channelHandlerContext.channel().read();
                    } else {
                        channelFuture.channel().close();
                    }
                }
            });
        }

        public void channelInactive(ChannelHandlerContext channelHandlerContext) {
            PortForwarder.closeOnFlush(this.inboundChannel);
        }

        public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) {
            PortForwarder.LOG.error("backend exception", th);
            PortForwarder.closeOnFlush(channelHandlerContext.channel());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/pulsar/broker/auth/PortForwarder$FrontendHandler.class */
    public class FrontendHandler extends ChannelInboundHandlerAdapter {
        private volatile Channel outboundChannel;

        private FrontendHandler() {
        }

        public void channelActive(ChannelHandlerContext channelHandlerContext) {
            final Channel channel = channelHandlerContext.channel();
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(channel.eventLoop()).channel(channelHandlerContext.channel().getClass()).handler(new BackendHandler(channel)).option(ChannelOption.AUTO_READ, false);
            ChannelFuture connect = bootstrap.connect(PortForwarder.this.targetAddress);
            this.outboundChannel = connect.channel();
            connect.addListener(new ChannelFutureListener() { // from class: org.apache.pulsar.broker.auth.PortForwarder.FrontendHandler.1
                public void operationComplete(ChannelFuture channelFuture) {
                    if (channelFuture.isSuccess()) {
                        channel.read();
                    } else {
                        channel.close();
                    }
                }
            });
        }

        public void channelRead(final ChannelHandlerContext channelHandlerContext, Object obj) {
            if (this.outboundChannel.isActive()) {
                this.outboundChannel.writeAndFlush(obj).addListener(new ChannelFutureListener() { // from class: org.apache.pulsar.broker.auth.PortForwarder.FrontendHandler.2
                    public void operationComplete(ChannelFuture channelFuture) {
                        if (channelFuture.isSuccess()) {
                            channelHandlerContext.channel().read();
                        } else {
                            channelFuture.channel().close();
                        }
                    }
                });
            }
        }

        public void channelInactive(ChannelHandlerContext channelHandlerContext) {
            if (this.outboundChannel != null) {
                PortForwarder.closeOnFlush(this.outboundChannel);
            }
        }

        public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) {
            PortForwarder.LOG.error("frontend exception", th);
            PortForwarder.closeOnFlush(channelHandlerContext.channel());
        }
    }

    /* loaded from: input_file:org/apache/pulsar/broker/auth/PortForwarder$Initializer.class */
    private class Initializer extends ChannelInitializer<SocketChannel> {
        private Initializer() {
        }

        public void initChannel(SocketChannel socketChannel) {
            socketChannel.pipeline().addLast(new ChannelHandler[]{new LoggingHandler(PortForwarder.class, LogLevel.DEBUG), new FrontendHandler()});
        }
    }

    public PortForwarder(SocketAddress socketAddress, SocketAddress socketAddress2) {
        this.targetAddress = socketAddress2;
        try {
            this.serverChannel = new ServerBootstrap().group(this.bossGroup, this.workerGroup).channel(NioServerSocketChannel.class).handler(new LoggingHandler(PortForwarder.class, LogLevel.DEBUG)).childHandler(new Initializer()).childOption(ChannelOption.AUTO_READ, false).bind(socketAddress).sync().channel();
            LOG.info("Started port forwarding service on {}, target: {}", socketAddress, socketAddress2);
        } catch (Exception e) {
            throw new RuntimeException(String.format("failed to bind to %s: %s", socketAddress, e.getMessage()), e);
        }
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        this.serverChannel.close().sync();
        this.bossGroup.shutdownGracefully();
        this.workerGroup.shutdownGracefully();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static void closeOnFlush(Channel channel) {
        if (channel.isActive()) {
            channel.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
        }
    }
}
