package org.apache.pulsar.client.api;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.pulsar.broker.MultiBrokerBaseTest;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.broker.namespace.LookupOptions;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.PartitionedProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.Test;

/**
 * Verifies that lookup properties configured on a client are delivered to the broker-side
 * load manager and can be used there to influence broker selection.
 *
 * <p>Every broker started by this test is tagged with a {@value #BROKER_KEY} property
 * ({@code broker-0} for the main broker, {@code broker-10}, {@code broker-11}, ... for the
 * additional brokers). Clients send the broker id they want under {@value #CLIENT_KEY}.
 */
@Test(groups = {"broker-api"})
public class LookupPropertiesTest extends MultiBrokerBaseTest {
    private static final Logger log = LoggerFactory.getLogger(LookupPropertiesTest.class);

    /** Broker-side configuration property that holds the id assigned to each broker. */
    private static final String BROKER_KEY = "lookup.broker.id";

    /** Client-side lookup property naming the broker id the client asks for. */
    private static final String CLIENT_KEY = "broker.id";

    /**
     * Load manager that records the {@value LookupPropertiesTest#CLIENT_KEY} value of every
     * {@code assign} call and, in {@code selectAsync}, prefers the broker whose
     * {@value LookupPropertiesTest#BROKER_KEY} property equals that value.
     */
    public static class BrokerIdAwareLoadManager extends ExtensibleLoadManagerImpl {
        /** Client ids seen by {@link #assign}, in arrival order; shared by all instances. */
        static final List<String> clientIdList = Collections.synchronizedList(new ArrayList<>());

        /**
         * Records the client id carried by {@code lookupOptions} (if any) and then delegates
         * to the parent implementation.
         */
        @Override
        public CompletableFuture<Optional<BrokerLookupData>> assign(Optional<ServiceUnitId> optional,
                                                                    ServiceUnitId serviceUnitId,
                                                                    LookupOptions lookupOptions) {
            getClientId(lookupOptions).ifPresent(clientIdList::add);
            return super.assign(optional, serviceUnitId, lookupOptions);
        }

        /**
         * Selects the broker whose {@value LookupPropertiesTest#BROKER_KEY} property equals the
         * client id found in {@code lookupOptions}. Falls back to the parent implementation
         * when the client sent no id or when no available broker matches it.
         */
        @Override
        public CompletableFuture<Optional<String>> selectAsync(ServiceUnitId serviceUnitId,
                                                               Set<String> set,
                                                               LookupOptions lookupOptions) {
            final String clientId = getClientId(lookupOptions).orElse(null);
            if (clientId == null) {
                return super.selectAsync(serviceUnitId, set, lookupOptions);
            }
            return getBrokerRegistry().getAvailableBrokerLookupDataAsync().thenCompose(brokers -> {
                // clientId is non-null here, so equals() is false for brokers without the property.
                final Optional<String> matchingBroker = brokers.entrySet().stream()
                        .filter(entry -> clientId.equals(entry.getValue().properties().get(BROKER_KEY)))
                        .findAny()
                        .map(entry -> entry.getKey());
                if (matchingBroker.isPresent()) {
                    return CompletableFuture.completedFuture(matchingBroker);
                }
                return super.selectAsync(serviceUnitId, set, lookupOptions);
            });
        }

        /**
         * Extracts the {@value LookupPropertiesTest#CLIENT_KEY} lookup property.
         *
         * @return the client id, or empty when the options carry no properties or no such key
         */
        private static Optional<String> getClientId(LookupOptions lookupOptions) {
            if (lookupOptions.getProperties() == null) {
                return Optional.empty();
            }
            return Optional.ofNullable(lookupOptions.getProperties().get(CLIENT_KEY));
        }
    }

    /** Applies the test configuration (broker id suffix 0) to the main broker before starting it. */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    public void startBroker() throws Exception {
        addCustomConfigs(conf, 0);
        super.startBroker();
    }

    /**
     * Builds the configuration of an additional broker; the broker at index {@code i} gets the
     * id {@code "broker-" + (i + 10)}.
     */
    @Override // org.apache.pulsar.broker.MultiBrokerBaseTest
    public ServiceConfiguration createConfForAdditionalBroker(int i) {
        return addCustomConfigs(getDefaultConf(), i + 10);
    }

    /**
     * Applies the settings shared by all brokers of this test and tags the broker with
     * {@code "broker-" + i} under {@value #BROKER_KEY}.
     *
     * @return the same {@code serviceConfiguration} instance, for chaining
     */
    private static ServiceConfiguration addCustomConfigs(ServiceConfiguration serviceConfiguration, int i) {
        serviceConfiguration.setDefaultNumberOfNamespaceBundles(16);
        serviceConfiguration.setLoadBalancerAutoBundleSplitEnabled(false);
        serviceConfiguration.setLoadManagerClassName(BrokerIdAwareLoadManager.class.getName());
        serviceConfiguration.setLoadBalancerAverageResourceUsageDifferenceThresholdPercentage(100);
        serviceConfiguration.setLoadBalancerDebugModeEnabled(true);
        serviceConfiguration.setBrokerShutdownTimeoutMs(1000L);
        final Properties properties = new Properties();
        properties.setProperty(BROKER_KEY, "broker-" + i);
        serviceConfiguration.setProperties(properties);
        return serviceConfiguration;
    }

    /**
     * A client asking for {@code broker-10} must end up with all 16 partition producers on a
     * single connection whose remote port is the listen port of {@code additionalBrokers.get(0)}.
     */
    @Test
    public void testLookupProperty() throws Exception {
        admin.namespaces().unload("public/default");
        final String topic = "test-lookup-property";
        admin.topics().createPartitionedTopic(topic, 16);
        // "broker-10" is the id given to additionalBrokers.get(0) by createConfForAdditionalBroker(0).
        // try-with-resources closes the producer first, then the client, exactly once each.
        try (PulsarClientImpl client = (PulsarClientImpl) PulsarClient.builder()
                     .serviceUrl(pulsar.getBrokerServiceUrl())
                     .lookupProperties(Collections.singletonMap(CLIENT_KEY, "broker-10"))
                     .build();
             PartitionedProducerImpl<byte[]> producer =
                     (PartitionedProducerImpl<byte[]>) client.newProducer().topic(topic).create()) {
            Assert.assertNotNull(producer);
            final Set<ClientCnx> connections = producer.getProducers().stream()
                    .map(partitionProducer -> partitionProducer.getClientCnx())
                    .collect(Collectors.toSet());
            Assert.assertEquals(connections.size(), 1);
            final ClientCnx connection = connections.stream().findAny().orElseThrow();
            final int port = ((InetSocketAddress) connection.ctx().channel().remoteAddress()).getPort();
            final Integer expectedPort = additionalBrokers.get(0).getBrokerListenPort().orElseThrow();
            Assert.assertEquals(port, expectedPort);
        }
    }

    /**
     * Issues ten lookups back to back, changing the client's lookup properties before each one
     * and resetting them right after, and checks that the load manager recorded the ten client
     * ids in the order the lookups were issued.
     */
    @Test
    public void testConcurrentLookupProperties() throws Exception {
        try (PulsarClientImpl client = (PulsarClientImpl) PulsarClient.builder()
                .serviceUrl(pulsar.getBrokerServiceUrl())
                .build()) {
            final List<CompletableFuture<?>> futures = new ArrayList<>();
            BrokerIdAwareLoadManager.clientIdList.clear();
            final List<String> expectedClientIds = IntStream.range(0, 10).mapToObj(i -> "key-" + i).toList();
            for (String clientId : expectedClientIds) {
                client.getConfiguration().setLookupProperties(Collections.singletonMap(CLIENT_KEY, clientId));
                futures.add(client.getLookup().getBroker(TopicName.get("test-concurrent-lookup-properties")));
                // Reset immediately: the lookup issued above must still carry the id set before it.
                client.getConfiguration().setLookupProperties(Collections.emptyMap());
            }
            FutureUtil.waitForAll(futures).get();
            Assert.assertEquals(expectedClientIds, BrokerIdAwareLoadManager.clientIdList);
        }
    }
}
