package org.apache.pulsar.tests.integration;

import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.Security;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.EncryptionKeyInfo;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.TopicMessageImpl;
import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
import org.apache.pulsar.common.api.EncryptionContext;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.SingleMessageMetadata;
import org.apache.pulsar.common.compression.CompressionCodecProvider;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
import org.apache.pulsar.shade.io.netty.buffer.Unpooled;
import org.apache.pulsar.tests.TestRetrySupport;
import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

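/**
 * Integration tests for producing and consuming encrypted messages against a broker
 * started in a {@link PulsarContainer}.
 *
 * <p>Decompiled from: org/apache/pulsar/tests/integration/SimpleProducerConsumerTest.class
 */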
public class SimpleProducerConsumerTest extends TestRetrySupport {
    private static final Logger log = LoggerFactory.getLogger(SimpleProducerConsumerTest.class);
    private PulsarContainer pulsarContainer;
    private URI lookupUrl;
    private PulsarClient pulsarClient;

    /**
     * Starts the Pulsar container, connects the shared client to it and provisions the
     * {@code my-property} tenant and the {@code my-property/my-ns} namespace.
     *
     * @throws Exception if the container, the client or any admin call fails
     */
    @BeforeClass(alwaysRun = true)
    public void setup() throws Exception {
        incrementSetupNumber();
        Security.addProvider(new BouncyCastleProvider());
        this.pulsarContainer = new PulsarContainer();
        this.pulsarContainer.start();
        this.pulsarClient = PulsarClient.builder()
                .serviceUrl(this.pulsarContainer.getPlainTextPulsarBrokerUrl())
                .build();
        this.lookupUrl = new URI(this.pulsarContainer.getPlainTextPulsarBrokerUrl());

        final Set<String> adminRoles = new HashSet<>(Arrays.asList("appid1", "appid2"));
        final Set<String> clusters = Collections.singleton("standalone");
        // The admin client is only needed for provisioning, so it is closed as soon as that is done.
        try (PulsarAdmin admin = PulsarAdmin.builder()
                .serviceHttpUrl(this.pulsarContainer.getPulsarAdminUrl())
                .build()) {
            admin.tenants().createTenant("my-property",
                    TenantInfo.builder().adminRoles(adminRoles).allowedClusters(clusters).build());
            admin.namespaces().createNamespace("my-property/my-ns");
            admin.namespaces().setNamespaceReplicationClusters("my-property/my-ns", clusters);
        }
    }

    /**
     * Closes the shared client and stops the container.
     *
     * <p>The container is shut down even when closing the client throws, so a client-side
     * failure cannot leave the container running.
     *
     * @throws Exception if closing the client or stopping the container fails
     */
    @AfterClass(alwaysRun = true)
    public void cleanup() throws Exception {
        markCurrentSetupNumberCleaned();
        try {
            if (this.pulsarClient != null) {
                this.pulsarClient.close();
            }
        } finally {
            this.pulsarClient = null;
            if (this.pulsarContainer != null) {
                try {
                    this.pulsarContainer.stop();
                    this.pulsarContainer.close();
                } finally {
                    this.pulsarContainer = null;
                }
            }
        }
    }

    /**
     * Builds a new client for the given service URL.
     *
     * @param str service URL passed to the client builder
     * @param i   stats interval, in seconds
     * @return the newly built client; the caller is responsible for closing it
     * @throws PulsarClientException if the client cannot be built
     */
    private PulsarClient newPulsarClient(String str, int i) throws PulsarClientException {
        return PulsarClient.builder()
                .serviceUrl(str)
                .statsInterval(i, TimeUnit.SECONDS)
                .build();
    }

    /**
     * Publishes 20 RSA-encrypted messages from two producers and checks that a consumer with a
     * key reader receives all of them decrypted and in order, while a consumer without a key
     * reader on the same topic receives nothing.
     *
     * @throws Exception if any client operation fails
     */
    @Test
    public void testRSAEncryption() throws Exception {
        // Every producer and consumer uses this one topic; otherwise the "normal consumer gets
        // nothing" check would pass trivially because that consumer would sit on an idle topic.
        final String topic = "persistent://my-property/my-ns/myrsa-topic1-" + System.currentTimeMillis();
        final int messagesPerProducer = 10;

        // Reads the key files from the test resources; a missing or unreadable file fails the test.
        class EncKeyReader implements CryptoKeyReader {
            private final EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();

            @Override
            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> metadata) {
                return load("./src/test/resources/certificate/public-key." + keyName);
            }

            @Override
            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> metadata) {
                return load("./src/test/resources/certificate/private-key." + keyName);
            }

            private EncryptionKeyInfo load(String path) {
                if (!Files.isReadable(Paths.get(path))) {
                    Assert.fail("Certificate file " + path + " is not present or not readable.");
                    return null;
                }
                try {
                    this.keyInfo.setKey(Files.readAllBytes(Paths.get(path)));
                    return this.keyInfo;
                } catch (IOException e) {
                    Assert.fail("Failed to read certificate from " + path, e);
                    return null;
                }
            }
        }

        final Set<String> receivedPayloads = new HashSet<>();
        try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                     .topic(topic)
                     .subscriptionName("my-subscriber-name")
                     .cryptoKeyReader(new EncKeyReader())
                     .subscribe();
             Consumer<byte[]> normalConsumer = this.pulsarClient.newConsumer()
                     .topic(topic)
                     .subscriptionName("my-subscriber-name-normal")
                     .subscribe();
             Producer<byte[]> producer = this.pulsarClient.newProducer()
                     .topic(topic)
                     .addEncryptionKey("client-rsa.pem")
                     .cryptoKeyReader(new EncKeyReader())
                     .create();
             Producer<byte[]> producer2 = this.pulsarClient.newProducer()
                     .topic(topic)
                     .addEncryptionKey("client-rsa.pem")
                     .cryptoKeyReader(new EncKeyReader())
                     .create()) {

            for (int i = 0; i < messagesPerProducer; i++) {
                producer.send(("my-message-" + i).getBytes(StandardCharsets.UTF_8));
            }
            for (int i = messagesPerProducer; i < 2 * messagesPerProducer; i++) {
                producer2.send(("my-message-" + i).getBytes(StandardCharsets.UTF_8));
            }

            // No key reader configured: nothing should reach this consumer.
            Assert.assertNull(normalConsumer.receive(500, TimeUnit.MILLISECONDS),
                    "Consumer without a key reader should not receive encrypted messages");

            Message<byte[]> last = null;
            for (int i = 0; i < 2 * messagesPerProducer; i++) {
                last = consumer.receive(5, TimeUnit.SECONDS);
                Assert.assertNotNull(last, "Timed out waiting for message " + i);
                last.getEncryptionCtx().orElseThrow(
                        () -> new IllegalStateException("encryption-ctx not present for encrypted message"));
                final String payload = new String(last.getData(), StandardCharsets.UTF_8);
                log.debug("Received message: [{}]", payload);
                testMessageOrderAndDuplicates(receivedPayloads, payload, "my-message-" + i);
            }
            // Acknowledge everything consumed so far in one call.
            consumer.acknowledgeCumulative(last);
        }
    }

    /**
     * Asserts that a received message equals the expected one and has not been seen before.
     *
     * @param set messages seen so far; the received message is added to it
     * @param t   the message that was received
     * @param t2  the message that was expected at this position
     * @param <T> message payload type
     */
    protected <T> void testMessageOrderAndDuplicates(Set<T> set, T t, T t2) {
        final String mismatch = "Received message " + t + " did not match the expected message " + t2;
        Assert.assertEquals(t, t2, mismatch);

        final boolean firstOccurrence = set.add(t);
        Assert.assertTrue(firstOccurrence, "Received duplicate message " + t);
    }

    /**
     * Publishes 100 encrypted messages to a shared subscription served by three consumers: one
     * with a working key reader, one whose key reader returns no keys, and one without a key
     * reader. Checks that only the first consumer ever receives messages and that it ends up
     * with all 100.
     *
     * @throws Exception if any client operation fails
     */
    @Test
    public void testRedeliveryOfFailedMessages() throws Exception {
        final String topic = "persistent://my-property/my-ns/myrsa-topic2";
        final String subscription = "my-subscriber-name";
        final int totalMessages = 100;
        final Map<String, String> keyMetadata = new HashMap<>();
        keyMetadata.put("version", "1.0");

        // Reads the key files from the test resources and attaches the shared key metadata.
        class EncKeyReader implements CryptoKeyReader {
            private final EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();

            @Override
            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> metadata) {
                return load("./src/test/resources/certificate/public-key." + keyName);
            }

            @Override
            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> metadata) {
                return load("./src/test/resources/certificate/private-key." + keyName);
            }

            private EncryptionKeyInfo load(String path) {
                if (!Files.isReadable(Paths.get(path))) {
                    Assert.fail("Certificate file " + path + " is not present or not readable.");
                    return null;
                }
                try {
                    this.keyInfo.setKey(Files.readAllBytes(Paths.get(path)));
                    this.keyInfo.setMetadata(keyMetadata);
                    return this.keyInfo;
                } catch (IOException e) {
                    Assert.fail("Failed to read certificate from " + path, e);
                    return null;
                }
            }
        }

        // Never yields a key.
        class InvalidKeyReader implements CryptoKeyReader {
            @Override
            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> metadata) {
                return null;
            }

            @Override
            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> metadata) {
                return null;
            }
        }

        // Each consumer gets its own client. Resources are closed in reverse order of creation,
        // on success and on failure alike.
        try (PulsarClient producerClient = PulsarClient.builder()
                     .serviceUrl(this.pulsarContainer.getPlainTextPulsarBrokerUrl())
                     .build();
             Producer<byte[]> producer = producerClient.newProducer()
                     .topic(topic)
                     .addEncryptionKey("client-rsa.pem")
                     .compressionType(CompressionType.LZ4)
                     .cryptoKeyReader(new EncKeyReader())
                     .create();
             PulsarClient validReaderClient = newPulsarClient(this.lookupUrl.toString(), 0);
             Consumer<byte[]> validReaderConsumer = validReaderClient.newConsumer()
                     .topicsPattern(topic)
                     .subscriptionName(subscription)
                     .cryptoKeyReader(new EncKeyReader())
                     .subscriptionType(SubscriptionType.Shared)
                     .ackTimeout(1L, TimeUnit.SECONDS)
                     .subscribe();
             PulsarClient invalidReaderClient = newPulsarClient(this.lookupUrl.toString(), 0);
             Consumer<byte[]> invalidReaderConsumer = invalidReaderClient.newConsumer()
                     .topicsPattern(topic)
                     .subscriptionName(subscription)
                     .cryptoKeyReader(new InvalidKeyReader())
                     .subscriptionType(SubscriptionType.Shared)
                     .ackTimeout(1L, TimeUnit.SECONDS)
                     .subscribe();
             PulsarClient noReaderClient = newPulsarClient(this.lookupUrl.toString(), 0);
             Consumer<byte[]> noReaderConsumer = noReaderClient.newConsumer()
                     .topicsPattern(topic)
                     .subscriptionName(subscription)
                     .subscriptionType(SubscriptionType.Shared)
                     .ackTimeout(1L, TimeUnit.SECONDS)
                     .subscribe()) {

            for (int i = 0; i < totalMessages; i++) {
                producer.send(("my-message" + i).getBytes(StandardCharsets.UTF_8));
            }

            // Consumers that cannot decrypt must not surface any message.
            Assert.assertNull(invalidReaderConsumer.receive(3, TimeUnit.SECONDS));
            Assert.assertNull(noReaderConsumer.receive(3, TimeUnit.SECONDS));

            final Set<String> receivedPayloads = new HashSet<>();
            for (int i = 0; i < totalMessages; i++) {
                // Deliberately unbounded, as before: a message may first be handed to one of the
                // other consumers and only reach this one after a redelivery.
                final Message<byte[]> message = validReaderConsumer.receive();
                receivedPayloads.add(new String(message.getData(), StandardCharsets.UTF_8));
                validReaderConsumer.acknowledge(message);
            }

            Assert.assertNull(invalidReaderConsumer.receive(3, TimeUnit.SECONDS));
            Assert.assertNull(noReaderConsumer.receive(3, TimeUnit.SECONDS));

            for (int i = 0; i < totalMessages; i++) {
                Assert.assertTrue(receivedPayloads.contains("my-message" + i),
                        "Message my-message" + i + " was never received");
            }
        }
    }

    /**
     * Exercises the failure paths of message encryption on a single subscription:
     * <ol>
     *   <li>creating a producer whose encryption key cannot be loaded must fail;</li>
     *   <li>a consumer without a key reader receives nothing;</li>
     *   <li>with {@link ConsumerCryptoFailureAction#CONSUME} the first message is delivered
     *       still encrypted;</li>
     *   <li>with a key reader the following messages, up to but excluding the last one, are
     *       delivered decrypted and in order;</li>
     *   <li>with {@link ConsumerCryptoFailureAction#DISCARD} the remaining message is not
     *       delivered.</li>
     * </ol>
     *
     * @throws Exception if any client operation fails unexpectedly
     */
    @Test
    public void testEncryptionFailure() throws Exception {
        // NOTE(review): this topic lives in namespace "my-property/use/myenc-ns", which setup()
        // does not create — confirm the broker accepts it.
        final String topic = "persistent://my-property/use/myenc-ns/myenc-topic1";
        final String subscription = "my-subscriber-name";
        final int totalMessages = 10;

        // Reads the key files from the test resources; returns null (instead of failing the
        // test) when a file is missing, because a missing key is part of what is tested here.
        class EncKeyReader implements CryptoKeyReader {
            private final EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();

            @Override
            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> metadata) {
                return load("./src/test/resources/certificate/public-key." + keyName);
            }

            @Override
            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> metadata) {
                return load("./src/test/resources/certificate/private-key." + keyName);
            }

            private EncryptionKeyInfo load(String path) {
                if (!Files.isReadable(Paths.get(path))) {
                    return null;
                }
                try {
                    this.keyInfo.setKey(Files.readAllBytes(Paths.get(path)));
                    return this.keyInfo;
                } catch (IOException e) {
                    log.error("Failed to read certificate from {}", path, e);
                    return null;
                }
            }
        }

        final Set<String> receivedPayloads = new HashSet<>();
        Producer<byte[]> producer = null;
        // Subscribed before anything is published, without a key reader.
        Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .topic(topic)
                .subscriptionName(subscription)
                .acknowledgmentGroupTime(0L, TimeUnit.SECONDS)
                .subscribe();
        try {
            try {
                this.pulsarClient.newProducer()
                        .topic(topic)
                        .addEncryptionKey("client-non-existant-rsa.pem")
                        .cryptoKeyReader(new EncKeyReader())
                        .create();
                Assert.fail("Producer creation should not suceed if failing to read key");
            } catch (Exception expected) {
                // Expected: the key file does not exist, so the producer cannot be created.
                // Assert.fail throws an Error and is therefore not swallowed here.
            }

            producer = this.pulsarClient.newProducer()
                    .topic(topic)
                    .addEncryptionKey("client-rsa.pem")
                    .cryptoKeyReader(new EncKeyReader())
                    .enableBatching(false)
                    .messageRoutingMode(MessageRoutingMode.SinglePartition)
                    .create();
            for (int i = 0; i < totalMessages; i++) {
                producer.send(("my-message-" + i).getBytes(StandardCharsets.UTF_8));
            }

            Assert.assertNull(consumer.receive(5, TimeUnit.SECONDS),
                    "Receive should have failed with no keyreader");
            consumer.close();

            // CONSUME: the message is handed over even though it cannot be decrypted.
            consumer = this.pulsarClient.newConsumer()
                    .topic(topic)
                    .subscriptionName(subscription)
                    .cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME)
                    .acknowledgmentGroupTime(0L, TimeUnit.SECONDS)
                    .subscribe();
            int nextMessage = 0;
            try {
                final Message<byte[]> encrypted = consumer.receive(5, TimeUnit.SECONDS);
                final String payload = new String(encrypted.getData(), StandardCharsets.UTF_8);
                final String plainText = "my-message-" + nextMessage++;
                Assert.assertNotEquals(payload, plainText, "Received encrypted message " + payload
                        + " should not match the expected message " + plainText);
                consumer.acknowledgeCumulative(encrypted);
            } catch (Exception e) {
                log.error("Receiving with ConsumerCryptoFailureAction.CONSUME failed", e);
                Assert.fail("Failed to receive message even after ConsumerCryptoFailureAction.CONSUME is set.");
            }
            consumer.close();

            // FAIL with a working key reader: messages are decrypted normally.
            consumer = this.pulsarClient.newConsumer()
                    .topic(topic)
                    .subscriptionName(subscription)
                    .cryptoFailureAction(ConsumerCryptoFailureAction.FAIL)
                    .cryptoKeyReader(new EncKeyReader())
                    .acknowledgmentGroupTime(0L, TimeUnit.SECONDS)
                    .subscribe();
            Message<byte[]> last = null;
            // Stops one short of the end so that the last message is left for the DISCARD consumer.
            for (int i = nextMessage; i < totalMessages - 1; i++) {
                last = consumer.receive(5, TimeUnit.SECONDS);
                Assert.assertNotNull(last, "Timed out waiting for message " + i);
                last.getEncryptionCtx().orElseThrow(
                        () -> new IllegalStateException("encryption-ctx not present for encrypted message"));
                final String payload = new String(last.getData(), StandardCharsets.UTF_8);
                log.debug("Received message: [{}]", payload);
                testMessageOrderAndDuplicates(receivedPayloads, payload, "my-message-" + i);
            }
            consumer.acknowledgeCumulative(last);
            consumer.close();

            // DISCARD without a key reader: the remaining message must not be delivered.
            consumer = this.pulsarClient.newConsumer()
                    .topic(topic)
                    .subscriptionName(subscription)
                    .cryptoFailureAction(ConsumerCryptoFailureAction.DISCARD)
                    .acknowledgmentGroupTime(0L, TimeUnit.SECONDS)
                    .subscribe();
            Assert.assertNull(consumer.receive(5, TimeUnit.SECONDS),
                    "Message received even aftet ConsumerCryptoFailureAction.DISCARD is set.");
        } finally {
            try {
                consumer.close();
            } finally {
                if (producer != null) {
                    producer.close();
                }
            }
        }
    }

    /**
     * Publishes one encrypted, LZ4-compressed message, consumes it without a key reader using
     * {@link ConsumerCryptoFailureAction#CONSUME}, and checks that it can be decrypted by hand
     * via {@code decryptMessage} back to the original text.
     *
     * @throws Exception if any client operation or the manual decryption fails
     */
    @Test
    public void testEncryptionConsumerWithoutCryptoReader() throws Exception {
        final String topic = "persistent://my-property/my-ns/myrsa-topic3";
        final String keyName = "client-rsa.pem";
        final Map<String, String> keyMetadata = new HashMap<>();
        keyMetadata.put("version", "1.0");

        // Reads the key files from the test resources and attaches the shared key metadata.
        class EncKeyReader implements CryptoKeyReader {
            private final EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();

            @Override
            public EncryptionKeyInfo getPublicKey(String name, Map<String, String> metadata) {
                return load("./src/test/resources/certificate/public-key." + name);
            }

            @Override
            public EncryptionKeyInfo getPrivateKey(String name, Map<String, String> metadata) {
                return load("./src/test/resources/certificate/private-key." + name);
            }

            private EncryptionKeyInfo load(String path) {
                if (!Files.isReadable(Paths.get(path))) {
                    Assert.fail("Certificate file " + path + " is not present or not readable.");
                    return null;
                }
                try {
                    this.keyInfo.setKey(Files.readAllBytes(Paths.get(path)));
                    this.keyInfo.setMetadata(keyMetadata);
                    return this.keyInfo;
                } catch (IOException e) {
                    Assert.fail("Failed to read certificate from " + path, e);
                    return null;
                }
            }
        }

        try (Producer<byte[]> producer = this.pulsarClient.newProducer()
                     .topic(topic)
                     .addEncryptionKey(keyName)
                     .compressionType(CompressionType.LZ4)
                     .cryptoKeyReader(new EncKeyReader())
                     .create();
             Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                     .topicsPattern(topic)
                     .subscriptionName("my-subscriber-name")
                     .cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME)
                     .subscribe()) {

            producer.send("my-message".getBytes(StandardCharsets.UTF_8));

            final Message<byte[]> received = consumer.receive(5, TimeUnit.SECONDS);
            Assert.assertNotNull(received, "Timed out waiting for the encrypted message");
            Assert.assertTrue(received instanceof TopicMessageImpl,
                    "Unexpected message type " + received.getClass().getName());

            final String decrypted =
                    decryptMessage((TopicMessageImpl<byte[]>) received, keyName, new EncKeyReader());
            // TestNG argument order is (actual, expected).
            Assert.assertEquals(decrypted, "my-message");
        }
    }

    /**
     * Manually decrypts and decompresses a message that was delivered still encrypted.
     *
     * <p>Rebuilds a {@link MessageMetadata} from the message's encryption context, decrypts the
     * payload with {@link MessageCryptoBc}, decompresses it and, if the context reports a batch
     * size, extracts the first entry of the batch.
     *
     * @param topicMessageImpl the received message; must carry an encryption context holding
     *                         exactly one key
     * @param str              name of the encryption key to look up in the context
     * @param cryptoKeyReader  key reader passed to the decryption call
     * @return the decrypted payload decoded as UTF-8
     * @throws Exception if decryption or decompression fails
     */
    private String decryptMessage(TopicMessageImpl<byte[]> topicMessageImpl, String str, CryptoKeyReader cryptoKeyReader) throws Exception {
        final Optional<EncryptionContext> contextHolder = topicMessageImpl.getEncryptionCtx();
        Assert.assertTrue(contextHolder.isPresent());
        final EncryptionContext context = contextHolder.orElseThrow(
                () -> new IllegalStateException("encryption-ctx not present for encrypted message"));

        final Map<String, EncryptionContext.EncryptionKey> keys = context.getKeys();
        Assert.assertEquals(keys.size(), 1);
        final EncryptionContext.EncryptionKey encryptionKey = keys.get(str);
        Assert.assertNotNull(encryptionKey, "No encryption key named " + str + " in the encryption context");
        final byte[] keyValue = encryptionKey.getKeyValue();
        Assert.assertEquals(encryptionKey.getMetadata().get("version"), "1.0");

        final CompressionType compressionType = context.getCompressionType();
        final int uncompressedSize = context.getUncompressedMessageSize();
        final String algorithm = context.getAlgorithm();
        final int batchSize = context.getBatchSize().orElse(0);

        // Producer name, sequence id and publish time are placeholder values.
        final MessageMetadata metadata = new MessageMetadata()
                .setEncryptionParam(context.getParam())
                .setProducerName("test")
                .setSequenceId(123L)
                .setPublishTime(12333453454L)
                .setCompression(CompressionCodecProvider.convertToWireProtocol(compressionType))
                .setUncompressedSize(uncompressedSize);
        if (algorithm != null) {
            metadata.setEncryptionAlgo(algorithm);
        }
        metadata.addEncryptionKey().setKey(str).setValue(keyValue);

        final ByteBuffer encrypted = ByteBuffer.wrap(topicMessageImpl.getData());
        final MessageCryptoBc crypto = new MessageCryptoBc("test", false);
        final ByteBuffer decrypted = ByteBuffer.allocate(crypto.getMaxOutputSize(encrypted.remaining()));
        crypto.decrypt(() -> metadata, encrypted, decrypted, cryptoKeyReader);

        ByteBuf payload = CompressionCodecProvider.getCompressionCodec(compressionType)
                .decode(Unpooled.wrappedBuffer(decrypted), uncompressedSize);
        try {
            if (batchSize > 0) {
                // NOTE(review): if this returns a retained slice, the enclosing buffer is never
                // released here — confirm against Commands.deSerializeSingleMessageInBatch.
                payload = Commands.deSerializeSingleMessageInBatch(payload, new SingleMessageMetadata(), 0, batchSize);
            }
            final byte[] bytes = new byte[payload.readableBytes()];
            payload.readBytes(bytes);
            return new String(bytes, StandardCharsets.UTF_8);
        } finally {
            // Released even when reading fails.
            payload.release();
        }
    }
}
