/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.api;

import com.google.common.collect.Sets;
import java.net.URI;
import java.net.URL;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.auth.oauth2.AuthenticationFactoryOAuth2;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups={"broker-api"})
public class TokenOauth2AuthenticatedProducerConsumerTest
extends ProducerConsumerBase {
    private static final Logger log = LoggerFactory.getLogger(TokenOauth2AuthenticatedProducerConsumerTest.class);
    private final String TOKEN_TEST_PUBLIC_KEY = "data:;base64,MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA2tZd/4gJda3U2Pc3tpgRAN7JPGWx/Gn17v/0IiZlNNRbP/Mmf0Vc6G1qsnaRaWNWOR+t6/a6ekFHJMikQ1N2X6yfz4UjMc8/G2FDPRmWjA+GURzARjVhxc/BBEYGoD0Kwvbq/u9CZm2QjlKrYaLfg3AeB09j0btNrDJ8rBsNzU6AuzChRvXj9IdcE/A/4N/UQ+S9cJ4UXP6NJbToLwajQ5km+CnxdGE6nfB7LWHvOFHjn9C2Rb9e37CFlmeKmIVFkagFM0gbmGOb6bnGI8Bp/VNGV0APef4YaBvBTqwoZ1Z4aDHy5eRxXfAMdtBkBupmBXqL6bpd15XRYUbu/7ck9QIDAQAB";
    private final String ADMIN_ROLE = "Xd23RHsUnvUlP7wchjNYOaIfazgeHd9x@clients";
    private final String CREDENTIALS_FILE = "./src/test/resources/authentication/token/credentials_file.json";

    @Override
    @BeforeMethod(alwaysRun=true)
    protected void setup() throws Exception {
        this.conf.setAuthenticationEnabled(true);
        this.conf.setAuthorizationEnabled(true);
        this.conf.setAuthenticationRefreshCheckSeconds(5);
        HashSet<String> superUserRoles = new HashSet<String>();
        superUserRoles.add("Xd23RHsUnvUlP7wchjNYOaIfazgeHd9x@clients");
        this.conf.setSuperUserRoles(superUserRoles);
        HashSet<String> providers = new HashSet<String>();
        providers.add(AuthenticationProviderToken.class.getName());
        this.conf.setAuthenticationProviders(providers);
        this.conf.setClusterName("test");
        Properties properties = new Properties();
        properties.setProperty("tokenPublicKey", "data:;base64,MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA2tZd/4gJda3U2Pc3tpgRAN7JPGWx/Gn17v/0IiZlNNRbP/Mmf0Vc6G1qsnaRaWNWOR+t6/a6ekFHJMikQ1N2X6yfz4UjMc8/G2FDPRmWjA+GURzARjVhxc/BBEYGoD0Kwvbq/u9CZm2QjlKrYaLfg3AeB09j0btNrDJ8rBsNzU6AuzChRvXj9IdcE/A/4N/UQ+S9cJ4UXP6NJbToLwajQ5km+CnxdGE6nfB7LWHvOFHjn9C2Rb9e37CFlmeKmIVFkagFM0gbmGOb6bnGI8Bp/VNGV0APef4YaBvBTqwoZ1Z4aDHy5eRxXfAMdtBkBupmBXqL6bpd15XRYUbu/7ck9QIDAQAB");
        this.conf.setProperties(properties);
        super.init();
    }

    protected final void clientSetup() throws Exception {
        Path path = Paths.get("./src/test/resources/authentication/token/credentials_file.json", new String[0]).toAbsolutePath();
        log.info("Credentials File path: {}", (Object)path.toString());
        Authentication authentication = AuthenticationFactoryOAuth2.clientCredentials((URL)new URL("https://dev-kt-aa9ne.us.auth0.com"), (URL)new URL("file://" + path.toString()), (String)"https://dev-kt-aa9ne.us.auth0.com/api/v2/");
        this.admin = (PulsarAdmin)Mockito.spy((Object)PulsarAdmin.builder().serviceHttpUrl(this.brokerUrl.toString()).authentication(authentication).build());
        this.replacePulsarClient(PulsarClient.builder().serviceUrl(new URI(this.pulsar.getBrokerServiceUrl()).toString()).statsInterval(0L, TimeUnit.SECONDS).authentication(authentication));
    }

    @Override
    @AfterMethod(alwaysRun=true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @DataProvider(name="batch")
    public Object[][] codecProvider() {
        return new Object[][]{{0}, {1000}};
    }

    private void testSyncProducerAndConsumer() throws Exception {
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/my-topic"}).subscriptionName("my-subscriber-name").subscribe();
        ProducerBuilder producerBuilder = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic");
        Producer producer = producerBuilder.create();
        for (int i = 0; i < 10; ++i) {
            String message = "my-message-" + i;
            producer.send((Object)message.getBytes());
        }
        Message msg = null;
        HashSet messageSet = Sets.newHashSet();
        for (int i = 0; i < 10; ++i) {
            msg = consumer.receive(5, TimeUnit.SECONDS);
            String receivedMessage = new String(msg.getData());
            log.debug("Received message: [{}]", (Object)receivedMessage);
            String expectedMessage = "my-message-" + i;
            this.testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
        }
        consumer.acknowledgeCumulative(msg);
        consumer.close();
    }

    @Test
    public void testTokenProducerAndConsumer() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        this.clientSetup();
        this.admin.clusters().createCluster("test", new ClusterData(this.brokerUrl.toString()));
        this.admin.tenants().createTenant("my-property", new TenantInfo((Set)Sets.newHashSet((Object[])new String[]{"appid1", "appid2"}), (Set)Sets.newHashSet((Object[])new String[]{"test"})));
        this.admin.namespaces().createNamespace("my-property/my-ns", (Set)Sets.newHashSet((Object[])new String[]{"test"}));
        this.testSyncProducerAndConsumer();
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    @Test
    public void testOAuth2TokenRefreshedWithoutReconnect() throws Exception {
        int i;
        log.info("-- Starting {} test --", (Object)this.methodName);
        this.clientSetup();
        this.admin.clusters().createCluster("test", new ClusterData(this.brokerUrl.toString()));
        this.admin.tenants().createTenant("my-property", new TenantInfo((Set)Sets.newHashSet((Object[])new String[]{"appid1", "appid2"}), (Set)Sets.newHashSet((Object[])new String[]{"test"})));
        this.admin.namespaces().createNamespace("my-property/my-ns", (Set)Sets.newHashSet((Object[])new String[]{"test"}));
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/my-topic"}).subscriptionName("my-subscriber-name").subscribe();
        ProducerBuilder producerBuilder = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic");
        Producer producer = producerBuilder.create();
        for (int i2 = 0; i2 < 10; ++i2) {
            String message = "my-message-" + i2;
            producer.send((Object)message.getBytes());
        }
        Message msg = null;
        HashSet messageSet = Sets.newHashSet();
        for (int i3 = 0; i3 < 10; ++i3) {
            msg = consumer.receive(5, TimeUnit.SECONDS);
            String receivedMessage = new String(msg.getData());
            log.debug("Received message: [{}]", (Object)receivedMessage);
            String expectedMessage = "my-message-" + i3;
            this.testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
        }
        consumer.acknowledgeCumulative(msg);
        ProducerImpl producerImpl = (ProducerImpl)producer;
        String accessTokenOld = producerImpl.getClientCnx().getAuthenticationDataProvider().getCommandData();
        long lastDisconnectTime = producer.getLastDisconnectedTimestamp();
        Awaitility.await().atLeast(10L, TimeUnit.SECONDS).atMost(20L, TimeUnit.SECONDS).with().pollInterval(Duration.ofSeconds(1L)).untilAsserted(() -> {
            String accessTokenNew = producerImpl.getClientCnx().getAuthenticationDataProvider().getCommandData();
            Assert.assertNotEquals((Object)accessTokenNew, (Object)accessTokenOld);
        });
        long lastDisconnectTimeAfterTokenExpired = producer.getLastDisconnectedTimestamp();
        Assert.assertEquals((long)lastDisconnectTimeAfterTokenExpired, (long)lastDisconnectTime);
        for (i = 0; i < 10; ++i) {
            String message = "my-message-" + i;
            producer.send((Object)message.getBytes());
        }
        msg = null;
        messageSet = Sets.newHashSet();
        for (i = 0; i < 10; ++i) {
            msg = consumer.receive(5, TimeUnit.SECONDS);
            String receivedMessage = new String(msg.getData());
            log.debug("Received message: [{}]", (Object)receivedMessage);
            String expectedMessage = "my-message-" + i;
            this.testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
        }
        consumer.acknowledgeCumulative(msg);
        consumer.close();
    }
}

