/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.IntStream;
import javax.naming.AuthenticationException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.apache.pulsar.broker.authorization.AuthorizationProvider;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.PatternMultiTopicsConsumerImpl;
import org.apache.pulsar.client.impl.TopicMessageImpl;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.PolicyName;
import org.apache.pulsar.common.policies.data.PolicyOperation;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TenantOperation;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.util.RestException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups={"broker-impl"})
public class PatternTopicsConsumerImplAuthTest
extends ProducerConsumerBase {
    /** Timeout budget in milliseconds; equal to the {@code timeOut} declared on the test method. */
    private static final long testTimeout = 90000L;
    private static final Logger log = LoggerFactory.getLogger(PatternTopicsConsumerImplAuthTest.class);
    /** Role used by the authorized client in the test. */
    private static final String clientRole = "pluggableRole";
    /** Role registered as a broker super user in {@link #setup()} and used by the admin client. */
    private static final String superUserRole = "superUser";
    /**
     * Roles for which {@link TestAuthorizationProvider} grants produce, consume and lookup.
     * Built from {@link #clientRole} so the two cannot drift apart, and without the
     * {@code Object[]} cast that prevented the element type from being inferred as {@code String}.
     */
    private static final Set<String> clientAuthProviderSupportedRoles = Sets.newHashSet(clientRole);

    /**
     * Configures the broker for this test before each test method: authentication and
     * authorization are both enabled and bound to the test providers declared in this class,
     * {@code superUser} is registered as super user role, and the cluster is named
     * {@code "test"}. Start-up itself is delegated to {@code internalSetup()}.
     *
     * @throws Exception if the base-class setup fails
     */
    @Override
    @BeforeMethod
    public void setup() throws Exception {
        // Build the collections first, then apply all configuration in one place.
        Set<String> authenticationProviders = new HashSet<>();
        authenticationProviders.add(TestAuthenticationProvider.class.getName());

        Set<String> brokerSuperUsers = new HashSet<>();
        brokerSuperUsers.add(superUserRole);

        isTcpLookup = true;

        conf.setClusterName("test");
        conf.setAuthenticationEnabled(true);
        conf.setAuthenticationProviders(authenticationProviders);
        conf.setAuthorizationEnabled(true);
        conf.setAuthorizationProvider(TestAuthorizationProvider.class.getName());
        conf.setSuperUserRoles(brokerSuperUsers);

        super.internalSetup();
    }

    /**
     * Tears down after each test method by delegating to {@code internalCleanup()} of the
     * base class. Declared with {@code alwaysRun=true} on the {@code @AfterMethod} annotation.
     *
     * @throws Exception if the base-class cleanup fails
     */
    @Override
    @AfterMethod(alwaysRun=true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Creates an admin client pointed at {@code brokerUrl} that authenticates with the
     * {@code superUser} role. The caller owns the returned client and must close it.
     *
     * @return a newly built admin client
     * @throws Exception if the client cannot be built
     */
    private PulsarAdmin buildAdminClient() throws Exception {
        String adminServiceUrl = this.brokerUrl.toString();
        Authentication superUserAuth = new ClientAuthentication(superUserRole);
        return PulsarAdmin.builder()
                .serviceHttpUrl(adminServiceUrl)
                .authentication(superUserAuth)
                .build();
    }

    /**
     * Verifies that a pattern subscription honours authorization.
     *
     * <p>Flow:
     * <ol>
     *   <li>Create cluster {@code test}, tenant {@code my-property} and namespace
     *       {@code my-property/my-ns}, and grant every {@link AuthAction} to the client role.</li>
     *   <li>Create two partitioned topics (2 and 3 partitions) and open producers on three
     *       persistent topics and one non-persistent topic.</li>
     *   <li>Subscribing by pattern with an unknown role must fail with
     *       {@link PulsarClientException.AuthorizationException}.</li>
     *   <li>Subscribing by pattern with the client role must succeed. The assertions expect
     *       six partitions/consumers and two partitioned topics.</li>
     *   <li>Ten messages are sent per producer and exactly {@code totalMessages} (30) must be
     *       received; presumably the non-persistent topic is not matched by the pattern
     *       subscription, which is why its ten messages are not counted.</li>
     * </ol>
     *
     * @throws Exception on any admin or client failure
     */
    @Test(timeOut = testTimeout)
    public void testBinaryProtoToGetTopicsOfNamespace() throws Exception {
        String key = "BinaryProtoToGetTopics";
        String subscriptionName = "my-ex-subscription-" + key;
        String topicName1 = "persistent://my-property/my-ns/pattern-topic-1-" + key;
        String topicName2 = "persistent://my-property/my-ns/pattern-topic-2-" + key;
        String topicName3 = "persistent://my-property/my-ns/pattern-topic-3-" + key;
        String topicName4 = "non-persistent://my-property/my-ns/pattern-topic-4-" + key;
        Pattern pattern = Pattern.compile("my-property/my-ns/pattern-topic.*");

        String lookupUrl = this.pulsar.getBrokerServiceUrl();
        Authentication authentication = new ClientAuthentication(clientRole);
        Authentication authenticationInvalidRole = new ClientAuthentication("test-role");

        // Resources are closed in reverse order of declaration: invalid-role client,
        // authorized client, then the admin client.
        try (PulsarAdmin admin = buildAdminClient();
             PulsarClient pulsarClient = PulsarClient.builder()
                     .serviceUrl(lookupUrl)
                     .authentication(authentication)
                     .operationTimeout(1000, TimeUnit.MILLISECONDS)
                     .build();
             PulsarClient pulsarClientInvalidRole = PulsarClient.builder()
                     .serviceUrl(lookupUrl)
                     .operationTimeout(1000, TimeUnit.MILLISECONDS)
                     .authentication(authenticationInvalidRole)
                     .build()) {

            admin.clusters().createCluster("test",
                    ClusterData.builder().serviceUrl(this.brokerUrl.toString()).build());
            admin.tenants().createTenant("my-property",
                    new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test")));
            admin.namespaces().createNamespace("my-property/my-ns", Sets.newHashSet("test"));
            admin.namespaces().grantPermissionOnNamespace("my-property/my-ns", clientRole,
                    EnumSet.allOf(AuthAction.class));
            admin.topics().createPartitionedTopic(topicName2, 2);
            admin.topics().createPartitionedTopic(topicName3, 3);

            String messagePredicate = "my-message-" + key + "-";
            int totalMessages = 30;

            Producer<byte[]> producer1 = pulsarClient.newProducer().topic(topicName1)
                    .enableBatching(false)
                    .messageRoutingMode(MessageRoutingMode.SinglePartition)
                    .create();
            Producer<byte[]> producer2 = pulsarClient.newProducer().topic(topicName2)
                    .enableBatching(false)
                    .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
                    .create();
            Producer<byte[]> producer3 = pulsarClient.newProducer().topic(topicName3)
                    .enableBatching(false)
                    .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
                    .create();
            Producer<byte[]> producer4 = pulsarClient.newProducer().topic(topicName4)
                    .enableBatching(false)
                    .create();

            // A role unknown to the authorization provider must not be able to subscribe.
            try {
                pulsarClientInvalidRole.newConsumer()
                        .topicsPattern(pattern)
                        .patternAutoDiscoveryPeriod(2)
                        .subscriptionName(subscriptionName)
                        .subscriptionType(SubscriptionType.Shared)
                        .receiverQueueSize(4)
                        .subscribe();
                Assert.fail("should have failed with authorization error");
            } catch (PulsarClientException.AuthorizationException ignored) {
                // Expected: the subscription is rejected for the unauthorized role.
            }

            // The authorized role subscribes with the same pattern and subscription name.
            Consumer<byte[]> consumer = pulsarClient.newConsumer()
                    .topicsPattern(pattern)
                    .patternAutoDiscoveryPeriod(2)
                    .subscriptionName(subscriptionName)
                    .subscriptionType(SubscriptionType.Shared)
                    .receiverQueueSize(4)
                    .subscribe();

            Assert.assertTrue(consumer.getTopic().startsWith("MultiTopicsConsumer-"));
            PatternMultiTopicsConsumerImpl<byte[]> patternConsumer =
                    (PatternMultiTopicsConsumerImpl<byte[]>) consumer;
            Assert.assertSame(pattern, patternConsumer.getPattern());

            List<String> topics = patternConsumer.getPartitions();
            List<ConsumerImpl<byte[]>> consumers = patternConsumer.getConsumers();
            Assert.assertEquals(topics.size(), 6);
            Assert.assertEquals(consumers.size(), 6);
            Assert.assertEquals(patternConsumer.getPartitionedTopics().size(), 2);

            topics.forEach(topic -> log.debug("topic: {}", topic));
            consumers.forEach(c -> log.debug("consumer: {}", c.getTopic()));

            // Partition names and per-partition consumers must line up index by index.
            IntStream.range(0, topics.size())
                    .forEach(index -> Assert.assertEquals(consumers.get(index).getTopic(), topics.get(index)));

            patternConsumer.getPartitionedTopics().forEach(topic -> log.debug("getTopics topic: {}", topic));

            // Every producer, including the non-persistent one, sends totalMessages / 3 messages.
            for (int i = 0; i < totalMessages / 3; ++i) {
                producer1.send((messagePredicate + "producer1-" + i).getBytes());
                producer2.send((messagePredicate + "producer2-" + i).getBytes());
                producer3.send((messagePredicate + "producer3-" + i).getBytes());
                producer4.send((messagePredicate + "producer4-" + i).getBytes());
            }

            // Drain: block for the first message, then poll until 500 ms pass without one.
            int messageSet = 0;
            Message<byte[]> message = consumer.receive();
            do {
                Assert.assertTrue(message instanceof TopicMessageImpl);
                ++messageSet;
                consumer.acknowledge(message);
                log.debug("Consumer acknowledged : {}", new String(message.getData()));
                message = consumer.receive(500, TimeUnit.MILLISECONDS);
            } while (message != null);
            Assert.assertEquals(messageSet, totalMessages);

            consumer.unsubscribe();
            consumer.close();
            producer1.close();
            producer2.close();
            producer3.close();
            producer4.close();
        }
    }

    /**
     * Authentication provider for this test. It performs no verification: the value supplied
     * by the client is returned as the authenticated role.
     */
    public static class TestAuthenticationProvider implements AuthenticationProvider {

        /** Nothing to release. */
        public void close() throws IOException {
            // no-op
        }

        /** No initialization is required; {@code config} is not read. */
        public void initialize(ServiceConfiguration config) throws IOException {
            // no-op
        }

        /** @return the auth method name, {@code "test"} */
        public String getAuthMethodName() {
            return "test";
        }

        /**
         * Resolves the role from the supplied authentication data.
         *
         * @param authData data supplied by the connecting client
         * @return the command data when present, otherwise the value of the {@code user}
         *         HTTP header
         */
        public String authenticate(AuthenticationDataSource authData) throws AuthenticationException {
            if (authData.getCommandData() != null) {
                return authData.getCommandData();
            }
            return authData.getHttpHeader("user");
        }
    }

    /**
     * Client-side authentication for this test. It sends the configured user name both as
     * command data and as a {@code user} HTTP header.
     */
    public static class ClientAuthentication implements Authentication {
        String user;

        /**
         * @param user the name sent to the broker as this client's identity
         */
        public ClientAuthentication(String user) {
            this.user = user;
        }

        /** Nothing to release. */
        public void close() throws IOException {
            // no-op
        }

        /** @return the auth method name, {@code "test"} */
        public String getAuthMethodName() {
            return "test";
        }

        /**
         * @return a data provider exposing {@code user} through both the HTTP header and the
         *         command-data channels
         */
        public AuthenticationDataProvider getAuthData() throws PulsarClientException {
            return new AuthenticationDataProvider() {

                public boolean hasDataForHttp() {
                    return true;
                }

                public Set<Map.Entry<String, String>> getHttpHeaders() {
                    // No Object casts here: they widen the element type to Object, which is
                    // not assignable to the declared Set<Map.Entry<String, String>>.
                    return Sets.newHashSet(Maps.immutableEntry("user", user));
                }

                public boolean hasDataFromCommand() {
                    return true;
                }

                public String getCommandData() {
                    return user;
                }
            };
        }

        /** Parameters are ignored; the user is fixed at construction time. */
        public void configure(Map<String, String> authParams) {
            // no-op
        }

        /** No start-up work is required. */
        public void start() throws PulsarClientException {
            // no-op
        }
    }

    /**
     * Authorization provider for this test, with fixed rules:
     * <ul>
     *   <li>produce, consume and lookup are allowed only for roles in
     *       {@code clientAuthProviderSupportedRoles};</li>
     *   <li>namespace, topic and topic-policy operations are allowed for the super user role
     *       and the client role;</li>
     *   <li>tenant checks always succeed and grant/revoke calls complete without doing
     *       anything;</li>
     *   <li>function, source and sink operations are denied.</li>
     * </ul>
     */
    public static class TestAuthorizationProvider implements AuthorizationProvider {
        public ServiceConfiguration conf;

        /** Nothing to release. */
        public void close() throws IOException {
            // no-op
        }

        /** Stores {@code conf}; {@code configCache} is not used. */
        public void initialize(ServiceConfiguration conf, ConfigurationCacheService configCache) throws IOException {
            this.conf = conf;
        }

        public CompletableFuture<Boolean> canProduceAsync(TopicName topicName, String role, AuthenticationDataSource authenticationData) {
            return CompletableFuture.completedFuture(clientAuthProviderSupportedRoles.contains(role));
        }

        public CompletableFuture<Boolean> canConsumeAsync(TopicName topicName, String role, AuthenticationDataSource authenticationData, String subscription) {
            return CompletableFuture.completedFuture(clientAuthProviderSupportedRoles.contains(role));
        }

        public CompletableFuture<Boolean> canLookupAsync(TopicName topicName, String role, AuthenticationDataSource authenticationData) {
            return CompletableFuture.completedFuture(clientAuthProviderSupportedRoles.contains(role));
        }

        // The three function-related checks previously returned a null future, which would
        // throw NullPointerException in any caller that chains on the result. They now
        // complete with an explicit denial instead.

        public CompletableFuture<Boolean> allowFunctionOpsAsync(NamespaceName namespaceName, String role, AuthenticationDataSource authenticationData) {
            return CompletableFuture.completedFuture(false);
        }

        public CompletableFuture<Boolean> allowSourceOpsAsync(NamespaceName namespaceName, String role, AuthenticationDataSource authenticationData) {
            return CompletableFuture.completedFuture(false);
        }

        public CompletableFuture<Boolean> allowSinkOpsAsync(NamespaceName namespaceName, String role, AuthenticationDataSource authenticationData) {
            return CompletableFuture.completedFuture(false);
        }

        public CompletableFuture<Void> grantPermissionAsync(NamespaceName namespace, Set<AuthAction> actions, String role, String authenticationData) {
            return CompletableFuture.completedFuture(null);
        }

        public CompletableFuture<Void> grantPermissionAsync(TopicName topicname, Set<AuthAction> actions, String role, String authenticationData) {
            return CompletableFuture.completedFuture(null);
        }

        public CompletableFuture<Void> grantSubscriptionPermissionAsync(NamespaceName namespace, String subscriptionName, Set<String> roles, String authDataJson) {
            return CompletableFuture.completedFuture(null);
        }

        public CompletableFuture<Void> revokeSubscriptionPermissionAsync(NamespaceName namespace, String subscriptionName, String role, String authDataJson) {
            return CompletableFuture.completedFuture(null);
        }

        public CompletableFuture<Boolean> isTenantAdmin(String tenant, String role, TenantInfo tenantInfo, AuthenticationDataSource authenticationData) {
            return CompletableFuture.completedFuture(true);
        }

        public CompletableFuture<Boolean> allowTenantOperationAsync(String tenantName, String role, TenantOperation operation, AuthenticationDataSource authData) {
            return CompletableFuture.completedFuture(true);
        }

        public Boolean allowTenantOperation(String tenantName, String role, TenantOperation operation, AuthenticationDataSource authData) {
            return true;
        }

        public CompletableFuture<Boolean> allowNamespaceOperationAsync(NamespaceName namespaceName, String role, NamespaceOperation operation, AuthenticationDataSource authData) {
            return CompletableFuture.completedFuture(isSuperUserOrClient(role));
        }

        public Boolean allowNamespaceOperation(NamespaceName namespaceName, String role, NamespaceOperation operation, AuthenticationDataSource authData) {
            return getUnchecked(this.allowNamespaceOperationAsync(namespaceName, role, operation, authData));
        }

        public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topic, String role, TopicOperation operation, AuthenticationDataSource authData) {
            return CompletableFuture.completedFuture(isSuperUserOrClient(role));
        }

        public Boolean allowTopicOperation(TopicName topicName, String role, TopicOperation operation, AuthenticationDataSource authData) {
            return getUnchecked(this.allowTopicOperationAsync(topicName, role, operation, authData));
        }

        public CompletableFuture<Boolean> allowTopicPolicyOperationAsync(TopicName topic, String role, PolicyName policy, PolicyOperation operation, AuthenticationDataSource authData) {
            return CompletableFuture.completedFuture(isSuperUserOrClient(role));
        }

        public Boolean allowTopicPolicyOperation(TopicName topicName, String role, PolicyName policy, PolicyOperation operation, AuthenticationDataSource authData) {
            return getUnchecked(this.allowTopicPolicyOperationAsync(topicName, role, policy, operation, authData));
        }

        /**
         * Returns whether {@code role} is the super user role or the client role.
         * The constants are the receivers of {@code equals}, so a {@code null} role is
         * denied rather than causing a NullPointerException.
         */
        private static boolean isSuperUserOrClient(String role) {
            return superUserRole.equals(role) || clientRole.equals(role);
        }

        /**
         * Blocks for the result of {@code future}, converting failures to {@link RestException}.
         * When the wait is interrupted, the thread's interrupt flag is restored before the
         * exception is thrown so callers further up can still observe the interruption.
         */
        private static Boolean getUnchecked(CompletableFuture<Boolean> future) {
            try {
                return future.get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RestException(e);
            } catch (ExecutionException e) {
                throw new RestException(e);
            }
        }
    }
}

