package org.apache.pulsar.websocket.proxy;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import lombok.Generated;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.api.EncryptionContext;
import org.apache.pulsar.common.api.proto.CompressionType;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.apache.pulsar.websocket.WebSocketService;
import org.apache.pulsar.websocket.data.ConsumerMessage;
import org.apache.pulsar.websocket.data.ProducerMessage;
import org.apache.pulsar.websocket.service.ProxyServer;
import org.apache.pulsar.websocket.service.WebSocketProxyConfiguration;
import org.apache.pulsar.websocket.service.WebSocketServiceStarter;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

/**
 * Integration tests for client-side encrypted messages travelling through the WebSocket proxy.
 *
 * <p>Each test creates a fresh non-partitioned topic with a pre-created subscription, publishes
 * with either a WebSocket producer or the Java client producer, and verifies that the other side
 * (Java client consumer or WebSocket consumer) receives the original payload.
 */
@Test(groups = {"websocket"})
public class ProxyPublishConsumeClientSideEncryptionTest extends ProducerConsumerBase {
    /** Backlog quota check interval, in seconds, applied to the broker configuration. */
    private static final int TIME_TO_CHECK_BACKLOG_QUOTA = 5;
    /** How long, in seconds, a single receive call may wait before giving up. */
    private static final int RECEIVE_TIMEOUT_SECONDS = 2;
    /** Number of messages published by the batching test. */
    private static final int BATCH_MESSAGE_COUNT = 10;
    private static final String TOPIC_PREFIX = "public/default/tp_";
    private static final String SUBSCRIPTION_NAME = "s1";
    private static final String WSS_PRODUCER_NAME = "wss-p1";
    /** Key used by the tests that are not parameterized over key names. */
    private static final String DEFAULT_ENCRYPTION_KEY_NAME = "client-ecdsa.pem";
    private static final String PAYLOAD = "msg-123";
    /** Payloads are published as UTF-8 bytes, so they are decoded as UTF-8 as well. */
    private static final Charset CHARSET = StandardCharsets.UTF_8;

    @Generated
    private static final Logger log = LoggerFactory.getLogger(ProxyPublishConsumeClientSideEncryptionTest.class);

    private ScheduledExecutorService executor;
    private ProxyServer proxyServer;
    private WebSocketService service;

    /**
     * Starts the mocked broker and a WebSocket proxy listening on an ephemeral port.
     *
     * @throws Exception if the broker or the proxy cannot be started
     */
    @Override
    @BeforeClass
    public void setup() throws Exception {
        this.executor = Executors.newScheduledThreadPool(1);
        this.conf.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
        super.internalSetup();
        super.producerBaseSetup();

        WebSocketProxyConfiguration proxyConfig = new WebSocketProxyConfiguration();
        // Port 0 lets the proxy pick a free port; tests read it back from the proxy server.
        proxyConfig.setWebServicePort(Optional.of(0));
        proxyConfig.setClusterName("test");
        proxyConfig.setConfigurationMetadataStoreUrl("GLOBAL_DUMMY_VALUE");

        this.service = Mockito.spy(new WebSocketService(proxyConfig));
        // The metadata store URL above is a placeholder, so hand the service the mocked store instead.
        Mockito.doReturn(registerCloseable(new ZKMetadataStore(this.mockZooKeeperGlobal)))
                .when(this.service)
                .createConfigMetadataStore(
                        ArgumentMatchers.anyString(),
                        ArgumentMatchers.anyInt(),
                        ArgumentMatchers.anyBoolean());

        this.proxyServer = new ProxyServer(proxyConfig);
        WebSocketServiceStarter.start(this.proxyServer, this.service);
        log.info("Proxy Server Started");
    }

    /**
     * Stops the broker, the WebSocket service, the proxy and the executor.
     *
     * <p>Runs even when {@link #setup()} failed, so every field is null-checked, and the executor
     * is shut down even if an earlier cleanup step throws.
     *
     * @throws Exception if any component fails to shut down
     */
    @Override
    @AfterClass(alwaysRun = true)
    protected void cleanup() throws Exception {
        try {
            super.internalCleanup();
            if (this.service != null) {
                this.service.close();
            }
            if (this.proxyServer != null) {
                this.proxyServer.stop();
            }
        } finally {
            if (this.executor != null) {
                this.executor.shutdownNow();
            }
        }
        log.info("Finished Cleaning Up Test setup");
    }

    /**
     * @return the encryption key names the key-parameterized tests run against
     */
    @DataProvider(name = "encryptKeyNames")
    public Object[][] encryptKeyNames() {
        return new Object[][]{
                {"client-ecdsa.pem"},
                {"client-rsa.pem"},
        };
    }

    /**
     * Publishes one encrypted message over WebSocket and reads it back with the Java consumer.
     *
     * @param str name of the encryption key to use
     * @throws Exception on any broker, proxy or client failure
     */
    @Test(dataProvider = "encryptKeyNames")
    public void testWssSendAndJavaConsumeWithEncryption(String str) throws Exception {
        String topic = createTopicWithSubscription();
        CryptoKeyReaderForTest keyReader = new CryptoKeyReaderForTest();
        ClientSideEncryptionWssProducer wssProducer = startWssProducer(topic, keyReader, str);
        try {
            ProducerMessage message = newProducerMessage();
            log.info("send success: {}", wssProducer.sendMessage(message).toString());
            assertJavaConsumerReceivesPayload(topic, keyReader);
        } finally {
            wssProducer.close();
        }
        this.admin.topics().delete(topic);
    }

    /**
     * @return the wire-protocol compression types used when publishing over WebSocket
     */
    @DataProvider(name = "compressionTypes")
    public Object[][] compressionTypes() {
        return new Object[][]{
                {CompressionType.NONE},
                {CompressionType.LZ4},
                {CompressionType.ZLIB},
                {CompressionType.SNAPPY},
                {CompressionType.ZSTD},
        };
    }

    /**
     * Publishes one encrypted, compressed message over WebSocket and reads it back with the Java
     * consumer.
     *
     * @param compressionType compression type set on the WebSocket producer message
     * @throws Exception on any broker, proxy or client failure
     */
    @Test(dataProvider = "compressionTypes")
    public void testWssSendAndJavaConsumeWithEncryptionAndCompression(CompressionType compressionType)
            throws Exception {
        String topic = createTopicWithSubscription();
        CryptoKeyReaderForTest keyReader = new CryptoKeyReaderForTest();
        ClientSideEncryptionWssProducer wssProducer =
                startWssProducer(topic, keyReader, DEFAULT_ENCRYPTION_KEY_NAME);
        try {
            ProducerMessage message = newProducerMessage();
            message.compressionType = compressionType;
            log.info("send success: {}", wssProducer.sendMessage(message).toString());
            assertJavaConsumerReceivesPayload(topic, keyReader);
        } finally {
            wssProducer.close();
        }
        this.admin.topics().delete(topic);
    }

    /**
     * Publishes one encrypted message with the Java producer and reads it back over WebSocket.
     *
     * @param str name of the encryption key to use
     * @throws Exception on any broker, proxy or client failure
     */
    @Test(dataProvider = "encryptKeyNames")
    public void testJavaSendAndWssConsumeWithEncryption(String str) throws Exception {
        String topic = createTopicWithSubscription();
        CryptoKeyReaderForTest keyReader = new CryptoKeyReaderForTest();
        Producer<byte[]> producer = this.pulsarClient.newProducer()
                .topic(topic)
                .addEncryptionKey(str)
                .cryptoKeyReader(keyReader)
                .create();
        try {
            producer.send(PAYLOAD.getBytes(StandardCharsets.UTF_8));
            ClientSideEncryptionWssConsumer wssConsumer = startWssConsumer(topic, keyReader);
            try {
                Assert.assertEquals(receiveOrFail(wssConsumer).payload, PAYLOAD);
            } finally {
                wssConsumer.close();
            }
        } finally {
            producer.close();
        }
        this.admin.topics().delete(topic);
    }

    /**
     * @return the Java client compression types used when publishing with the Java producer
     */
    @DataProvider(name = "compressionTypesForJ")
    public Object[][] compressionTypesForJ() {
        return new Object[][]{
                {org.apache.pulsar.client.api.CompressionType.NONE},
                {org.apache.pulsar.client.api.CompressionType.LZ4},
                {org.apache.pulsar.client.api.CompressionType.ZLIB},
                {org.apache.pulsar.client.api.CompressionType.SNAPPY},
                {org.apache.pulsar.client.api.CompressionType.ZSTD},
        };
    }

    /**
     * Publishes one encrypted, compressed message with the Java producer and reads it back over
     * WebSocket.
     *
     * @param compressionType compression type configured on the Java producer
     * @throws Exception on any broker, proxy or client failure
     */
    @Test(dataProvider = "compressionTypesForJ")
    public void testJavaSendAndWssConsumeWithEncryptionAndCompression(
            org.apache.pulsar.client.api.CompressionType compressionType) throws Exception {
        String topic = createTopicWithSubscription();
        CryptoKeyReaderForTest keyReader = new CryptoKeyReaderForTest();
        Producer<byte[]> producer = this.pulsarClient.newProducer()
                .topic(topic)
                .addEncryptionKey(DEFAULT_ENCRYPTION_KEY_NAME)
                .compressionType(compressionType)
                .cryptoKeyReader(keyReader)
                .create();
        try {
            producer.send(PAYLOAD.getBytes(StandardCharsets.UTF_8));
            ClientSideEncryptionWssConsumer wssConsumer = startWssConsumer(topic, keyReader);
            try {
                Assert.assertEquals(receiveOrFail(wssConsumer).payload, PAYLOAD);
            } finally {
                wssConsumer.close();
            }
        } finally {
            producer.close();
        }
        this.admin.topics().delete(topic);
    }

    /**
     * Publishes several encrypted, LZ4-compressed messages with batching enabled and verifies the
     * WebSocket consumer receives exactly the set of payloads that were sent.
     *
     * @throws Exception on any broker, proxy or client failure
     */
    @Test
    public void testJavaSendAndWssConsumeWithEncryptionAndCompressionAndBatch() throws Exception {
        String topic = createTopicWithSubscription();
        CryptoKeyReaderForTest keyReader = new CryptoKeyReaderForTest();
        HashSet<String> sentPayloads = new HashSet<>();
        // The long publish delay and large max-messages keep the messages queued until flush().
        Producer<byte[]> producer = this.pulsarClient.newProducer()
                .enableBatching(true)
                .batchingMaxMessages(1000)
                .batchingMaxPublishDelay(1L, TimeUnit.HOURS)
                .topic(topic)
                .addEncryptionKey(DEFAULT_ENCRYPTION_KEY_NAME)
                .compressionType(org.apache.pulsar.client.api.CompressionType.LZ4)
                .cryptoKeyReader(keyReader)
                .create();
        try {
            for (int i = 0; i < BATCH_MESSAGE_COUNT; i++) {
                String payload = "msg-" + i;
                sentPayloads.add(payload);
                producer.sendAsync(payload.getBytes(StandardCharsets.UTF_8));
            }
            producer.flush();

            ClientSideEncryptionWssConsumer wssConsumer = startWssConsumer(topic, keyReader);
            try {
                HashSet<String> receivedPayloads = new HashSet<>();
                // Drain until a receive call times out, i.e. returns null.
                ConsumerMessage received = wssConsumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
                while (received != null) {
                    receivedPayloads.add(received.payload);
                    received = wssConsumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
                }
                Assert.assertEquals(receivedPayloads, sentPayloads);
            } finally {
                wssConsumer.close();
            }
        } finally {
            producer.close();
        }
        this.admin.topics().delete(topic);
    }

    /**
     * Publishes and consumes one encrypted message entirely over WebSocket and checks that the
     * received encryption context carries the expected key metadata.
     *
     * @param str name of the encryption key to use
     * @throws Exception on any broker, proxy or client failure
     */
    @Test(dataProvider = "encryptKeyNames")
    public void testWssSendAndWssConsumeWithEncryption(String str) throws Exception {
        String topic = createTopicWithSubscription();
        CryptoKeyReaderForTest keyReader = new CryptoKeyReaderForTest();
        ClientSideEncryptionWssProducer wssProducer = startWssProducer(topic, keyReader, str);
        try {
            ProducerMessage message = newProducerMessage();
            log.info("send success: {}", wssProducer.sendMessage(message).toString());

            ClientSideEncryptionWssConsumer wssConsumer = startWssConsumer(topic, keyReader);
            try {
                ConsumerMessage received = receiveOrFail(wssConsumer);
                Assert.assertEquals(received.payload, PAYLOAD);
                EncryptionContext.EncryptionKey encryptionKey = received.encryptionContext.getKeys().get(str);
                Assert.assertNotNull(encryptionKey, "No encryption key entry for " + str);
                Assert.assertEquals(encryptionKey.getMetadata(), CryptoKeyReaderForTest.RANDOM_METADATA);
            } finally {
                wssConsumer.close();
            }
        } finally {
            wssProducer.close();
        }
        this.admin.topics().delete(topic);
    }

    /** Creates a uniquely named non-partitioned topic with a subscription positioned at earliest. */
    private String createTopicWithSubscription() throws Exception {
        String topic = BrokerTestUtil.newUniqueName(TOPIC_PREFIX);
        this.admin.topics().createNonPartitionedTopic(topic);
        this.admin.topics().createSubscription(topic, SUBSCRIPTION_NAME, MessageId.earliest);
        return topic;
    }

    /** Returns the HTTP port the proxy actually bound to. */
    private int proxyPort() {
        return this.proxyServer.getListenPortHTTP()
                .orElseThrow(() -> new IllegalStateException("WebSocket proxy has no HTTP listen port"));
    }

    /** Creates and starts a WebSocket producer on the given topic using the given key. */
    private ClientSideEncryptionWssProducer startWssProducer(String topic, CryptoKeyReaderForTest keyReader,
                                                             String keyName) throws Exception {
        ClientSideEncryptionWssProducer wssProducer = new ClientSideEncryptionWssProducer(
                "localhost", proxyPort(), topic, WSS_PRODUCER_NAME, keyReader, keyName, this.executor);
        wssProducer.start();
        return wssProducer;
    }

    /** Creates and starts a shared-subscription WebSocket consumer on the given topic. */
    private ClientSideEncryptionWssConsumer startWssConsumer(String topic, CryptoKeyReaderForTest keyReader)
            throws Exception {
        ClientSideEncryptionWssConsumer wssConsumer = new ClientSideEncryptionWssConsumer(
                "localhost", proxyPort(), topic, SUBSCRIPTION_NAME, SubscriptionType.Shared, keyReader);
        wssConsumer.start();
        return wssConsumer;
    }

    /** Builds the message every WebSocket-producer test publishes. */
    private static ProducerMessage newProducerMessage() {
        ProducerMessage message = new ProducerMessage();
        message.key = "k";
        message.payload = PAYLOAD;
        return message;
    }

    /** Receives one message over WebSocket, failing the test if none arrives before the timeout. */
    private static ConsumerMessage receiveOrFail(ClientSideEncryptionWssConsumer wssConsumer) throws Exception {
        ConsumerMessage received = wssConsumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        Assert.assertNotNull(received, "No message received over WebSocket within the timeout");
        return received;
    }

    /** Subscribes with the Java client and asserts that the next message carries the payload. */
    private void assertJavaConsumerReceivesPayload(String topic, CryptoKeyReaderForTest keyReader)
            throws Exception {
        Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .cryptoKeyReader(keyReader)
                .topic(topic)
                .subscriptionName(SUBSCRIPTION_NAME)
                .subscribe();
        try {
            org.apache.pulsar.client.api.Message<byte[]> received =
                    consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            Assert.assertNotNull(received, "No message received by the Java consumer within the timeout");
            Assert.assertEquals(new String(received.getData(), CHARSET), PAYLOAD);
        } finally {
            consumer.close();
        }
    }
}
