package org.apache.pulsar.client.impl;

import com.google.common.collect.Sets;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.PulsarChannelInitializer;
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.common.api.proto.CommandLookupTopic;
import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadata;
import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.protocol.Commands;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

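/**
 * Tests client-side retry behaviour for topic lookups and partitioned-topic metadata requests.
 *
 * <p>The broker under test is wired with {@link ErrorByTopicServerCnx}, which decides how to answer
 * each lookup / partition-metadata request from the local name of the requested topic. A topic name
 * such as {@code "TIMEOUT:2,OK:3"} is parsed as a comma-separated list of {@code OUTCOME:count}
 * entries, where {@code OUTCOME} is a {@link LookupError} constant.
 */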
public class LookupRetryTest extends MockedPulsarServiceBaseTest {
    // Class logger.
    private static final Logger log = LoggerFactory.getLogger(LookupRetryTest.class);
    // Value passed to readerName(...) by the reader-based tests in this class.
    private static final String subscription = "reader-sub";
    // Incremented each time the broker's channel initializer builds a new ServerCnx (see newPulsarService);
    // reset to 0 at the end of setup().
    private final AtomicInteger connectionsCreated = new AtomicInteger(0);
    // Topic local name -> queue of outcomes still to be served for that topic. Shared with every
    // ErrorByTopicServerCnx created by the broker; entries are populated lazily on first request.
    private final ConcurrentHashMap<String, Queue<LookupError>> failureMap = new ConcurrentHashMap<>();

    /**
     * A {@link ServerCnx} that injects failures into lookup and partition-metadata requests.
     *
     * <p>The outcome of each request is taken from a per-topic queue held in the shared
     * {@code failureMap}. The queue is built on first use by parsing the topic's local name as a
     * comma-separated list of {@code OUTCOME:count} entries (for example {@code "TOO_MANY:2,OK:3"}),
     * where {@code OUTCOME} must be the name of a {@link LookupError} constant. Both request kinds
     * for the same topic consume from the same queue.
     */
    private static class ErrorByTopicServerCnx extends ServerCnx {
        private final ConcurrentHashMap<String, Queue<LookupError>> failureMap;

        /**
         * @param pulsarService     passed through to the {@link ServerCnx} constructor
         * @param concurrentHashMap map of topic local name to remaining outcomes; stored by reference
         */
        ErrorByTopicServerCnx(PulsarService pulsarService, ConcurrentHashMap<String, Queue<LookupError>> concurrentHashMap) {
            super(pulsarService);
            this.failureMap = concurrentHashMap;
        }

        /**
         * Returns the outcome queue for the given topic local name, creating it on first access.
         *
         * @param str topic local name, formatted as {@code OUTCOME:count[,OUTCOME:count...]}
         * @return the queue of outcomes not yet consumed for this topic
         * @throws IllegalArgumentException       if an outcome is not a {@link LookupError} name, or a
         *                                        count is not an integer
         * @throws ArrayIndexOutOfBoundsException if an entry has no {@code :count} part
         * @throws IllegalStateException          if the name expands to more than 100 outcomes
         */
        private Queue<LookupError> errorList(String str) {
            return this.failureMap.computeIfAbsent(str, script -> {
                // Bounded queue: add() fails fast if a topic name expands beyond 100 outcomes.
                Queue<LookupError> pending = new ArrayBlockingQueue<>(100);
                for (String step : script.split(",")) {
                    String[] parts = step.split(":");
                    LookupError outcome = LookupError.valueOf(parts[0]);
                    int repetitions = Integer.parseInt(parts[1]);
                    for (int i = 0; i < repetitions; i++) {
                        pending.add(outcome);
                    }
                }
                return pending;
            });
        }

        /**
         * Answers a partition-metadata request according to the next queued outcome for its topic.
         *
         * <p>An exhausted queue or {@link LookupError#OK} delegates to the superclass;
         * {@link LookupError#TOO_MANY} replies with {@code ServerError.TooManyRequests}; any other
         * outcome writes no response at all.
         */
        protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata commandPartitionedTopicMetadata) {
            String localName = TopicName.get(commandPartitionedTopicMetadata.getTopic()).getLocalName();
            LookupError outcome = errorList(localName).poll();
            if (outcome == null || outcome == LookupError.OK) {
                super.handlePartitionMetadataRequest(commandPartitionedTopicMetadata);
            } else if (outcome == LookupError.TOO_MANY) {
                this.ctx.writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.TooManyRequests, "too many", commandPartitionedTopicMetadata.getRequestId()));
            }
            // TIMEOUT (and UNKNOWN): deliberately send nothing so the client has to time out.
        }

        /**
         * Answers a topic lookup according to the next queued outcome for its topic.
         *
         * <p>An exhausted queue or {@link LookupError#OK} delegates to the superclass;
         * {@link LookupError#TOO_MANY} replies with {@code ServerError.TooManyRequests}; any other
         * outcome writes no response at all.
         */
        protected void handleLookup(CommandLookupTopic commandLookupTopic) {
            String localName = TopicName.get(commandLookupTopic.getTopic()).getLocalName();
            LookupError outcome = errorList(localName).poll();
            if (outcome == null || outcome == LookupError.OK) {
                super.handleLookup(commandLookupTopic);
            } else if (outcome == LookupError.TOO_MANY) {
                this.ctx.writeAndFlush(Commands.newLookupErrorResponse(ServerError.TooManyRequests, "too many", commandLookupTopic.getRequestId()));
            }
            // TIMEOUT (and UNKNOWN): deliberately send nothing so the client has to time out.
        }
    }

    /**
     * Outcomes that can be scripted into a topic name and applied by {@link ErrorByTopicServerCnx}
     * to a lookup or partition-metadata request.
     */
    public enum LookupError {
        /** Not matched by any branch in the handlers: no response is written and the request is not delegated. */
        UNKNOWN,
        /** The handler replies with {@code ServerError.TooManyRequests}. */
        TOO_MANY,
        /** The handler returns without writing any response. */
        TIMEOUT,
        /** The handler delegates to the regular {@code ServerCnx} implementation. */
        OK
    }

    /**
     * Starts the mocked broker and creates the {@code test} cluster, the {@code public} tenant and
     * the {@code public/default} namespace, then resets the per-test failure-injection state.
     *
     * @throws Exception if broker start-up or any admin call fails
     */
    @Override
    @BeforeMethod
    protected void setup() throws Exception {
        super.internalSetup();
        this.admin.clusters().createCluster("test",
                ClusterData.builder().serviceUrl(this.pulsar.getWebServiceAddress()).build());
        this.admin.tenants().createTenant("public",
                new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test")));
        this.admin.namespaces().createNamespace("public/default", Sets.newHashSet("test"));
        // Reset last so anything counted while the broker was being set up is discarded.
        this.connectionsCreated.set(0);
        // Several tests use the same scripted topic name; without this, a queue drained by an
        // earlier test method would make a later one run with no failures injected.
        this.failureMap.clear();
    }

    /**
     * Builds a {@link PulsarService} whose broker service hands out {@link ErrorByTopicServerCnx}
     * connections, so every accepted connection is counted in {@code connectionsCreated} and serves
     * scripted failures from {@code failureMap}.
     *
     * @param serviceConfiguration configuration passed to the {@link PulsarService} constructor
     * @return the customised service instance
     */
    @Override
    public PulsarService newPulsarService(ServiceConfiguration serviceConfiguration) throws Exception {
        return new PulsarService(serviceConfiguration) {
            protected BrokerService newBrokerService(PulsarService pulsarService) throws Exception {
                BrokerService broker = new BrokerService(this, this.ioEventLoopGroup);
                broker.setPulsarChannelInitializerFactory((service, options) ->
                        new PulsarChannelInitializer(service, options) {
                            protected ServerCnx newServerCnx(PulsarService cnxService, String ignored) throws Exception {
                                // One increment per server-side connection object created.
                                LookupRetryTest.this.connectionsCreated.incrementAndGet();
                                return new ErrorByTopicServerCnx(cnxService, LookupRetryTest.this.failureMap);
                            }
                        });
                return broker;
            }
        };
    }

    /**
     * Tears down the mocked broker after every test method; {@code alwaysRun = true} is set on the
     * TestNG annotation.
     *
     * @throws Exception if the base-class cleanup fails
     */
    @Override
    @AfterMethod(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Creates a client pointed at the test broker with a 2 second connection timeout, a 1 second
     * operation timeout and a 10 second lookup timeout. The caller owns the returned client and
     * must close it.
     *
     * @return a newly built client
     * @throws Exception if the client cannot be built
     */
    PulsarClient newClient() throws Exception {
        return PulsarClient.builder()
                .serviceUrl(this.pulsar.getBrokerServiceUrl())
                .connectionTimeout(2, TimeUnit.SECONDS)
                .operationTimeout(1, TimeUnit.SECONDS)
                .lookupTimeout(10, TimeUnit.SECONDS)
                .build();
    }

    /**
     * Checks that {@code getPartitionsForTopic} completes successfully when the first two requests
     * for the topic go unanswered, and likewise when they are rejected with TooManyRequests.
     * A fresh client is used for each scenario and is always closed, even if the call fails.
     */
    @Test
    public void testGetPartitionedMetadataRetries() throws Exception {
        try (PulsarClient client = newClient()) {
            client.getPartitionsForTopic("TIMEOUT:2,OK:10").get();
        }
        try (PulsarClient client = newClient()) {
            client.getPartitionsForTopic("TOO_MANY:2,OK:10").get();
        }
    }

    /**
     * Checks that a reader can be created when the first two scripted requests for its topic
     * receive no response. The test passes if {@code create()} does not throw.
     */
    @Test
    public void testTimeoutRetriesOnPartitionMetadata() throws Exception {
        try (PulsarClient client = newClient();
                Reader<byte[]> reader = client.newReader()
                        .topic("TIMEOUT:2,OK:3")
                        .startMessageId(MessageId.latest)
                        .startMessageIdInclusive()
                        .readerName(subscription)
                        .create()) {
            // Successful creation is the assertion; both resources are closed on exit.
        }
    }

    /**
     * Checks that a reader can be created when the first two scripted requests for its topic are
     * rejected with TooManyRequests. The test passes if {@code create()} does not throw.
     */
    @Test
    public void testTooManyRetriesOnPartitionMetadata() throws Exception {
        try (PulsarClient client = newClient();
                Reader<byte[]> reader = client.newReader()
                        .topic("TOO_MANY:2,OK:3")
                        .startMessageId(MessageId.latest)
                        .startMessageIdInclusive()
                        .readerName(subscription)
                        .create()) {
            // Successful creation is the assertion; both resources are closed on exit.
        }
    }

    /**
     * Checks that a reader can be created when the first scripted request for its topic is served
     * normally and the following two are rejected with TooManyRequests.
     * The test passes if {@code create()} does not throw.
     */
    @Test
    public void testTooManyOnLookup() throws Exception {
        try (PulsarClient client = newClient();
                Reader<byte[]> reader = client.newReader()
                        .topic("OK:1,TOO_MANY:2,OK:3")
                        .startMessageId(MessageId.latest)
                        .startMessageIdInclusive()
                        .readerName(subscription)
                        .create()) {
            // Successful creation is the assertion; both resources are closed on exit.
        }
    }

    /**
     * Checks that a reader can be created when the first scripted request for its topic is served
     * normally and the following two receive no response.
     * The test passes if {@code create()} does not throw.
     */
    @Test
    public void testTimeoutOnLookup() throws Exception {
        try (PulsarClient client = newClient();
                Reader<byte[]> reader = client.newReader()
                        .topic("OK:1,TIMEOUT:2,OK:3")
                        .startMessageId(MessageId.latest)
                        .startMessageIdInclusive()
                        .readerName(subscription)
                        .create()) {
            // Successful creation is the assertion; both resources are closed on exit.
        }
    }

    /**
     * Checks that a reader can be created through an interleaved script of rejected, unanswered
     * and successful requests for its topic. The test passes if {@code create()} does not throw.
     */
    @Test
    public void testManyFailures() throws Exception {
        try (PulsarClient client = newClient();
                Reader<byte[]> reader = client.newReader()
                        .topic("TOO_MANY:1,TIMEOUT:1,OK:1,TIMEOUT:1,TOO_MANY:1,OK:3")
                        .startMessageId(MessageId.latest)
                        .startMessageIdInclusive()
                        .readerName(subscription)
                        .create()) {
            // Successful creation is the assertion; both resources are closed on exit.
        }
    }

    /**
     * Checks that a producer can be created when the first two scripted requests for its topic
     * receive no response. The test passes if {@code create()} does not throw.
     */
    @Test
    public void testProducerTimeoutOnPMR() throws Exception {
        try (PulsarClient client = newClient();
                Producer<byte[]> producer = client.newProducer().topic("TIMEOUT:2,OK:3").create()) {
            // Successful creation is the assertion; both resources are closed on exit.
        }
    }

    /**
     * Checks that a producer can be created when the first two scripted requests for its topic
     * are rejected with TooManyRequests. The test passes if {@code create()} does not throw.
     */
    @Test
    public void testProducerTooManyOnPMR() throws Exception {
        try (PulsarClient client = newClient();
                Producer<byte[]> producer = client.newProducer().topic("TOO_MANY:2,OK:3").create()) {
            // Successful creation is the assertion; both resources are closed on exit.
        }
    }

    /**
     * Checks that a producer can be created when the first scripted request for its topic is
     * served normally and the following two receive no response.
     * The test passes if {@code create()} does not throw.
     */
    @Test
    public void testProducerTimeoutOnLookup() throws Exception {
        try (PulsarClient client = newClient();
                Producer<byte[]> producer = client.newProducer().topic("OK:1,TIMEOUT:2,OK:3").create()) {
            // Successful creation is the assertion; both resources are closed on exit.
        }
    }

    /**
     * Checks that a producer can be created when the first scripted request for its topic is
     * served normally and the following two are rejected with TooManyRequests.
     * The test passes if {@code create()} does not throw.
     */
    @Test
    public void testProducerTooManyOnLookup() throws Exception {
        try (PulsarClient client = newClient();
                Producer<byte[]> producer = client.newProducer().topic("OK:1,TOO_MANY:2,OK:3").create()) {
            // Successful creation is the assertion; both resources are closed on exit.
        }
    }

    /**
     * Checks how many broker connections are created when requests are rejected with
     * TooManyRequests, for two different {@code maxNumberOfRejectedRequestPerConnection} limits.
     *
     * <p>With a limit of 1, creating a producer on {@code "TOO_MANY:2"} must leave the connection
     * counter at 2. A second client with a limit of 100 then creates producers on
     * {@code "TOO_MANY:2"} and {@code "TOO_MANY:4"}, after which the counter must be 3. The counter
     * is cumulative across both clients. Each client is closed even when an assertion fails.
     */
    @Test
    public void testCloseConnectionOnBrokerRejectedRequest() throws Exception {
        String brokerServiceUrl = this.pulsar.getBrokerServiceUrl();

        try (PulsarClient client = PulsarClient.builder()
                .serviceUrl(brokerServiceUrl)
                .maxNumberOfRejectedRequestPerConnection(1)
                .build()) {
            client.newProducer().topic("TOO_MANY:2").create().close();
            Assert.assertEquals(this.connectionsCreated.get(), 2);
        }

        try (PulsarClient client = PulsarClient.builder()
                .serviceUrl(brokerServiceUrl)
                .maxNumberOfRejectedRequestPerConnection(100)
                .build()) {
            // The "TOO_MANY:2" queue was already created (and consumed from) by the first client.
            client.newProducer().topic("TOO_MANY:2").create().close();
            client.newProducer().topic("TOO_MANY:4").create().close();
            Assert.assertEquals(this.connectionsCreated.get(), 3);
        }
    }

    /**
     * Checks how many broker connections are created when requests for a topic go unanswered.
     *
     * <p>The first client creates a producer on {@code "TIMEOUT:2"}, after which the connection
     * counter must be 2. A second client then creates two producers on the same topic, after which
     * the counter must be 3. The counter is cumulative across both clients. Each client is closed
     * even when an assertion fails.
     */
    @Test
    public void testCloseConnectionOnBrokerTimeout() throws Exception {
        String brokerServiceUrl = this.pulsar.getBrokerServiceUrl();

        try (PulsarClient client = PulsarClient.builder()
                .serviceUrl(brokerServiceUrl)
                .maxNumberOfRejectedRequestPerConnection(1)
                .connectionTimeout(2, TimeUnit.SECONDS)
                .operationTimeout(1, TimeUnit.SECONDS)
                .lookupTimeout(10, TimeUnit.SECONDS)
                .build()) {
            client.newProducer().topic("TIMEOUT:2").create().close();
            Assert.assertEquals(this.connectionsCreated.get(), 2);
        }

        // NOTE(review): this builder previously chained maxNumberOfRejectedRequestPerConnection(100)
        // immediately followed by (1). Only the effective value (1) is kept here so behaviour is
        // unchanged; confirm whether 100 was the intended limit for this second scenario.
        try (PulsarClient client = PulsarClient.builder()
                .serviceUrl(brokerServiceUrl)
                .maxNumberOfRejectedRequestPerConnection(1)
                .connectionTimeout(2, TimeUnit.SECONDS)
                .operationTimeout(1, TimeUnit.SECONDS)
                .lookupTimeout(10, TimeUnit.SECONDS)
                .build()) {
            // The "TIMEOUT:2" queue was already created (and consumed from) by the first client.
            client.newProducer().topic("TIMEOUT:2").create().close();
            client.newProducer().topic("TIMEOUT:2").create().close();
            Assert.assertEquals(this.connectionsCreated.get(), 3);
        }
    }
}
