package org.apache.pulsar.broker.namespace;

import com.google.common.collect.Range;
import com.google.common.hash.Hashing;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.coordination.CoordinationService;
import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.coordination.impl.CoordinationServiceImpl;
import org.apache.pulsar.zookeeper.ZookeeperServerTest;
import org.awaitility.Awaitility;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups = {"broker"})
/* loaded from: input_file:org/apache/pulsar/broker/namespace/OwnershipCacheTest.class */
public class OwnershipCacheTest {
    // Mockito mock of the broker; its getters are stubbed in setup() to return the fixtures below.
    private PulsarService pulsar;
    // Real ServiceConfiguration instance returned by the mocked pulsar.getConfiguration().
    private ServiceConfiguration config;
    // Value returned by the mocked pulsar.getBrokerServiceUrl(); tests compare owner native URLs against it.
    private String selfBrokerUrl;
    // Factory passed to every NamespaceBundle the tests construct.
    private NamespaceBundleFactory bundleFactory;
    // Mockito mock handed to the OwnershipCache constructor and returned by pulsar.getNamespaceService().
    private NamespaceService nsService;
    // Mockito mock whose unloadServiceUnit(...) is stubbed to return an already-completed future.
    private BrokerService brokerService;
    // Single-thread scheduler created in setup() and shut down in teardown(); not otherwise referenced in this class.
    private OrderedScheduler executor;
    // Metadata store returned by the mocked pulsar for both local and configuration metadata.
    private MetadataStoreExtended store;
    // Second store instance created against the same test server; tests use it to write and delete
    // ownership entries from outside the cache under test.
    private MetadataStoreExtended otherStore;
    // Coordination service built on top of `store` and returned by pulsar.getCoordinationService().
    private CoordinationService coordinationService;
    // ZooKeeper test server started in setup() and closed in teardown().
    private ZookeeperServerTest zookeeperServer;

    /**
     * Builds a fresh fixture before every test method.
     *
     * <p>Starts a ZooKeeper test server, opens two metadata stores against it ({@code store} for the
     * cache under test, {@code otherStore} for writes made from outside the cache), and stubs the
     * mocked {@link PulsarService} so that it hands out these fixtures.
     *
     * @throws Exception if the test server or one of the metadata stores cannot be created
     */
    @BeforeMethod
    public void setup() throws Exception {
        // Single source of truth for the port: used both in the advertised URL and in the config.
        final int brokerPort = 8080;
        selfBrokerUrl = "tcp://localhost:" + brokerPort;
        pulsar = Mockito.mock(PulsarService.class);
        config = new ServiceConfiguration();
        executor = OrderedScheduler.newSchedulerBuilder().numThreads(1).name("test").build();

        // Port 0 is passed through to the test server as-is.
        zookeeperServer = new ZookeeperServerTest(0);
        zookeeperServer.start();

        store = MetadataStoreExtended.create(zookeeperServer.getHostPort(),
                MetadataStoreConfig.builder().sessionTimeoutMillis(5000).build());
        coordinationService = new CoordinationServiceImpl(store);
        otherStore = MetadataStoreExtended.create(zookeeperServer.getHostPort(),
                MetadataStoreConfig.builder().sessionTimeoutMillis(5000).build());

        // The same store backs both the local and the configuration metadata of the mocked broker.
        Mockito.when(pulsar.getLocalMetadataStore()).thenReturn(store);
        Mockito.when(pulsar.getConfigurationMetadataStore()).thenReturn(store);
        Mockito.when(pulsar.getCoordinationService()).thenReturn(coordinationService);

        bundleFactory = new NamespaceBundleFactory(pulsar, Hashing.crc32());
        nsService = Mockito.mock(NamespaceService.class);
        brokerService = Mockito.mock(BrokerService.class);

        // Any unload request against the mocked broker service completes immediately.
        Mockito.doReturn(CompletableFuture.completedFuture(1)).when(brokerService)
                .unloadServiceUnit((NamespaceBundle) ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(),
                        ArgumentMatchers.anyBoolean(), ArgumentMatchers.anyLong(), (TimeUnit) ArgumentMatchers.any());

        Mockito.doReturn(config).when(pulsar).getConfiguration();
        Mockito.doReturn(nsService).when(pulsar).getNamespaceService();
        config.setBrokerServicePort(Optional.of(brokerPort));
        config.setWebServicePort(Optional.empty());
        Mockito.doReturn(brokerService).when(pulsar).getBrokerService();
        Mockito.doReturn(selfBrokerUrl).when(pulsar).getBrokerServiceUrl();
    }

    /**
     * Releases every fixture created by {@link #setup()}.
     *
     * <p>Because this method runs even when {@code setup()} failed part-way ({@code alwaysRun = true}),
     * each fixture is null-checked before use. The nested {@code finally} blocks guarantee that a
     * failure while closing one resource does not prevent the remaining ones from being closed.
     *
     * @throws Exception if closing any of the resources fails
     */
    @AfterMethod(alwaysRun = true)
    public void teardown() throws Exception {
        try {
            if (executor != null) {
                executor.shutdownNow();
            }
            if (coordinationService != null) {
                coordinationService.close();
            }
        } finally {
            try {
                if (store != null) {
                    store.close();
                }
            } finally {
                try {
                    if (otherStore != null) {
                        otherStore.close();
                    }
                } finally {
                    // The server goes last so the stores above can still reach it while closing.
                    if (zookeeperServer != null) {
                        zookeeperServer.close();
                    }
                }
            }
        }
    }

    /** Checks that a newly built cache exists and exposes a non-null collection of owned bundles. */
    @Test
    public void testConstructor() {
        final OwnershipCache cache = new OwnershipCache(pulsar, nsService);
        Assert.assertNotNull(cache);
        Assert.assertNotNull(cache.getOwnedBundles());
    }

    /**
     * Acquires ownership of a bundle, disables it, and checks that the owner data reported by the
     * cache flips from enabled to disabled.
     */
    @Test
    public void testDisableOwnership() throws Exception {
        final OwnershipCache cache = new OwnershipCache(pulsar, nsService);
        final NamespaceName namespace = NamespaceName.get("pulsar/test/ns-1");
        final NamespaceBundle bundle =
                new NamespaceBundle(namespace, Range.closedOpen(0L, (long) Integer.MAX_VALUE), bundleFactory);

        // Nobody owns the bundle yet.
        final Optional<?> ownerBefore = (Optional<?>) cache.getOwnerAsync(bundle).get();
        Assert.assertFalse(ownerBefore.isPresent());

        // A fresh acquisition is reported as enabled.
        final NamespaceEphemeralData acquired = (NamespaceEphemeralData) cache.tryAcquiringOwnership(bundle).get();
        Assert.assertFalse(acquired.isDisabled());

        cache.disableOwnership(bundle).get();

        // After disabling, the owner entry is still present but flagged as disabled.
        final Optional<?> ownerAfter = (Optional<?>) cache.getOwnerAsync(bundle).get();
        final NamespaceEphemeralData disabledOwner = (NamespaceEphemeralData) ownerAfter.get();
        Assert.assertTrue(disabledOwner.isDisabled());
    }

    /**
     * Exercises acquire / unload / contended re-acquire for a single bundle.
     *
     * <ol>
     *   <li>The bundle starts without an owner and is acquired by the cache under test.</li>
     *   <li>The owned bundle is unloaded through {@code handleUnloadRequest}.</li>
     *   <li>A separate metadata store writes an ephemeral owner entry for another host; the cache
     *       must then fail to acquire with {@code LockBusyException} and report the other host as
     *       the owner.</li>
     * </ol>
     *
     * <p>The extra store is managed by try-with-resources so that it is always closed and a failure
     * in {@code close()} never replaces the original test failure.
     */
    @Test
    public void testGetOrSetOwner() throws Exception {
        OwnershipCache cache = new OwnershipCache(pulsar, nsService);
        NamespaceBundle bundle = new NamespaceBundle(NamespaceName.get("pulsar/test/ns-2"),
                Range.closedOpen(0L, (long) Integer.MAX_VALUE), bundleFactory);

        // Step 1: no owner yet, then the local cache acquires the bundle.
        Assert.assertFalse(((Optional<?>) cache.getOwnerAsync(bundle).get()).isPresent());
        NamespaceEphemeralData selfOwner = (NamespaceEphemeralData) cache.tryAcquiringOwnership(bundle).get();
        Assert.assertEquals(selfOwner.getNativeUrl(), selfBrokerUrl);
        Assert.assertFalse(selfOwner.isDisabled());

        // Step 2: unload the bundle; the mocked namespace service must expose this cache for that.
        OwnedBundle ownedBundle = cache.getOwnedBundle(bundle);
        Mockito.doReturn(cache).when(nsService).getOwnershipCache();
        ownedBundle.handleUnloadRequest(pulsar, 5L, TimeUnit.SECONDS).join();

        // Step 3: another store instance claims the bundle on behalf of "otherhost".
        try (MetadataStoreExtended foreignStore = MetadataStoreExtended.create(zookeeperServer.getHostPort(),
                MetadataStoreConfig.builder().sessionTimeoutMillis(5000).build())) {
            byte[] foreignOwner = ObjectMapperFactory.getMapper().writer().writeValueAsBytes(
                    new NamespaceEphemeralData("pulsar://otherhost:8881", "pulsar://otherhost:8884",
                            "http://localhost:8080", "https://localhost:4443", false));
            foreignStore.put(ServiceUnitUtils.path(bundle), foreignOwner, Optional.of(-1L),
                    EnumSet.of(CreateOption.Ephemeral)).join();

            try {
                cache.tryAcquiringOwnership(bundle).get();
                Assert.fail("Should fail to acquire");
            } catch (ExecutionException e) {
                Assert.assertEquals(e.getCause().getClass(), MetadataStoreException.LockBusyException.class);
            }

            // The cache must now report the foreign owner.
            NamespaceEphemeralData reportedOwner =
                    (NamespaceEphemeralData) ((Optional<?>) cache.getOwnerAsync(bundle).join()).get();
            Assert.assertEquals(reportedOwner.getNativeUrl(), "pulsar://otherhost:8881");
            Assert.assertEquals(reportedOwner.getNativeUrlTls(), "pulsar://otherhost:8884");
            Assert.assertFalse(reportedOwner.isDisabled());
        }
    }

    /**
     * Verifies owner lookup when the bundle is owned by somebody else.
     *
     * <p>A separate metadata store writes an ephemeral owner entry for another host. The cache under
     * test must fail to acquire with {@code LockBusyException}, report the other host as owner on
     * repeated reads, and still report no owner for an unrelated bundle.
     *
     * <p>The extra store is managed by try-with-resources so that it is always closed and a failure
     * in {@code close()} never replaces the original test failure.
     */
    @Test
    public void testGetOwner() throws Exception {
        OwnershipCache cache = new OwnershipCache(pulsar, nsService);
        NamespaceBundle bundle = new NamespaceBundle(NamespaceName.get("pulsar/test/ns-3"),
                Range.closedOpen(0L, (long) Integer.MAX_VALUE), bundleFactory);

        // Nobody owns the bundle at the start.
        Assert.assertFalse(((Optional<?>) cache.getOwnerAsync(bundle).get()).isPresent());

        try (MetadataStoreExtended foreignStore = MetadataStoreExtended.create(zookeeperServer.getHostPort(),
                MetadataStoreConfig.builder().sessionTimeoutMillis(5000).build())) {
            // Another store instance claims the bundle on behalf of "otherhost".
            byte[] foreignOwner = ObjectMapperFactory.getMapper().writer().writeValueAsBytes(
                    new NamespaceEphemeralData("pulsar://otherhost:8881", "pulsar://otherhost:8884",
                            "http://localhost:8080", "https://localhost:4443", false));
            foreignStore.put(ServiceUnitUtils.path(bundle), foreignOwner, Optional.of(-1L),
                    EnumSet.of(CreateOption.Ephemeral)).join();

            try {
                cache.tryAcquiringOwnership(bundle).get();
                Assert.fail("Should fail to acquire");
            } catch (ExecutionException e) {
                Assert.assertEquals(e.getCause().getClass(), MetadataStoreException.LockBusyException.class);
            }

            // First read: the foreign owner is reported.
            NamespaceEphemeralData reportedOwner =
                    (NamespaceEphemeralData) ((Optional<?>) cache.getOwnerAsync(bundle).join()).get();
            Assert.assertEquals(reportedOwner.getNativeUrl(), "pulsar://otherhost:8881");
            Assert.assertEquals(reportedOwner.getNativeUrlTls(), "pulsar://otherhost:8884");
            Assert.assertFalse(reportedOwner.isDisabled());

            // Second read must yield an equal value.
            NamespaceEphemeralData reportedAgain =
                    (NamespaceEphemeralData) ((Optional<?>) cache.getOwnerAsync(bundle).get()).get();
            Assert.assertEquals(reportedOwner, reportedAgain);

            // A bundle nobody has claimed still has no owner.
            NamespaceBundle unclaimedBundle = new NamespaceBundle(NamespaceName.get("pulsar/test/ns-none"),
                    Range.closedOpen(0L, (long) Integer.MAX_VALUE), bundleFactory);
            Assert.assertFalse(((Optional<?>) cache.getOwnerAsync(unclaimedBundle).get()).isPresent());
        }
    }

    /**
     * Verifies that {@code getOwnedBundle} returns {@code null} for a bundle this cache does not own,
     * and a non-null value once the cache has acquired it.
     *
     * <p>The "not owned" checks use {@code assertNull} directly rather than provoking and catching a
     * {@link NullPointerException}; that way an NPE thrown from inside {@code getOwnedBundle} fails
     * the test instead of being mistaken for the expected outcome.
     */
    @Test
    public void testGetOwnedServiceUnit() throws Exception {
        OwnershipCache cache = new OwnershipCache(pulsar, nsService);
        NamespaceBundle bundle = new NamespaceBundle(NamespaceName.get("pulsar/test/ns-5"),
                Range.closedOpen(0L, (long) Integer.MAX_VALUE), bundleFactory);

        // Case 1: no owner at all.
        Assert.assertFalse(((Optional<?>) cache.getOwnerAsync(bundle).get()).isPresent());
        Assert.assertNull(cache.getOwnedBundle(bundle));

        // Case 2: the second store writes an owner entry for "otherhost".
        byte[] foreignOwner = ObjectMapperFactory.getMapper().writer().writeValueAsBytes(
                new NamespaceEphemeralData("pulsar://otherhost:8881", "pulsar://otherhost:8884",
                        "http://localhost:8080", "https://localhost:4443", false));
        otherStore.put(ServiceUnitUtils.path(bundle), foreignOwner, Optional.of(-1L),
                EnumSet.of(CreateOption.Ephemeral)).join();
        Assert.assertNull(cache.getOwnedBundle(bundle));

        try {
            cache.tryAcquiringOwnership(bundle).get();
            Assert.fail("Should fail to acquire");
        } catch (ExecutionException e) {
            Assert.assertEquals(e.getCause().getClass(), MetadataStoreException.LockBusyException.class);
        }

        // The foreign owner is reported, and the bundle is still not locally owned.
        NamespaceEphemeralData reportedOwner =
                (NamespaceEphemeralData) ((Optional<?>) cache.getOwnerAsync(bundle).join()).get();
        Assert.assertEquals(reportedOwner.getNativeUrl(), "pulsar://otherhost:8881");
        Assert.assertEquals(reportedOwner.getNativeUrlTls(), "pulsar://otherhost:8884");
        Assert.assertFalse(reportedOwner.isDisabled());
        Assert.assertNull(cache.getOwnedBundle(bundle));

        // Case 3: the foreign entry is deleted and the cache acquires the bundle itself.
        otherStore.delete(ServiceUnitUtils.path(bundle), Optional.empty()).join();
        NamespaceEphemeralData selfOwner = (NamespaceEphemeralData) cache.tryAcquiringOwnership(bundle).get();
        Assert.assertEquals(selfOwner.getNativeUrl(), selfBrokerUrl);
        Assert.assertFalse(selfOwner.isDisabled());
        Assert.assertNotNull(cache.getOwnedBundle(bundle));
    }

    /**
     * Tracks the size of {@code getOwnedBundles()} while a bundle goes from unowned, to owned by
     * another host, to owned by the cache under test.
     */
    @Test
    public void testGetOwnedServiceUnits() throws Exception {
        final OwnershipCache cache = new OwnershipCache(pulsar, nsService);
        final NamespaceName namespace = NamespaceName.get("pulsar/test/ns-6");
        final NamespaceBundle bundle =
                new NamespaceBundle(namespace, Range.closedOpen(0L, (long) Integer.MAX_VALUE), bundleFactory);

        // Unowned: nothing reported, nothing held locally.
        final Optional<?> initialOwner = (Optional<?>) cache.getOwnerAsync(bundle).get();
        Assert.assertFalse(initialOwner.isPresent());
        Assert.assertTrue(cache.getOwnedBundles().isEmpty());

        // The second store registers "otherhost" as owner; the local view stays empty.
        otherStore.put(
                ServiceUnitUtils.path(bundle),
                ObjectMapperFactory.getMapper().writer().writeValueAsBytes(
                        new NamespaceEphemeralData("pulsar://otherhost:8881", "pulsar://otherhost:8884",
                                "http://otherhost:8080", "https://otherhost:4443", false)),
                Optional.of(-1L),
                EnumSet.of(CreateOption.Ephemeral)).join();
        Assert.assertTrue(cache.getOwnedBundles().isEmpty());

        // NOTE(review): fixed pause kept from the original; presumably lets the write above become
        // visible to the cache's store before the acquisition attempt — confirm.
        Thread.sleep(500L);

        try {
            cache.tryAcquiringOwnership(bundle).get();
            Assert.fail("Should fail to acquire");
        } catch (ExecutionException busy) {
            Assert.assertEquals(busy.getCause().getClass(), MetadataStoreException.LockBusyException.class);
        }

        // The foreign owner is visible through the cache, which still owns nothing itself.
        final Optional<?> foreignOwner = (Optional<?>) cache.getOwnerAsync(bundle).join();
        final NamespaceEphemeralData foreignData = (NamespaceEphemeralData) foreignOwner.get();
        Assert.assertEquals(foreignData.getNativeUrl(), "pulsar://otherhost:8881");
        Assert.assertEquals(foreignData.getNativeUrlTls(), "pulsar://otherhost:8884");
        Assert.assertFalse(foreignData.isDisabled());
        Assert.assertTrue(cache.getOwnedBundles().isEmpty());

        // Once the foreign entry is gone the cache can acquire, and then holds exactly one bundle.
        otherStore.delete(ServiceUnitUtils.path(bundle), Optional.empty()).join();
        final NamespaceEphemeralData selfData = (NamespaceEphemeralData) cache.tryAcquiringOwnership(bundle).get();
        Assert.assertEquals(selfData.getNativeUrl(), selfBrokerUrl);
        Assert.assertFalse(selfData.isDisabled());
        Assert.assertEquals(cache.getOwnedBundles().size(), 1);
    }

    /**
     * Removes ownership twice: once for a bundle that was never acquired, and once after a successful
     * acquisition, then waits until the cache, the metadata store and the local lock map all show
     * the bundle as released.
     */
    @Test
    public void testRemoveOwnership() throws Exception {
        final OwnershipCache cache = new OwnershipCache(pulsar, nsService);
        final NamespaceName namespace = NamespaceName.get("pulsar/test/ns-7");
        final NamespaceBundle bundle =
                new NamespaceBundle(namespace, Range.closedOpen(0L, (long) Integer.MAX_VALUE), bundleFactory);

        // Removing an unowned bundle completes and leaves the cache empty.
        final Optional<?> initialOwner = (Optional<?>) cache.getOwnerAsync(bundle).get();
        Assert.assertFalse(initialOwner.isPresent());
        cache.removeOwnership(bundle).get();
        Assert.assertTrue(cache.getOwnedBundles().isEmpty());

        // Acquire the bundle locally.
        final NamespaceEphemeralData selfData = (NamespaceEphemeralData) cache.tryAcquiringOwnership(bundle).get();
        Assert.assertEquals(selfData.getNativeUrl(), selfBrokerUrl);
        Assert.assertFalse(selfData.isDisabled());
        Assert.assertEquals(cache.getOwnedBundles().size(), 1);

        // The removal future is intentionally not awaited; the polling below observes its effects.
        cache.removeOwnership(bundle);
        Awaitility.await().untilAsserted(() -> {
            Assert.assertTrue(cache.getOwnedBundles().isEmpty());
            final boolean entryExists = (Boolean) store.exists(ServiceUnitUtils.path(bundle)).join();
            Assert.assertFalse(entryExists);
            Assert.assertNull(cache.getLocallyAcquiredLocks().get(bundle));
        });
    }

    /**
     * Verifies that ownership can be re-established after the local owner cache is invalidated.
     *
     * <ol>
     *   <li>The bundle is unowned.</li>
     *   <li>The cache acquires it and reports itself as owner.</li>
     *   <li>{@code invalidateLocalOwnerCache()} drops the local entry; a new acquisition succeeds.</li>
     *   <li>The ownership check passes and a fresh owner read again reports this broker.</li>
     * </ol>
     *
     * <p>The final step reads the owner again from the cache instead of re-asserting on data captured
     * before the invalidation, so it actually observes the post-reacquisition state.
     */
    @Test
    public void testReestablishOwnership() throws Exception {
        OwnershipCache cache = new OwnershipCache(pulsar, nsService);
        NamespaceBundle bundle = new NamespaceBundle(NamespaceName.get("pulsar/test/ns-8"),
                Range.closedOpen(0L, (long) Integer.MAX_VALUE), bundleFactory);

        // Case 1: no owner, nothing held locally.
        Assert.assertFalse(((Optional<?>) cache.getOwnerAsync(bundle).get()).isPresent());
        Assert.assertNull(cache.getOwnedBundle(bundle));

        // Case 2: local acquisition.
        NamespaceEphemeralData acquired = (NamespaceEphemeralData) cache.tryAcquiringOwnership(bundle).get();
        Assert.assertEquals(acquired.getNativeUrl(), selfBrokerUrl);
        Assert.assertFalse(acquired.isDisabled());
        Assert.assertNotNull(cache.getOwnedBundle(bundle));

        // Case 3: the owner lookup reports this broker.
        NamespaceEphemeralData lookedUp =
                (NamespaceEphemeralData) ((Optional<?>) cache.getOwnerAsync(bundle).get()).get();
        Assert.assertEquals(lookedUp.getNativeUrl(), selfBrokerUrl);
        Assert.assertFalse(lookedUp.isDisabled());
        Assert.assertNotNull(cache.getOwnedBundle(bundle));

        // Case 4: invalidate the local cache, then acquire again.
        cache.invalidateLocalOwnerCache();
        Assert.assertNull(cache.getOwnedBundle(bundle));
        NamespaceEphemeralData reacquired = (NamespaceEphemeralData) cache.tryAcquiringOwnership(bundle).get();
        Assert.assertEquals(reacquired.getNativeUrl(), selfBrokerUrl);
        Assert.assertFalse(reacquired.isDisabled());
        Assert.assertNotNull(cache.getOwnedBundle(bundle));

        // Case 5: ownership check passes and a fresh read still reports this broker.
        Assert.assertTrue((Boolean) cache.checkOwnershipAsync(bundle).get());
        NamespaceEphemeralData readBack =
                (NamespaceEphemeralData) ((Optional<?>) cache.getOwnerAsync(bundle).get()).get();
        Assert.assertEquals(readBack.getNativeUrl(), selfBrokerUrl);
        Assert.assertFalse(readBack.isDisabled());
        Assert.assertNotNull(cache.getOwnedBundle(bundle));
    }
}
