package org.apache.pulsar.broker.protocol;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import java.io.File;
import java.io.FileOutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.common.util.PortManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/**
 * Base class for tests that verify a broker can load and bootstrap a protocol handler
 * packaged as a NAR file.
 *
 * <p>{@link #setup()} writes a minimal NAR archive describing {@link MyProtocolHandler} into a
 * temporary directory, points the broker configuration at that directory and enables the
 * {@code "test"} messaging protocol. {@link #testBootstrapProtocolHandler()} then connects to the
 * endpoint registered for that protocol and expects to read {@code "ok"}.
 *
 * <p>Subclasses choose, through the constructor argument, the value passed to
 * {@code setUseSeparateThreadPoolForProtocolHandlers}.
 */
@Test(groups = {"broker"})
public abstract class SimpleProtocolHandlerTestsBase extends BrokerTestBase {
    private static final Logger log = LoggerFactory.getLogger(SimpleProtocolHandlerTestsBase.class);

    /** Directory holding the generated NAR file; created in {@link #setup()}. */
    private File tempDirectory;

    /** Value forwarded to {@code setUseSeparateThreadPoolForProtocolHandlers} during setup. */
    private final boolean useSeparateThreadPool;

    /**
     * Minimal protocol handler used by the test: every accepted connection receives the bytes
     * {@code "ok"} and is then closed.
     */
    public static final class MyProtocolHandler implements ProtocolHandler {
        private ServiceConfiguration conf;

        /** Ports obtained from {@link PortManager}; released again in {@link #close()}. */
        private final List<Integer> ports = new ArrayList<>();

        @Override
        public String protocolName() {
            return "test";
        }

        @Override
        public boolean accept(String protocol) {
            return "test".equals(protocol);
        }

        @Override
        public void initialize(ServiceConfiguration serviceConfiguration) throws Exception {
            this.conf = serviceConfiguration;
        }

        @Override
        public String getProtocolDataToAdvertise() {
            return "test";
        }

        @Override
        public void start(BrokerService brokerService) {
            // Nothing to start: the channel initializer below does all the work.
        }

        /**
         * Locks a free port and returns a single channel initializer bound to the configured bind
         * address on that port.
         *
         * @return a one-entry map from the listen address to its channel initializer
         */
        @Override
        public Map<InetSocketAddress, ChannelInitializer<SocketChannel>> newChannelInitializers() {
            int port = PortManager.nextLockedFreePort();
            // Remember the port so close() can hand it back to the PortManager.
            ports.add(port);
            InetSocketAddress address = new InetSocketAddress(conf.getBindAddress(), port);
            return Collections.singletonMap(address, new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                        @Override
                        public void channelActive(ChannelHandlerContext ctx) {
                            ByteBuf buffer = ctx.alloc().buffer();
                            buffer.writeBytes("ok".getBytes(StandardCharsets.UTF_8));
                            // Close once the write completes; the client reads until EOF.
                            ctx.writeAndFlush(buffer).addListener(future -> ctx.close());
                        }

                        @Override
                        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
                            log.error("error", cause);
                            ctx.close();
                        }
                    });
                }
            });
        }

        /** Releases every locked port and forgets those that were released successfully. */
        @Override
        public void close() {
            ports.removeIf(PortManager::releaseLockedPort);
        }
    }

    /**
     * @param useSeparateThreadPool value to pass to
     *     {@code setUseSeparateThreadPoolForProtocolHandlers} when the broker is configured
     */
    public SimpleProtocolHandlerTestsBase(boolean useSeparateThreadPool) {
        this.useSeparateThreadPool = useSeparateThreadPool;
    }

    /**
     * Creates the temporary handler directory, writes the mock NAR file into it, configures the
     * broker to load the {@code "test"} protocol from there and runs the base setup.
     *
     * @throws Exception if the directory or NAR file cannot be created, or base setup fails
     */
    @Override
    @BeforeClass
    protected void setup() throws Exception {
        tempDirectory = Files.createTempDirectory("SimpleProtocolHandlerTest").toFile();
        conf.setUseSeparateThreadPoolForProtocolHandlers(useSeparateThreadPool);
        conf.setProtocolHandlerDirectory(tempDirectory.getAbsolutePath());
        conf.setMessagingProtocols(Collections.singleton("test"));
        // The NAR must exist before baseSetup() runs with the configuration above.
        buildMockNarFile(tempDirectory);
        super.baseSetup();
    }

    /**
     * Connects to the endpoint registered with the value {@code "test"} and checks that the
     * handler answers with {@code "ok"}.
     *
     * @throws Exception if no matching endpoint is registered or the socket I/O fails
     */
    @Test
    public void testBootstrapProtocolHandler() throws Exception {
        SocketAddress address = pulsar.getProtocolHandlers().getEndpoints().entrySet().stream()
                .filter(entry -> "test".equals(entry.getValue()))
                .map(Map.Entry::getKey)
                .findAny()
                // Fail with a descriptive message instead of a bare NoSuchElementException.
                .orElseThrow(() -> new AssertionError("No endpoint registered for protocol 'test'"));

        try (Socket socket = new Socket()) {
            socket.connect(address);
            String received = IOUtils.toString(socket.getInputStream(), StandardCharsets.UTF_8);
            Assert.assertEquals(received, "ok");
        }
    }

    /**
     * Runs the base cleanup and removes the temporary handler directory.
     *
     * <p>The directory is deleted in a {@code finally} block so it is removed even when the base
     * cleanup throws.
     *
     * @throws Exception if the base cleanup or the directory deletion fails
     */
    @Override
    @AfterClass(alwaysRun = true)
    protected void cleanup() throws Exception {
        try {
            super.internalCleanup();
        } finally {
            if (tempDirectory != null) {
                FileUtils.deleteDirectory(tempDirectory);
            }
        }
    }

    /**
     * Writes {@code temp.nar} into the given directory. The archive contains the
     * {@code META-INF} directory entries and a {@code pulsar-protocol-handler.yml} descriptor
     * naming {@link MyProtocolHandler} as the handler class.
     *
     * @param directory directory in which the NAR file is created
     * @throws Exception if the archive cannot be written
     */
    private static void buildMockNarFile(File directory) throws Exception {
        File narFile = new File(directory, "temp.nar");
        String descriptor = "name: test\n"
                + "description: this is a test\n"
                + "handlerClass: " + MyProtocolHandler.class.getName() + "\n";

        try (ZipOutputStream zip = new ZipOutputStream(new FileOutputStream(narFile))) {
            zip.putNextEntry(new ZipEntry("META-INF/"));
            zip.putNextEntry(new ZipEntry("META-INF/services/"));
            zip.putNextEntry(new ZipEntry("META-INF/bundled-dependencies/"));
            zip.putNextEntry(new ZipEntry("META-INF/services/pulsar-protocol-handler.yml"));
            zip.write(descriptor.getBytes(StandardCharsets.UTF_8));
            zip.closeEntry();
        }
    }
}
