package org.apache.pulsar.client.api;

import com.google.common.collect.Sets;
import io.jsonwebtoken.Jwts;
import io.jsonwebtoken.SignatureAlgorithm;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.security.KeyPair;
import java.security.KeyPairGenerator;
import java.security.NoSuchAlgorithmException;
import java.security.PrivateKey;
import java.time.Duration;
import java.util.Base64;
import java.util.Date;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.impl.auth.AuthenticationToken;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

/**
 * Verifies that a producer and a consumer can exchange messages through a broker that has
 * token authentication and authorization enabled.
 *
 * <p>An RSA key pair is generated once per test-class instance. The public key is handed to the
 * broker as the {@code tokenPublicKey} property, and the private key signs a JWT whose subject
 * is the configured super-user role.
 */
@Test(groups = {"broker-api"})
public class TokenAuthenticatedProducerConsumerTest extends ProducerConsumerBase {
    private static final Logger log = LoggerFactory.getLogger(TokenAuthenticatedProducerConsumerTest.class);

    /** Role used both as the JWT subject and as the broker's only super-user role. */
    private static final String ADMIN_ROLE = "admin";
    private static final String CLUSTER_NAME = "test";
    private static final String TENANT = "my-property";
    private static final String NAMESPACE = TENANT + "/my-ns";
    private static final String TOPIC = "persistent://" + NAMESPACE + "/my-topic";
    private static final String SUBSCRIPTION = "my-subscriber-name";
    private static final String MESSAGE_PREFIX = "my-message-";
    private static final int NUM_MESSAGES = 10;

    /** Signed JWT presented by the broker-internal client, the admin client and the Pulsar client. */
    private final String ADMIN_TOKEN;
    /** Base64 encoded public key, in {@code data:} URI form, that the broker uses to verify tokens. */
    private final String TOKEN_PUBLIC_KEY;

    /**
     * Generates a fresh RSA key pair and derives the broker public key and the admin token from it.
     *
     * @throws NoSuchAlgorithmException if no RSA {@link KeyPairGenerator} is available
     */
    TokenAuthenticatedProducerConsumerTest() throws NoSuchAlgorithmException {
        KeyPair keyPair = KeyPairGenerator.getInstance("RSA").generateKeyPair();
        this.TOKEN_PUBLIC_KEY = "data:;base64,"
                + Base64.getEncoder().encodeToString(keyPair.getPublic().getEncoded());
        this.ADMIN_TOKEN = generateToken(keyPair);
    }

    /**
     * Builds a compact JWT for the admin role, signed with the private half of {@code keyPair}.
     *
     * @param keyPair key pair whose private key signs the token
     * @return the serialized token, expiring one hour after creation
     */
    private static String generateToken(KeyPair keyPair) {
        PrivateKey privateKey = keyPair.getPrivate();
        Date expiration = new Date(System.currentTimeMillis() + Duration.ofHours(1L).toMillis());
        return Jwts.builder()
                .setSubject(ADMIN_ROLE)
                .setExpiration(expiration)
                .signWith(privateKey, SignatureAlgorithm.forSigningKey(privateKey))
                .compact();
    }

    /**
     * Configures the broker for token authentication and authorization, then starts it through
     * the base class.
     */
    @Override
    @BeforeMethod
    protected void setup() throws Exception {
        this.conf.setAuthenticationEnabled(true);
        this.conf.setAuthorizationEnabled(true);

        Set<String> superUserRoles = new HashSet<>();
        superUserRoles.add(ADMIN_ROLE);
        this.conf.setSuperUserRoles(superUserRoles);

        Set<String> authenticationProviders = new HashSet<>();
        authenticationProviders.add(AuthenticationProviderToken.class.getName());
        this.conf.setAuthenticationProviders(authenticationProviders);

        // The broker's own internal client authenticates with the same admin token.
        this.conf.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
        this.conf.setBrokerClientAuthenticationParameters("token:" + this.ADMIN_TOKEN);
        this.conf.setClusterName(CLUSTER_NAME);

        Properties properties = new Properties();
        properties.setProperty("tokenPublicKey", this.TOKEN_PUBLIC_KEY);
        this.conf.setProperties(properties);

        super.init();
    }

    /**
     * Replaces the admin client and the Pulsar client with instances that present the admin token.
     */
    protected final void clientSetup() throws Exception {
        // NOTE(review): any admin client already held in this.admin is overwritten without being
        // closed here - confirm the base class cleanup accounts for it.
        this.admin = Mockito.spy(PulsarAdmin.builder()
                .serviceHttpUrl(this.brokerUrl.toString())
                .authentication(AuthenticationFactory.token(this.ADMIN_TOKEN))
                .build());

        // Round-tripping through URI rejects a syntactically invalid broker URL up front.
        String serviceUrl = new URI(this.pulsar.getBrokerServiceUrl()).toString();
        replacePulsarClient(PulsarClient.builder()
                .serviceUrl(serviceUrl)
                .statsInterval(0L, TimeUnit.SECONDS)
                .authentication(AuthenticationFactory.token(this.ADMIN_TOKEN)));
    }

    /** Delegates teardown to the base class; runs even when the test method failed. */
    @Override
    @AfterMethod(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Supplies the values for the {@code batch} data provider.
     *
     * @return one single-element row per value: {@code 0} and {@code 1000}
     */
    @DataProvider(name = "batch")
    public Object[][] codecProvider() {
        return new Object[][] {{0}, {1000}};
    }

    /**
     * Publishes {@value #NUM_MESSAGES} messages and checks that the subscription receives each of
     * them, in order and without duplicates, before acknowledging them cumulatively.
     *
     * @throws AssertionError if a message does not arrive within the receive timeout
     */
    private void testSyncProducerAndConsumer() throws Exception {
        // try-with-resources closes the producer and the consumer even when an assertion fails.
        try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                        .topic(TOPIC)
                        .subscriptionName(SUBSCRIPTION)
                        .subscribe();
                Producer<byte[]> producer = this.pulsarClient.newProducer().topic(TOPIC).create()) {

            for (int i = 0; i < NUM_MESSAGES; i++) {
                producer.send((MESSAGE_PREFIX + i).getBytes(StandardCharsets.UTF_8));
            }

            Message<byte[]> lastMessage = null;
            Set<String> messagesReceived = Sets.newHashSet();
            for (int i = 0; i < NUM_MESSAGES; i++) {
                lastMessage = consumer.receive(5, TimeUnit.SECONDS);
                // Fail with a descriptive error instead of a NullPointerException below.
                if (lastMessage == null) {
                    throw new AssertionError("Timed out waiting for message " + MESSAGE_PREFIX + i);
                }
                String received = new String(lastMessage.getData(), StandardCharsets.UTF_8);
                log.debug("Received message: [{}]", received);
                testMessageOrderAndDuplicates(messagesReceived, received, MESSAGE_PREFIX + i);
            }

            // Acknowledging the last message cumulatively covers all earlier ones.
            consumer.acknowledgeCumulative(lastMessage);
        }
    }

    /**
     * Creates the cluster, tenant and namespace with the token-authenticated admin client, then
     * runs the produce/consume round trip with the token-authenticated Pulsar client.
     */
    @Test
    public void testTokenProducerAndConsumer() throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        clientSetup();

        this.admin.clusters().createCluster(CLUSTER_NAME,
                ClusterData.builder().serviceUrl(this.brokerUrl.toString()).build());
        this.admin.tenants().createTenant(TENANT,
                new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet(CLUSTER_NAME)));
        this.admin.namespaces().createNamespace(NAMESPACE, Sets.newHashSet(CLUSTER_NAME));

        testSyncProducerAndConsumer();
        log.info("-- Exiting {} test --", this.methodName);
    }
}
