package org.apache.pulsar.client.api;

import com.github.tomakehurst.wiremock.WireMockServer;
import com.github.tomakehurst.wiremock.client.WireMock;
import com.github.tomakehurst.wiremock.common.FileSource;
import com.github.tomakehurst.wiremock.core.WireMockConfiguration;
import com.github.tomakehurst.wiremock.extension.Extension;
import com.github.tomakehurst.wiremock.extension.Parameters;
import com.github.tomakehurst.wiremock.extension.ResponseTransformer;
import com.github.tomakehurst.wiremock.http.Request;
import com.github.tomakehurst.wiremock.http.Response;
import com.google.common.collect.Sets;
import io.jsonwebtoken.SignatureAlgorithm;
import io.jsonwebtoken.impl.DefaultJwtBuilder;
import io.jsonwebtoken.security.Keys;
import java.net.URI;
import java.net.URL;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.KeyPair;
import java.security.PrivateKey;
import java.time.Duration;
import java.util.Base64;
import java.util.Date;
import java.util.HashSet;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.auth.oauth2.AuthenticationFactoryOAuth2;
import org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

/**
 * Verifies that a Pulsar client authenticating through the OAuth2 client-credentials flow can
 * produce and consume messages against a broker that validates JWTs with
 * {@link AuthenticationProviderToken}.
 *
 * <p>A local {@link WireMockServer} plays the role of the OAuth2 issuer: it serves the OpenID
 * discovery document and a token endpoint whose response body is generated by
 * {@link OAuth2Transformer}, signed with an RSA key pair created per test method. The matching
 * public key is handed to the broker through the {@code tokenPublicKey} property.
 */
@Test(groups = {"broker-api"})
public class TokenOauth2AuthenticatedProducerConsumerTest extends ProducerConsumerBase {
    private static final Logger log = LoggerFactory.getLogger(TokenOauth2AuthenticatedProducerConsumerTest.class);

    /** Subject placed in the issued tokens; also registered as the broker's only super-user role. */
    private static final String ADMIN_ROLE = "Xd23RHsUnvUlP7wchjNYOaIfazgeHd9x@clients";

    /** Credentials file handed to the OAuth2 client plugin (both broker-internal client and test client). */
    private static final String CREDENTIALS_FILE = "./src/test/resources/authentication/token/credentials_file.json";

    /** Audience requested by the clients and written into the issued tokens. */
    private static final String AUDIENCE = "https://dev-kt-aa9ne.us.auth0.com/api/v2/";

    /** Name under which {@link OAuth2Transformer} is registered with WireMock. */
    private static final String TOKEN_TRANSFORMER_NAME = "o-auth-token-transformer";

    /** Lifetime of every issued token, in milliseconds. */
    private static final long TOKEN_TTL_MILLIS = 3000L;

    /** Exact form-encoded body the token endpoint stub requires before it answers. */
    private static final String EXPECTED_TOKEN_REQUEST_BODY =
            "audience=https%3A%2F%2Fdev-kt-aa9ne.us.auth0.com%2Fapi%2Fv2%2F"
                    + "&client_id=Xd23RHsUnvUlP7wchjNYOaIfazgeHd9x"
                    + "&client_secret=rT7ps7WY8uhdVuBTKWZkttwLdQotmdEliaM5rLfmgNibvqziZ-g07ZH52N_poGAb"
                    + "&grant_type=client_credentials";

    /** Body of the token endpoint response: access token, then lifetime in seconds. */
    private static final String TOKEN_RESPONSE_TEMPLATE = """
            {
              "access_token": "%s",
              "expires_in": %d,
              "token_type":"Bearer"
            }
            """;

    /** OpenID discovery document; both placeholders receive the mock server's base URL. */
    private static final String OPENID_CONFIGURATION_TEMPLATE = """
            {
              "issuer": "%s",
              "token_endpoint": "%s/oauth/token"
            }
            """;

    /** Broker client authentication parameters: credentials file, issuer URL, audience. */
    private static final String BROKER_CLIENT_AUTH_PARAMS_TEMPLATE = """
            {
              "privateKey": "%s",
              "issuerUrl": "%s",
              "audience": "%s"
            }
            """;

    private static final String TENANT = "my-property";
    private static final String NAMESPACE = "my-property/my-ns";
    private static final String CLUSTER = "test";
    private static final String TOPIC = "persistent://my-property/my-ns/my-topic";
    private static final String SUBSCRIPTION = "my-subscriber-name";
    private static final String MESSAGE_PREFIX = "my-message-";
    private static final int MESSAGE_COUNT = 10;
    private static final int RECEIVE_TIMEOUT_SECONDS = 5;

    /** Mock OAuth2 issuer; created in {@link #setup()} and stopped in {@link #cleanup()}. */
    private WireMockServer server;

    /**
     * WireMock response transformer that replaces the stubbed body with a freshly signed JWT
     * wrapped in an OAuth2 token response.
     *
     * <p>Deliberately a non-static inner class: the token issuer claim is read from the enclosing
     * test's {@link #server} at the time each token is generated.
     */
    class OAuth2Transformer extends ResponseTransformer {
        private final PrivateKey privateKey;
        private final long tokenTtlMillis;

        /**
         * @param keyPair        key pair whose private half signs every issued token
         * @param tokenTtlMillis lifetime of each issued token, in milliseconds
         */
        OAuth2Transformer(KeyPair keyPair, long tokenTtlMillis) {
            this.privateKey = keyPair.getPrivate();
            this.tokenTtlMillis = tokenTtlMillis;
        }

        /**
         * Builds a copy of {@code response} whose body is a token response containing a new JWT
         * and the configured lifetime converted to whole seconds.
         */
        @Override
        public Response transform(Request request, Response response, FileSource fileSource, Parameters parameters) {
            String body = TOKEN_RESPONSE_TEMPLATE.formatted(
                    generateToken(), TimeUnit.MILLISECONDS.toSeconds(tokenTtlMillis));
            return Response.Builder.like(response).but().body(body).build();
        }

        /** @return the name stubs use in {@code withTransformers(...)} to select this transformer */
        @Override
        public String getName() {
            return TOKEN_TRANSFORMER_NAME;
        }

        /** @return {@code false}; the transformer only runs for stubs that name it explicitly */
        @Override
        public boolean applyGlobally() {
            return false;
        }

        /**
         * Creates a compact JWT signed with {@link #privateKey}. Issued-at and not-before are set
         * to the current time and expiration to the current time plus the configured lifetime.
         */
        private String generateToken() {
            long now = System.currentTimeMillis();
            return new DefaultJwtBuilder()
                    .setHeaderParam("typ", "JWT")
                    .setHeaderParam("alg", "RS256")
                    .setIssuer(server.baseUrl())
                    .setSubject(ADMIN_ROLE)
                    .setAudience(AUDIENCE)
                    .setIssuedAt(new Date(now))
                    .setNotBefore(new Date(now))
                    .setExpiration(new Date(now + tokenTtlMillis))
                    .signWith(privateKey)
                    .compact();
        }
    }

    /**
     * Starts the mock OAuth2 issuer, then configures and starts the broker with authentication
     * and authorization enabled.
     *
     * @throws Exception if the mock server or the broker cannot be started
     */
    @Override
    @BeforeMethod(alwaysRun = true)
    protected void setup() throws Exception {
        KeyPair keyPair = Keys.keyPairFor(SignatureAlgorithm.RS256);

        // Port 0 lets WireMock pick a free port; the real address is read back via baseUrl().
        server = new WireMockServer(WireMockConfiguration.wireMockConfig()
                .port(0)
                .extensions(new OAuth2Transformer(keyPair, TOKEN_TTL_MILLIS)));
        server.start();
        String issuerUrl = server.baseUrl();

        server.stubFor(WireMock.get(WireMock.urlEqualTo("/.well-known/openid-configuration"))
                .willReturn(WireMock.aResponse()
                        .withHeader("Content-Type", "application/json")
                        .withBody(OPENID_CONFIGURATION_TEMPLATE.formatted(issuerUrl, issuerUrl))));
        server.stubFor(WireMock.post(WireMock.urlEqualTo("/oauth/token"))
                .withRequestBody(WireMock.equalTo(EXPECTED_TOKEN_REQUEST_BODY))
                .willReturn(WireMock.aResponse()
                        .withTransformers(TOKEN_TRANSFORMER_NAME)
                        .withStatus(200)));

        conf.setAuthenticationEnabled(true);
        conf.setAuthorizationEnabled(true);
        conf.setAuthenticationRefreshCheckSeconds(1);
        conf.setSuperUserRoles(Sets.newHashSet(ADMIN_ROLE));
        conf.setAuthenticationProviders(Sets.newHashSet(AuthenticationProviderToken.class.getName()));
        conf.setBrokerClientAuthenticationPlugin(AuthenticationOAuth2.class.getName());
        conf.setBrokerClientAuthenticationParameters(
                BROKER_CLIENT_AUTH_PARAMS_TEMPLATE.formatted(CREDENTIALS_FILE, issuerUrl, AUDIENCE));
        conf.setClusterName(CLUSTER);

        // Hand the broker the public half of the signing key as an inline base64 data URI.
        Properties properties = new Properties();
        properties.setProperty("tokenPublicKey",
                "data:;base64," + Base64.getEncoder().encodeToString(keyPair.getPublic().getEncoded()));
        conf.setProperties(properties);

        super.init();
    }

    /**
     * Replaces the admin and Pulsar clients with ones that authenticate through the OAuth2
     * client-credentials flow against the mock issuer.
     *
     * @throws Exception if the credentials path or any URL is invalid, or a client cannot be built
     */
    protected final void clientSetup() throws Exception {
        Path credentialsPath = Paths.get(CREDENTIALS_FILE).toAbsolutePath();
        log.info("Credentials File path: {}", credentialsPath);

        Authentication authentication = AuthenticationFactoryOAuth2.clientCredentials(
                new URL(server.baseUrl()), credentialsPath.toUri().toURL(), AUDIENCE);

        admin = Mockito.spy(PulsarAdmin.builder()
                .serviceHttpUrl(brokerUrl.toString())
                .authentication(authentication)
                .build());
        replacePulsarClient(PulsarClient.builder()
                .serviceUrl(new URI(pulsar.getBrokerServiceUrl()).toString())
                .statsInterval(0L, TimeUnit.SECONDS)
                .authentication(authentication));
    }

    /**
     * Shuts down the broker and then the mock issuer.
     *
     * @throws Exception if the broker cleanup fails; the mock server is stopped regardless
     */
    @Override
    @AfterMethod(alwaysRun = true)
    protected void cleanup() throws Exception {
        try {
            super.internalCleanup();
        } finally {
            // alwaysRun means this executes even when setup() failed before the server existed.
            if (server != null) {
                server.stop();
            }
        }
    }

    /** @return the batching values {@code 0} and {@code 1000}, one per row */
    @DataProvider(name = "batch")
    public Object[][] codecProvider() {
        return new Object[][]{{0}, {1000}};
    }

    /** Creates the cluster, tenant and namespace that the test topic lives in. */
    private void createTestNamespace() throws Exception {
        admin.clusters().createCluster(CLUSTER, ClusterData.builder().serviceUrl(brokerUrl.toString()).build());
        admin.tenants().createTenant(TENANT,
                new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet(CLUSTER)));
        admin.namespaces().createNamespace(NAMESPACE, Sets.newHashSet(CLUSTER));
    }

    /**
     * Sends {@link #MESSAGE_COUNT} messages, receives them back, checks order and uniqueness, and
     * cumulatively acknowledges the last one.
     *
     * @param producer producer attached to the test topic
     * @param consumer consumer subscribed to the test topic
     * @throws PulsarClientException if sending, receiving or acknowledging fails
     */
    private void sendAndVerifyMessages(Producer<byte[]> producer, Consumer<byte[]> consumer)
            throws PulsarClientException {
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            producer.send((MESSAGE_PREFIX + i).getBytes());
        }

        Message<byte[]> lastMessage = null;
        var receivedMessages = new HashSet<String>();
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            lastMessage = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            // receive() with a timeout yields null when nothing arrives; fail with a clear message.
            Assert.assertNotNull(lastMessage,
                    "Timed out waiting for message " + i + " of " + MESSAGE_COUNT);
            String payload = new String(lastMessage.getData());
            log.debug("Received message: [{}]", payload);
            testMessageOrderAndDuplicates(receivedMessages, payload, MESSAGE_PREFIX + i);
        }
        consumer.acknowledgeCumulative(lastMessage);
    }

    /** Subscribes, creates a producer and runs one send/receive round on the test topic. */
    private void testSyncProducerAndConsumer() throws Exception {
        try (Consumer<byte[]> consumer = pulsarClient.newConsumer()
                     .topic(TOPIC)
                     .subscriptionName(SUBSCRIPTION)
                     .subscribe();
             Producer<byte[]> producer = pulsarClient.newProducer().topic(TOPIC).create()) {
            sendAndVerifyMessages(producer, consumer);
        }
    }

    /**
     * Produces and consumes a batch of messages with an OAuth2-authenticated client.
     *
     * @throws Exception on any setup, admin or messaging failure
     */
    @Test
    public void testTokenProducerAndConsumer() throws Exception {
        log.info("-- Starting {} test --", methodName);
        clientSetup();
        createTestNamespace();
        testSyncProducerAndConsumer();
        log.info("-- Exiting {} test --", methodName);
    }

    /**
     * Checks that the producer's connection picks up a new access token without being
     * disconnected, and that messaging keeps working afterwards.
     *
     * @throws Exception on any setup, admin or messaging failure
     */
    @Test
    public void testOAuth2TokenRefreshedWithoutReconnect() throws Exception {
        log.info("-- Starting {} test --", methodName);
        clientSetup();
        createTestNamespace();

        try (Consumer<byte[]> consumer = pulsarClient.newConsumer()
                     .topic(TOPIC)
                     .subscriptionName(SUBSCRIPTION)
                     .subscribe();
             ProducerImpl<byte[]> producer =
                     (ProducerImpl<byte[]>) pulsarClient.newProducer().topic(TOPIC).create()) {
            sendAndVerifyMessages(producer, consumer);

            String initialToken = producer.getClientCnx().getAuthenticationDataProvider().getCommandData();
            long lastDisconnectedBefore = producer.getLastDisconnectedTimestamp();

            // Wait until the connection reports command data different from the initial token.
            Awaitility.await()
                    .atMost(10L, TimeUnit.SECONDS)
                    .with()
                    .pollInterval(Duration.ofMillis(250L))
                    .untilAsserted(() -> Assert.assertNotEquals(
                            producer.getClientCnx().getAuthenticationDataProvider().getCommandData(),
                            initialToken));

            // An unchanged timestamp means the token changed without the producer disconnecting.
            Assert.assertEquals(producer.getLastDisconnectedTimestamp(), lastDisconnectedBefore);

            sendAndVerifyMessages(producer, consumer);
        }
    }
}
