package org.apache.pulsar.broker.loadbalance.extensions.channel;

import java.lang.reflect.Field;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.loadbalance.LeaderElectionService;
import org.apache.pulsar.broker.loadbalance.extensions.BrokerRegistryImpl;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
import org.apache.pulsar.broker.loadbalance.extensions.models.Split;
import org.apache.pulsar.broker.loadbalance.extensions.models.Unload;
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore;
import org.apache.pulsar.broker.namespace.LookupOptions;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.client.admin.Brokers;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.TableView;
import org.apache.pulsar.client.api.schema.proto.Test;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.naming.TopicVersion;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.topics.TopicCompactionStrategy;
import org.apache.pulsar.compaction.StrategicTwoPhaseCompactor;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.MetadataStoreTableView;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.coordination.LeaderElectionState;
import org.apache.pulsar.metadata.api.extended.SessionEvent;
import org.awaitility.Awaitility;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Factory;
import org.testng.annotations.Test;

@Test(groups = {"broker"})
/* loaded from: input_file:org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.class */
public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
    // Brokers under test: pulsar1 aliases the base-class broker, pulsar2 comes from the
    // additional test context; both are assigned in setup().
    private PulsarService pulsar1;
    private PulsarService pulsar2;
    // One channel per broker, created via createChannel(...) and started in setup().
    private ServiceUnitStateChannel channel1;
    private ServiceUnitStateChannel channel2;
    // Namespaces created in setup(): "my-tenant/my-ns" and "my-tenant/my-ns2".
    private String namespaceName;
    private String namespaceName2;
    // brokerId1/brokerId2 are read reflectively from the channels' "brokerId" field;
    // brokerId3 is the fixed string "broker-3".
    private String brokerId1;
    private String brokerId2;
    private String brokerId3;
    // Bundle names: bundle/bundle1/bundle2 live in namespaceName, bundle3 in namespaceName2.
    private String bundle;
    private String bundle1;
    private String bundle2;
    private String bundle3;
    // Hash ranges of the two child bundles used by the split tests, and the fully
    // qualified child bundle names built from them for each namespace.
    private String childBundle1Range;
    private String childBundle2Range;
    private String childBundle11;
    private String childBundle12;
    private String childBundle31;
    private String childBundle32;
    // Test context that owns the second broker; closed in cleanup().
    private PulsarTestContext additionalPulsarTestContext;
    // Mock whose brokerLoadDataStore()/topBundleLoadDataStore() return mocked stores.
    private LoadManagerContext loadManagerContext;
    // Spy around a real BrokerRegistryImpl bound to pulsar1.
    private BrokerRegistryImpl registry;
    // Spy around the primary broker's admin client; reset before every test method.
    private PulsarAdmin pulsarAdmin;
    // Plain Mockito mock of the load manager.
    private ExtensibleLoadManagerImpl loadManager;
    // Table-view implementation class name injected by the @Factory constructor.
    private final String serviceUnitStateTableViewClassName;
    // Mock whose healthcheckAsync(...) is stubbed in setup() to return a failed future.
    private Brokers brokers;

    /**
     * Supplies the table-view implementation class names the test class is instantiated with.
     *
     * <p>Each row holds a single argument for the {@code @Factory} constructor.
     *
     * @return one row per table-view implementation class name
     */
    @DataProvider(name = "serviceUnitStateTableViewClassName")
    public static Object[][] serviceUnitStateTableViewClassName() {
        // Must be a genuine two-dimensional array: an Object[] is not assignable to Object[][].
        return new Object[][]{
                {ServiceUnitStateTableViewImpl.class.getName()},
                {ServiceUnitStateMetadataStoreTableViewImpl.class.getName()}
        };
    }

    /**
     * Factory constructor; invoked once per row of the
     * {@code serviceUnitStateTableViewClassName} data provider.
     *
     * @param str class name of the table-view implementation to configure the brokers with
     */
    @Factory(dataProvider = "serviceUnitStateTableViewClassName")
    public ServiceUnitStateChannelTest(String str) {
        serviceUnitStateTableViewClassName = str;
    }

    /**
     * Applies the broker settings shared by both brokers in this test.
     *
     * @param config configuration object to mutate in place
     */
    private void updateConfig(ServiceConfiguration config) {
        // Auto-create partitioned topics on demand.
        config.setAllowAutoTopicCreation(true);
        config.setAllowAutoTopicCreationType(TopicType.PARTITIONED);
        config.setLoadBalancerDebugModeEnabled(true);
        config.setBrokerServiceCompactionMonitorIntervalInSeconds(10);
        // Select the table-view implementation chosen by the factory constructor.
        config.setLoadManagerServiceUnitStateTableViewClassName(serviceUnitStateTableViewClassName);
    }

    /**
     * Builds the two-broker fixture: starts the primary broker, creates the tenant and
     * namespaces, wires up mocked/spied collaborators, starts a second broker, creates and
     * starts one channel per broker, and precomputes the bundle names used by the tests.
     *
     * @throws Exception if any part of the fixture fails to start
     */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeClass
    protected void setup() throws Exception {
        // Primary broker, tenant and namespaces.
        updateConfig(conf);
        super.internalSetup(conf);
        namespaceName = "my-tenant/my-ns";
        namespaceName2 = "my-tenant/my-ns2";
        admin.tenants().createTenant("my-tenant", createDefaultTenantInfo());
        admin.namespaces().createNamespace(namespaceName);
        admin.namespaces().createNamespace(namespaceName2);
        pulsar1 = pulsar;

        // Collaborators: spied registry and admin client, mocked load-manager pieces.
        registry = Mockito.spy(new BrokerRegistryImpl(pulsar1));
        registry.start();
        pulsarAdmin = (PulsarAdmin) Mockito.spy(pulsar.getAdminClient());
        loadManagerContext = Mockito.mock(LoadManagerContext.class);
        Mockito.doReturn(Mockito.mock(LoadDataStore.class)).when(loadManagerContext).brokerLoadDataStore();
        Mockito.doReturn(Mockito.mock(LoadDataStore.class)).when(loadManagerContext).topBundleLoadDataStore();
        loadManager = Mockito.mock(ExtensibleLoadManagerImpl.class);

        // Second broker with the same configuration tweaks.
        ServiceConfiguration secondBrokerConf = getDefaultConf();
        updateConfig(secondBrokerConf);
        additionalPulsarTestContext = createAdditionalPulsarTestContext(secondBrokerConf);
        pulsar2 = additionalPulsarTestContext.getPulsarService();

        // One channel per broker.
        channel1 = createChannel(pulsar1);
        channel1.start();
        channel2 = createChannel(pulsar2);
        channel2.start();

        // Broker ids are read from the channels' private "brokerId" field; the third is synthetic.
        brokerId1 = (String) FieldUtils.readDeclaredField(channel1, "brokerId", true);
        brokerId2 = (String) FieldUtils.readDeclaredField(channel2, "brokerId", true);
        brokerId3 = "broker-3";

        // Bundle names used throughout the tests.
        String fullRange = "0x00000000_0xffffffff";
        bundle = namespaceName + "/" + fullRange;
        bundle1 = namespaceName + "/0x00000000_0xfffffff0";
        bundle2 = namespaceName + "/0xfffffff0_0xffffffff";
        bundle3 = namespaceName2 + "/" + fullRange;
        childBundle1Range = "0x7fffffff_0xffffffff";
        childBundle2Range = "0x00000000_0x7fffffff";
        childBundle11 = namespaceName + "/" + childBundle1Range;
        childBundle12 = namespaceName + "/" + childBundle2Range;
        childBundle31 = namespaceName2 + "/" + childBundle1Range;
        childBundle32 = namespaceName2 + "/" + childBundle2Range;

        // Brokers admin mock whose health check always fails.
        brokers = Mockito.mock(Brokers.class);
        Mockito.doReturn(CompletableFuture.failedFuture(new RuntimeException("failed")))
                .when(brokers)
                .healthcheckAsync((TopicVersion) ArgumentMatchers.any(), (Optional) ArgumentMatchers.any());
    }

    /**
     * Resets shared channel state before every test method: the channels are disabled while
     * table views, counters and metadata state are cleaned, then re-enabled, and finally the
     * admin-client spy is reset.
     *
     * @throws Exception if any of the reset helpers fails
     */
    @BeforeMethod
    protected void initChannels() throws Exception {
        disableChannels();

        cleanTableViews();
        // Ownership-monitor counters, then operation counters, then metadata state.
        cleanOwnershipMonitorCounters(channel1);
        cleanOwnershipMonitorCounters(channel2);
        cleanOpsCounters(channel1);
        cleanOpsCounters(channel2);
        cleanMetadataState(channel1);
        cleanMetadataState(channel2);

        enableChannels();
        Mockito.reset(pulsarAdmin);
    }

    /**
     * Tears the fixture down: closes both channels, closes the second broker's test context
     * if present, drops the broker references and delegates to the base-class cleanup.
     *
     * @throws Exception if closing any component fails
     */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterClass
    protected void cleanup() throws Exception {
        channel1.close();
        channel2.close();

        PulsarTestContext secondBrokerContext = additionalPulsarTestContext;
        if (secondBrokerContext != null) {
            secondBrokerContext.close();
            additionalPulsarTestContext = null;
        }

        pulsar1 = null;
        pulsar2 = null;
        super.internalCleanup();
    }

    /**
     * Verifies that both channels agree on the channel owner, and that ownership moves to a
     * different broker after channel1's leader election service is closed and restarted.
     *
     * @throws Exception if a lookup times out or reflection fails
     */
    // NOTE(review): the priority is taken from a protobuf constant unrelated to this test —
    // presumably a decompiler substitution for an integer literal; confirm the intended value.
    @org.testng.annotations.Test(priority = Test.TestEnum.SHARED_VALUE)
    public void channelOwnerTest() throws Exception {
        String initialOwner =
                (String) ((Optional<?>) channel1.getChannelOwnerAsync().get(2L, TimeUnit.SECONDS)).get();
        String initialOwnerSeenBy2 =
                (String) ((Optional<?>) channel2.getChannelOwnerAsync().get(2L, TimeUnit.SECONDS)).get();
        AssertJUnit.assertEquals(initialOwner, initialOwnerSeenBy2);

        // Bounce channel1's leader election so ownership has to move.
        LeaderElectionService leaderElectionService =
                (LeaderElectionService) FieldUtils.readDeclaredField(channel1, "leaderElectionService", true);
        leaderElectionService.close();
        waitUntilNewChannelOwner(channel2, initialOwner);
        leaderElectionService.start();
        waitUntilNewChannelOwner(channel1, initialOwner);

        Optional<?> newOwner = (Optional<?>) channel1.getChannelOwnerAsync().get(2L, TimeUnit.SECONDS);
        Optional<?> newOwnerSeenBy2 = (Optional<?>) channel2.getChannelOwnerAsync().get(2L, TimeUnit.SECONDS);
        AssertJUnit.assertEquals(newOwner, newOwnerSeenBy2);
        // Compare Optional with Optional: comparing the raw String against an Optional could
        // never be equal, so the check would pass even if ownership had not changed.
        Assert.assertNotEquals(newOwner, Optional.of(initialOwner));

        // Exactly the broker reported as owner must consider itself the owner.
        if (newOwner.equals(Optional.of(brokerId1))) {
            Assert.assertTrue((Boolean) channel1.isChannelOwnerAsync().get(2L, TimeUnit.SECONDS));
            Assert.assertFalse((Boolean) channel2.isChannelOwnerAsync().get(2L, TimeUnit.SECONDS));
        } else {
            Assert.assertFalse((Boolean) channel1.isChannelOwnerAsync().get(2L, TimeUnit.SECONDS));
            Assert.assertTrue((Boolean) channel2.isChannelOwnerAsync().get(2L, TimeUnit.SECONDS));
        }
    }

    /**
     * Checks that channel operations are rejected with {@link IllegalStateException} whenever
     * the channel is not started: before the first {@code start()}, while {@code start()} or
     * {@code close()} is running on another thread, and after {@code close()}. Also checks that
     * a second {@code start()} on a started channel throws.
     */
    @org.testng.annotations.Test(priority = 100)
    public void channelValidationTest() throws ExecutionException, InterruptedException, IllegalAccessException, PulsarServerException, TimeoutException {
        ServiceUnitStateChannelImpl channel = createChannel(pulsar);
        // Not started yet: all six probed operations must be rejected.
        AssertJUnit.assertEquals(6, validateChannelStart(channel));

        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // Probe the channel while start() runs on the executor thread.
            Future<?> startTask = executor.submit(() -> {
                try {
                    channel.start();
                } catch (PulsarServerException e) {
                    throw new RuntimeException(e);
                }
            });
            int rejectedDuringStart = validateChannelStart(channel);
            startTask.get();
            Assert.assertTrue(rejectedDuringStart > 0);

            FieldUtils.writeDeclaredField(channel, "channelState", ServiceUnitStateChannelImpl.ChannelState.LeaderElectionServiceStarted, true);
            AssertJUnit.assertNotNull(((Optional<?>) channel.getChannelOwnerAsync().get(2L, TimeUnit.SECONDS)).get());

            // Probe the channel while close() runs on the executor thread.
            Future<?> closeTask = executor.submit(() -> {
                try {
                    channel.close();
                } catch (PulsarServerException e) {
                    throw new RuntimeException(e);
                }
            });
            int rejectedDuringClose = validateChannelStart(channel);
            closeTask.get();
            Assert.assertTrue(rejectedDuringClose > 0);

            // Closed (and closed again): everything is rejected.
            AssertJUnit.assertEquals(6, validateChannelStart(channel));
            channel.close();
            AssertJUnit.assertEquals(6, validateChannelStart(channel));

            // Restart works once; a second start on a started channel must throw.
            channel.start();
            AssertJUnit.assertNotNull(((Optional<?>) channel.getChannelOwnerAsync().get(2L, TimeUnit.SECONDS)).get());
            Assert.assertThrows(IllegalStateException.class, () -> {
                channel.start();
            });
            channel.close();
        } finally {
            // Single cleanup point for both the success and the failure path.
            executor.shutdownNow();
        }
    }

    /**
     * Probes six channel operations and counts how many fail with an
     * {@link ExecutionException} whose cause is an {@link IllegalStateException}.
     *
     * <p>The probed operations are, in order: {@code isChannelOwnerAsync},
     * {@code getChannelOwnerAsync}, {@code getOwnerAsync}, {@code publishAssignEventAsync},
     * {@code publishUnloadEventAsync} and {@code publishSplitEventAsync}. Each is awaited for
     * at most two seconds.
     *
     * @param serviceUnitStateChannelImpl channel to probe
     * @return number of probed operations rejected with an {@code IllegalStateException} cause (0-6)
     * @throws InterruptedException if interrupted while waiting on a future
     * @throws TimeoutException if a future does not complete within two seconds
     */
    private int validateChannelStart(ServiceUnitStateChannelImpl serviceUnitStateChannelImpl) throws InterruptedException, TimeoutException {
        int rejected = 0;

        try {
            serviceUnitStateChannelImpl.isChannelOwnerAsync().get(2L, TimeUnit.SECONDS);
        } catch (ExecutionException e) {
            rejected += illegalStateCauseCount(e);
        }

        try {
            ((Optional<?>) serviceUnitStateChannelImpl.getChannelOwnerAsync().get(2L, TimeUnit.SECONDS)).get();
        } catch (ExecutionException e) {
            rejected += illegalStateCauseCount(e);
        }

        try {
            ((Optional<?>) serviceUnitStateChannelImpl.getOwnerAsync(bundle).get(2L, TimeUnit.SECONDS)).get();
        } catch (ExecutionException e) {
            rejected += illegalStateCauseCount(e);
        }

        try {
            serviceUnitStateChannelImpl.publishAssignEventAsync(bundle, brokerId1).get(2L, TimeUnit.SECONDS);
        } catch (ExecutionException e) {
            rejected += illegalStateCauseCount(e);
        }

        try {
            Unload unload = new Unload(brokerId1, bundle, Optional.of(brokerId2));
            serviceUnitStateChannelImpl.publishUnloadEventAsync(unload).get(2L, TimeUnit.SECONDS);
        } catch (ExecutionException e) {
            rejected += illegalStateCauseCount(e);
        }

        try {
            Split split = new Split(bundle, brokerId1, Map.of(childBundle1Range, Optional.empty(), childBundle2Range, Optional.empty()));
            serviceUnitStateChannelImpl.publishSplitEventAsync(split).get(2L, TimeUnit.SECONDS);
        } catch (ExecutionException e) {
            rejected += illegalStateCauseCount(e);
        }

        return rejected;
    }

    /** Returns 1 when the failure was caused by an {@link IllegalStateException}, otherwise 0. */
    private static int illegalStateCauseCount(ExecutionException failure) {
        return failure.getCause() instanceof IllegalStateException ? 1 : 0;
    }

    /**
     * Both brokers race to assign the same unowned bundle; exactly one of them must win and
     * both channels must report the same owner afterwards, with matching handler and event
     * counters.
     */
    @org.testng.annotations.Test(priority = 2)
    public void assignmentTest() throws ExecutionException, InterruptedException, IllegalAccessException, TimeoutException {
        ConcurrentHashMap<String, CompletableFuture<Optional<String>>> ownerRequestsSpy1 =
                Mockito.spy(getOwnerRequests(channel1));
        ConcurrentHashMap<String, CompletableFuture<Optional<String>>> ownerRequestsSpy2 =
                Mockito.spy(getOwnerRequests(channel2));

        // The bundle starts without an owner on either channel.
        CompletableFuture<?> initialOwner1 = channel1.getOwnerAsync(bundle);
        CompletableFuture<?> initialOwner2 = channel2.getOwnerAsync(bundle);
        Assert.assertTrue(((Optional<?>) initialOwner1.get()).isEmpty());
        Assert.assertTrue(((Optional<?>) initialOwner2.get()).isEmpty());

        // Competing assignments from both brokers.
        CompletableFuture<?> assignment1 = channel1.publishAssignEventAsync(bundle, brokerId1);
        CompletableFuture<?> assignment2 = channel2.publishAssignEventAsync(bundle, brokerId2);
        AssertJUnit.assertNotNull(assignment1);
        AssertJUnit.assertNotNull(assignment2);
        waitUntilOwnerChanges(channel1, bundle, null);
        waitUntilOwnerChanges(channel2, bundle, null);

        // Both futures resolve to the same winner, which is one of the two contenders.
        String winner = (String) assignment1.get(5L, TimeUnit.SECONDS);
        String winnerSeenBy2 = (String) assignment2.get(5L, TimeUnit.SECONDS);
        AssertJUnit.assertEquals(winner, winnerSeenBy2);
        Assert.assertTrue(winner.equals(brokerId1) || winner.equals(brokerId2), winner);

        Optional<?> owner1 = (Optional<?>) channel1.getOwnerAsync(bundle).get();
        Optional<?> owner2 = (Optional<?>) channel2.getOwnerAsync(bundle).get();
        AssertJUnit.assertEquals(owner1, owner2);
        AssertJUnit.assertEquals(ownerRequestsSpy1.size(), 0);
        AssertJUnit.assertEquals(ownerRequestsSpy2.size(), 0);

        validateHandlerCounters(channel1, 1L, 0L, 1L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L);
        validateHandlerCounters(channel2, 1L, 0L, 1L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L);
        validateEventCounters(channel1, 1L, 0L, 0L, 0L, 0L, 0L);
        validateEventCounters(channel2, 1L, 0L, 0L, 0L, 0L, 0L);
    }

    /**
     * When channel1's table view rejects every {@code put}, its assignment must fail
     * immediately while channel2's competing assignment succeeds and becomes visible on
     * channel1; no owner-lookup request may be left pending on either channel.
     */
    // NOTE(review): the priority is taken from a protobuf field-number constant unrelated to
    // this test — presumably a decompiler substitution for an integer literal; confirm.
    @org.testng.annotations.Test(priority = Test.TestMessage.INTFIELD_FIELD_NUMBER)
    public void assignmentTestWhenOneAssignmentFails() throws ExecutionException, InterruptedException, IllegalAccessException, TimeoutException {
        ConcurrentHashMap<String, CompletableFuture<Optional<String>>> ownerRequests1 = getOwnerRequests(channel1);
        ConcurrentHashMap<String, CompletableFuture<Optional<String>>> ownerRequests2 = getOwnerRequests(channel2);
        AssertJUnit.assertEquals(0, ownerRequests1.size());
        AssertJUnit.assertEquals(0, ownerRequests2.size());

        // Table view for channel1 whose put() always fails.
        ServiceUnitStateTableView realTableView = getTableView(channel1);
        ServiceUnitStateTableView failingTableView = Mockito.spy(realTableView);
        Mockito.doReturn(CompletableFuture.failedFuture(new RuntimeException()))
                .when(failingTableView)
                .put((String) ArgumentMatchers.any(), (ServiceUnitStateData) ArgumentMatchers.any());

        try {
            setTableView(channel1, failingTableView);

            CompletableFuture<?> initialOwner1 = channel1.getOwnerAsync(bundle);
            CompletableFuture<?> initialOwner2 = channel2.getOwnerAsync(bundle);
            Assert.assertTrue(((Optional<?>) initialOwner1.get()).isEmpty());
            Assert.assertTrue(((Optional<?>) initialOwner2.get()).isEmpty());

            CompletableFuture<?> failedAssignment = channel1.publishAssignEventAsync(bundle, brokerId1);
            CompletableFuture<?> successfulAssignment = channel2.publishAssignEventAsync(bundle, brokerId2);
            Assert.assertTrue(failedAssignment.isCompletedExceptionally());
            AssertJUnit.assertNotNull(successfulAssignment);
            AssertJUnit.assertEquals((String) successfulAssignment.get(5L, TimeUnit.SECONDS), brokerId2);

            waitUntilNewOwner(channel1, bundle, brokerId2);
            AssertJUnit.assertEquals(0, ownerRequests1.size());
            AssertJUnit.assertEquals(0, ownerRequests2.size());
        } finally {
            // Always put the real table view back, whether the test passed or failed.
            setTableView(channel1, realTableView);
        }
    }

    /**
     * Assigns the bundle to broker 1, then unloads it towards broker 2 and checks that both
     * channels observe each ownership change, along with the expected handler and event
     * counters.
     */
    // NOTE(review): the priority is taken from a protobuf field-number constant unrelated to
    // this test — presumably a decompiler substitution for an integer literal; confirm.
    @org.testng.annotations.Test(priority = Test.TestMessage.TESTENUM_FIELD_NUMBER)
    public void transferTest() throws ExecutionException, InterruptedException, TimeoutException, IllegalAccessException {
        // The bundle starts without an owner on either channel.
        CompletableFuture<?> initialOwner1 = channel1.getOwnerAsync(bundle);
        CompletableFuture<?> initialOwner2 = channel2.getOwnerAsync(bundle);
        Assert.assertTrue(((Optional<?>) initialOwner1.get()).isEmpty());
        Assert.assertTrue(((Optional<?>) initialOwner2.get()).isEmpty());

        // Assign to broker 1.
        channel1.publishAssignEventAsync(bundle, brokerId1);
        waitUntilNewOwner(channel1, bundle, brokerId1);
        waitUntilNewOwner(channel2, bundle, brokerId1);
        Optional<?> ownerAfterAssign = (Optional<?>) channel1.getOwnerAsync(bundle).get();
        Optional<?> ownerAfterAssignSeenBy2 = (Optional<?>) channel2.getOwnerAsync(bundle).get();
        AssertJUnit.assertEquals(ownerAfterAssign, ownerAfterAssignSeenBy2);
        AssertJUnit.assertEquals(ownerAfterAssign, Optional.of(brokerId1));

        // Transfer to broker 2.
        Unload transfer = new Unload(brokerId1, bundle, Optional.of(brokerId2));
        channel1.publishUnloadEventAsync(transfer);
        waitUntilNewOwner(channel1, bundle, brokerId2);
        waitUntilNewOwner(channel2, bundle, brokerId2);
        Optional<?> ownerAfterTransfer = (Optional<?>) channel1.getOwnerAsync(bundle).get(5L, TimeUnit.SECONDS);
        Optional<?> ownerAfterTransferSeenBy2 = (Optional<?>) channel2.getOwnerAsync(bundle).get(5L, TimeUnit.SECONDS);
        AssertJUnit.assertEquals(ownerAfterTransfer, ownerAfterTransferSeenBy2);
        AssertJUnit.assertEquals(ownerAfterTransfer, Optional.of(brokerId2));

        validateHandlerCounters(channel1, 2L, 0L, 2L, 0L, 1L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L);
        validateHandlerCounters(channel2, 2L, 0L, 2L, 0L, 1L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L);
        validateEventCounters(channel1, 1L, 0L, 0L, 0L, 1L, 0L);
        validateEventCounters(channel2, 0L, 0L, 0L, 0L, 0L, 0L);
    }

    /**
     * Exercises a transfer whose destination broker (channel2) cannot write to its table view.
     *
     * <p>The bundle is first assigned to broker 1. Channel2's table view is then replaced by a
     * spy whose {@code put} always fails and an unload towards broker 2 is published. The test
     * expects channel1 to resolve the owner to broker 2 immediately, channel2's lookup to stay
     * pending and then complete exceptionally, and — after the real table view is restored and
     * {@code monitorOwnerships} has run on both channels — ownership to return to broker 1.
     */
    // NOTE(review): the priority is taken from a protobuf field-number constant unrelated to
    // this test — presumably a decompiler substitution for an integer literal; confirm.
    @org.testng.annotations.Test(priority = Test.TestMessage.NESTEDFIELD_FIELD_NUMBER)
    public void transferTestWhenDestBrokerFails() throws ExecutionException, InterruptedException, IllegalAccessException {
        ConcurrentHashMap<String, CompletableFuture<Optional<String>>> ownerRequests = getOwnerRequests(this.channel1);
        ConcurrentHashMap<String, CompletableFuture<Optional<String>>> ownerRequests2 = getOwnerRequests(this.channel2);
        AssertJUnit.assertEquals(0, ownerRequests.size());
        AssertJUnit.assertEquals(0, ownerRequests2.size());
        // Start from a state where broker 1 owns the bundle on both channels.
        this.channel1.publishAssignEventAsync(this.bundle, this.brokerId1);
        waitUntilNewOwner(this.channel1, this.bundle, this.brokerId1);
        waitUntilNewOwner(this.channel2, this.bundle, this.brokerId1);
        Optional optional = (Optional) this.channel1.getOwnerAsync(this.bundle).get();
        AssertJUnit.assertEquals(optional, (Optional) this.channel2.getOwnerAsync(this.bundle).get());
        AssertJUnit.assertEquals(optional, Optional.of(this.brokerId1));
        // Spy on channel2's table view so that every put() returns a failed future.
        ServiceUnitStateTableView tableView = getTableView(this.channel2);
        ServiceUnitStateTableView serviceUnitStateTableView = (ServiceUnitStateTableView) Mockito.spy(tableView);
        ((ServiceUnitStateTableView) Mockito.doReturn(CompletableFuture.failedFuture(new RuntimeException())).when(serviceUnitStateTableView)).put((String) ArgumentMatchers.any(), (ServiceUnitStateData) ArgumentMatchers.any());
        try {
            setTableView(this.channel2, serviceUnitStateTableView);
            // Use a 3000 ms in-flight waiting time on both channels for the failing transfer.
            FieldUtils.writeDeclaredField(this.channel1, "inFlightStateWaitingTimeInMillis", 3000, true);
            FieldUtils.writeDeclaredField(this.channel2, "inFlightStateWaitingTimeInMillis", 3000, true);
            this.channel1.publishUnloadEventAsync(new Unload(this.brokerId1, this.bundle, Optional.of(this.brokerId2)));
            // NOTE(review): the two-argument waitUntilState overload is defined outside this
            // view; which state it waits for is not visible here.
            waitUntilState(this.channel1, this.bundle);
            waitUntilState(this.channel2, this.bundle);
            CompletableFuture ownerAsync = this.channel1.getOwnerAsync(this.bundle);
            CompletableFuture ownerAsync2 = this.channel2.getOwnerAsync(this.bundle);
            // channel1 answers with broker 2 right away; channel2's lookup is still pending.
            Assert.assertTrue(ownerAsync.isDone());
            AssertJUnit.assertEquals(this.brokerId2, (String) ((Optional) ownerAsync.get()).get());
            Assert.assertFalse(ownerAsync2.isDone());
            AssertJUnit.assertEquals(0, ownerRequests.size());
            AssertJUnit.assertEquals(1, ownerRequests2.size());
            // The pending lookup on channel2 must fail within ten seconds and be removed.
            Awaitility.await().atMost(10L, TimeUnit.SECONDS).untilAsserted(() -> {
                Assert.assertTrue(ownerAsync2.isCompletedExceptionally());
            });
            AssertJUnit.assertEquals(0, ownerRequests2.size());
            // Make the mocked load manager pick broker 1 for any selection request.
            ((ExtensibleLoadManagerImpl) Mockito.doReturn(CompletableFuture.completedFuture(Optional.of(this.brokerId1))).when(this.loadManager)).selectAsync((ServiceUnitId) ArgumentMatchers.any(), (Set) ArgumentMatchers.any(), (LookupOptions) ArgumentMatchers.any());
            // Restore the real table view before running the ownership monitor.
            setTableView(this.channel2, tableView);
            try {
                // Drop the in-flight waiting time to 1 ms before invoking the monitor.
                FieldUtils.writeDeclaredField(this.channel1, "inFlightStateWaitingTimeInMillis", 1, true);
                FieldUtils.writeDeclaredField(this.channel2, "inFlightStateWaitingTimeInMillis", 1, true);
                this.channel1.monitorOwnerships(List.of(this.brokerId1, this.brokerId2));
                this.channel2.monitorOwnerships(List.of(this.brokerId1, this.brokerId2));
                waitUntilNewOwner(this.channel1, this.bundle, this.brokerId1);
                waitUntilNewOwner(this.channel2, this.bundle, this.brokerId1);
                Optional optional2 = (Optional) this.channel1.getOwnerAsync(this.bundle).get();
                AssertJUnit.assertEquals(optional2, (Optional) this.channel2.getOwnerAsync(this.bundle).get());
                AssertJUnit.assertEquals(optional2, Optional.of(this.brokerId1));
                // Monitor counters are validated on whichever channel is the channel owner.
                validateMonitorCounters(((Boolean) this.channel1.isChannelOwnerAsync().get()).booleanValue() ? this.channel1 : this.channel2, 0L, 0L, 1L, 0L, 0L, 0L, 0L);
                // Success path: set the in-flight waiting time to 30000 ms.
                FieldUtils.writeDeclaredField(this.channel1, "inFlightStateWaitingTimeInMillis", 30000, true);
                FieldUtils.writeDeclaredField(this.channel2, "inFlightStateWaitingTimeInMillis", 30000, true);
            } catch (Throwable th) {
                // Failure path: apply the same 30000 ms value before rethrowing.
                FieldUtils.writeDeclaredField(this.channel1, "inFlightStateWaitingTimeInMillis", 30000, true);
                FieldUtils.writeDeclaredField(this.channel2, "inFlightStateWaitingTimeInMillis", 30000, true);
                throw th;
            }
        } catch (Throwable th2) {
            // Any failure above: make sure channel2 gets its real table view back.
            setTableView(this.channel2, tableView);
            throw th2;
        }
    }

    /**
     * Splits a bundle owned by broker 1 while the first two calls to
     * {@code updateNamespaceBundles} fail with a {@code BadVersionException}, and checks that
     * the split is attempted three times, the parent bundle reaches {@code Init}, and both
     * child bundles end up owned by broker 1 on both channels. Finally runs the ownership
     * monitor with 1 ms waiting/tombstone delays and clears the child bundles from both table
     * views.
     *
     * @throws Exception if any wait, reflection access or channel call fails
     */
    @org.testng.annotations.Test(priority = 6)
    public void splitAndRetryTest() throws Exception {
        // Broker 1 owns the parent bundle.
        channel1.publishAssignEventAsync(bundle, brokerId1);
        waitUntilNewOwner(channel1, bundle, brokerId1);
        waitUntilNewOwner(channel2, bundle, brokerId1);
        Optional<?> owner1 = (Optional<?>) channel1.getOwnerAsync(bundle).get();
        Optional<?> owner2 = (Optional<?>) channel2.getOwnerAsync(bundle).get();
        AssertJUnit.assertEquals(owner1, Optional.of(brokerId1));
        AssertJUnit.assertEquals(owner2, Optional.of(brokerId1));
        Assert.assertTrue(owner1.isPresent());

        // updateNamespaceBundles fails with BadVersion twice, then calls the real method.
        NamespaceService namespaceService = pulsar1.getNamespaceService();
        CompletableFuture<Object> badVersionFuture = new CompletableFuture<>();
        AtomicInteger remainingAttempts = new AtomicInteger(3);
        badVersionFuture.completeExceptionally(new MetadataStoreException.BadVersionException("BadVersion"));
        Mockito.doAnswer(invocation ->
                        remainingAttempts.decrementAndGet() > 0 ? badVersionFuture : invocation.callRealMethod())
                .when(namespaceService)
                .updateNamespaceBundles((NamespaceName) ArgumentMatchers.any(), (NamespaceBundles) ArgumentMatchers.any());
        Mockito.doReturn(namespaceService).when(pulsar1).getNamespaceService();
        Mockito.doReturn(CompletableFuture.completedFuture(List.of("test-topic-1", "test-topic-2")))
                .when(namespaceService)
                .getOwnedTopicListForNamespaceBundle((NamespaceBundle) ArgumentMatchers.any());

        // Publish the split and wait for the parent bundle to reach Init on both channels.
        channel1.publishSplitEventAsync(new Split(bundle, (String) owner1.get(), Map.of(childBundle1Range, Optional.empty(), childBundle2Range, Optional.empty())));
        waitUntilState(channel1, bundle, ServiceUnitState.Init);
        waitUntilState(channel2, bundle, ServiceUnitState.Init);
        validateHandlerCounters(channel1, 1L, 0L, 3L, 0L, 0L, 0L, 1L, 0L, 0L, 0L, 1L, 0L, 1L, 0L);
        validateHandlerCounters(channel2, 1L, 0L, 3L, 0L, 0L, 0L, 1L, 0L, 0L, 0L, 1L, 0L, 1L, 0L);
        validateEventCounters(channel1, 1L, 0L, 1L, 0L, 0L, 0L);
        validateEventCounters(channel2, 0L, 0L, 0L, 0L, 0L, 0L);
        // Two failed attempts plus the successful one.
        ((ServiceUnitStateChannelImpl) Mockito.verify(channel1, Mockito.times(3))).splitServiceUnitOnceAndRetry((NamespaceService) ArgumentMatchers.any(), (NamespaceBundleFactory) ArgumentMatchers.any(), (NamespaceBundleSplitAlgorithm) ArgumentMatchers.any(), (NamespaceBundle) ArgumentMatchers.any(), (List) ArgumentMatchers.any(), (List) ArgumentMatchers.any(), (ServiceUnitStateData) ArgumentMatchers.any(), (AtomicInteger) ArgumentMatchers.any(), ArgumentMatchers.anyLong(), (CompletableFuture) ArgumentMatchers.any());

        // Both children are owned by broker 1 on both channels.
        waitUntilNewOwner(channel1, childBundle11, brokerId1);
        waitUntilNewOwner(channel1, childBundle12, brokerId1);
        waitUntilNewOwner(channel2, childBundle11, brokerId1);
        waitUntilNewOwner(channel2, childBundle12, brokerId1);
        AssertJUnit.assertEquals(Optional.of(brokerId1), channel1.getOwnerAsync(childBundle11).get());
        AssertJUnit.assertEquals(Optional.of(brokerId1), channel1.getOwnerAsync(childBundle12).get());
        AssertJUnit.assertEquals(Optional.of(brokerId1), channel2.getOwnerAsync(childBundle11).get());
        AssertJUnit.assertEquals(Optional.of(brokerId1), channel2.getOwnerAsync(childBundle12).get());

        try {
            // Run the ownership monitor with 1 ms waiting/tombstone delays.
            FieldUtils.writeDeclaredField(channel1, "inFlightStateWaitingTimeInMillis", 1, true);
            FieldUtils.writeDeclaredField(channel1, "stateTombstoneDelayTimeInMillis", 1, true);
            FieldUtils.writeDeclaredField(channel2, "inFlightStateWaitingTimeInMillis", 1, true);
            FieldUtils.writeDeclaredField(channel2, "stateTombstoneDelayTimeInMillis", 1, true);
            channel1.monitorOwnerships(List.of(brokerId1, brokerId2));
            channel2.monitorOwnerships(List.of(brokerId1, brokerId2));
            waitUntilState(channel1, bundle, ServiceUnitState.Init);
            waitUntilState(channel2, bundle, ServiceUnitState.Init);
            // Monitor counters are validated on whichever channel is the channel owner.
            validateMonitorCounters((Boolean) channel1.isChannelOwnerAsync().get() ? channel1 : channel2, 0L, 0L, 0L, 0L, 0L, 0L, 0L);

            // Remove the child bundles from both table views while the channels are disabled.
            try {
                disableChannels();
                overrideTableView(channel1, childBundle11, null);
                overrideTableView(channel2, childBundle11, null);
                overrideTableView(channel1, childBundle12, null);
                overrideTableView(channel2, childBundle12, null);
            } finally {
                enableChannels();
            }
        } finally {
            // Restore the timings even when an assertion above fails, so the 1 ms values
            // cannot leak into later test methods that share these channels.
            FieldUtils.writeDeclaredField(channel1, "inFlightStateWaitingTimeInMillis", 30000, true);
            FieldUtils.writeDeclaredField(channel1, "stateTombstoneDelayTimeInMillis", 300000, true);
            FieldUtils.writeDeclaredField(channel2, "inFlightStateWaitingTimeInMillis", 30000, true);
            FieldUtils.writeDeclaredField(channel2, "stateTombstoneDelayTimeInMillis", 300000, true);
        }
    }

    /**
     * Checks which metadata session events {@code channel1} records as its "last" event.
     *
     * <p>The assertions below expect {@code SessionReestablished} and {@code SessionLost} to be stored
     * together with a timestamp taken at or after the call, while {@code ConnectionLost} and
     * {@code Reconnected} must leave the previously stored event and timestamp in place.
     */
    @org.testng.annotations.Test(priority = 7)
    public void handleMetadataSessionEventTest() throws IllegalAccessException {
        ServiceUnitStateChannelImpl channel = channel1;
        long reference;
        SessionEvent recordedEvent;
        long recordedAt;

        // SessionReestablished: stored, timestamp not older than the reference.
        reference = System.currentTimeMillis();
        channel.handleMetadataSessionEvent(SessionEvent.SessionReestablished);
        recordedEvent = getLastMetadataSessionEvent(channel);
        recordedAt = getLastMetadataSessionEventTimestamp(channel);
        AssertJUnit.assertEquals(SessionEvent.SessionReestablished, recordedEvent);
        MatcherAssert.assertThat(recordedAt, Matchers.greaterThanOrEqualTo(reference));

        // SessionLost: stored, timestamp not older than the reference.
        reference = System.currentTimeMillis();
        channel.handleMetadataSessionEvent(SessionEvent.SessionLost);
        recordedEvent = getLastMetadataSessionEvent(channel);
        recordedAt = getLastMetadataSessionEventTimestamp(channel);
        AssertJUnit.assertEquals(SessionEvent.SessionLost, recordedEvent);
        MatcherAssert.assertThat(recordedAt, Matchers.greaterThanOrEqualTo(reference));

        // ConnectionLost: the stored SessionLost event and its (older) timestamp must survive.
        reference = System.currentTimeMillis();
        channel.handleMetadataSessionEvent(SessionEvent.ConnectionLost);
        recordedEvent = getLastMetadataSessionEvent(channel);
        recordedAt = getLastMetadataSessionEventTimestamp(channel);
        AssertJUnit.assertEquals(SessionEvent.SessionLost, recordedEvent);
        MatcherAssert.assertThat(recordedAt, Matchers.lessThanOrEqualTo(reference));

        // Reconnected: likewise must not replace the stored SessionLost event.
        reference = System.currentTimeMillis();
        channel.handleMetadataSessionEvent(SessionEvent.Reconnected);
        recordedEvent = getLastMetadataSessionEvent(channel);
        recordedAt = getLastMetadataSessionEventTimestamp(channel);
        AssertJUnit.assertEquals(SessionEvent.SessionLost, recordedEvent);
        MatcherAssert.assertThat(recordedAt, Matchers.lessThanOrEqualTo(reference));
    }

    /**
     * Registers a pending cleanup future for broker 2 on {@code channel1}, delivers a
     * {@code Created} registration event for that broker, and expects the future to be cancelled
     * and removed from the cleanup-job map within five seconds.
     */
    @org.testng.annotations.Test(priority = 8)
    public void handleBrokerCreationEventTest() throws IllegalAccessException {
        final String createdBroker = brokerId2;
        final ConcurrentHashMap<String, CompletableFuture<Void>> pendingCleanups = getCleanupJobs(channel1);
        final CompletableFuture<Void> scheduledCleanup = new CompletableFuture<>();

        pendingCleanups.put(createdBroker, scheduledCleanup);
        channel1.handleBrokerRegistrationEvent(createdBroker, NotificationType.Created);

        Awaitility.await()
                .atMost(5L, TimeUnit.SECONDS)
                .untilAsserted(() -> {
                    AssertJUnit.assertEquals(0, pendingCleanups.size());
                    Assert.assertTrue(scheduledCleanup.isCancelled());
                });
    }

    /**
     * Exercises the cleanup jobs scheduled by the channels when broker deletion events arrive.
     *
     * <p>The {@code cleanupJobs} maps of both channels are replaced by Mockito spies so the test can
     * count {@code computeIfAbsent} invocations. The channel whose broker is reported by
     * {@code getChannelOwnerAsync()} is treated as the "leader"; the assertions expect only the
     * leader's map to receive {@code computeIfAbsent} calls, and the follower's map to stay untouched.
     *
     * <p>The original maps are written back to both channels at the end of the test.
     *
     * @throws Exception if reflection access, future completion or an awaited condition fails
     */
    @org.testng.annotations.Test(priority = 9)
    public void handleBrokerDeletionEventTest() throws Exception {
        ConcurrentHashMap<String, CompletableFuture<Void>> cleanupJobs1 = getCleanupJobs(this.channel1);
        ConcurrentHashMap<String, CompletableFuture<Void>> cleanupJobs2 = getCleanupJobs(this.channel2);
        ConcurrentHashMap leaderCleanupJobsTmp = (ConcurrentHashMap) Mockito.spy(cleanupJobs1);
        ConcurrentHashMap followerCleanupJobsTmp = (ConcurrentHashMap) Mockito.spy(cleanupJobs2);
        ServiceUnitStateChannelImpl leaderChannel = this.channel1;
        ServiceUnitStateChannelImpl followerChannel = this.channel2;
        String leader = (String) ((Optional) this.channel1.getChannelOwnerAsync().get(2L, TimeUnit.SECONDS)).get();
        AssertJUnit.assertEquals(leader, (String) ((Optional) this.channel2.getChannelOwnerAsync().get(2L, TimeUnit.SECONDS)).get());
        if (leader.equals(this.brokerId2)) {
            // Broker 2 owns the channel: swap roles. The spies must be exchanged through a temporary,
            // otherwise both variables end up referencing the same spy and the follower assertions
            // would be checked against the leader's map.
            leaderChannel = this.channel2;
            followerChannel = (ServiceUnitStateChannelImpl) this.channel1;
            ConcurrentHashMap swapTmp = followerCleanupJobsTmp;
            followerCleanupJobsTmp = leaderCleanupJobsTmp;
            leaderCleanupJobsTmp = swapTmp;
        }
        // Effectively-final copies for use inside the Awaitility lambdas.
        ConcurrentHashMap leaderCleanupJobs = leaderCleanupJobsTmp;
        ConcurrentHashMap followerCleanupJobs = followerCleanupJobsTmp;
        FieldUtils.writeDeclaredField(leaderChannel, "cleanupJobs", leaderCleanupJobs, true);
        FieldUtils.writeDeclaredField(followerChannel, "cleanupJobs", followerCleanupJobs, true);

        // Both bundles start without an owner; the load manager is stubbed to always pick broker 2.
        CompletableFuture owner1Future = this.channel1.getOwnerAsync(this.bundle1);
        CompletableFuture owner2Future = this.channel2.getOwnerAsync(this.bundle2);
        ((ExtensibleLoadManagerImpl) Mockito.doReturn(CompletableFuture.completedFuture(Optional.of(this.brokerId2))).when(this.loadManager)).selectAsync((ServiceUnitId) ArgumentMatchers.any(), (Set) ArgumentMatchers.any(), (LookupOptions) ArgumentMatchers.any());
        Assert.assertTrue(((Optional) owner1Future.get()).isEmpty());
        Assert.assertTrue(((Optional) owner2Future.get()).isEmpty());

        // Assign both bundles to broker 1, then move bundle1 to broker 2.
        String broker = this.brokerId1;
        this.channel1.publishAssignEventAsync(this.bundle1, broker);
        this.channel2.publishAssignEventAsync(this.bundle2, broker);
        waitUntilNewOwner(this.channel1, this.bundle1, broker);
        waitUntilNewOwner(this.channel2, this.bundle1, broker);
        waitUntilNewOwner(this.channel1, this.bundle2, broker);
        waitUntilNewOwner(this.channel2, this.bundle2, broker);
        this.channel1.publishUnloadEventAsync(new Unload(broker, this.bundle1, Optional.of(this.brokerId2)));
        waitUntilNewOwner(this.channel1, this.bundle1, this.brokerId2);
        waitUntilNewOwner(this.channel2, this.bundle1, this.brokerId2);

        // Phase 1: session re-established with its timestamp pushed 181 seconds into the past,
        // then both brokers are reported as deleted.
        leaderChannel.handleMetadataSessionEvent(SessionEvent.SessionReestablished);
        followerChannel.handleMetadataSessionEvent(SessionEvent.SessionReestablished);
        FieldUtils.writeDeclaredField(leaderChannel, "lastMetadataSessionEventTimestamp", Long.valueOf(System.currentTimeMillis() - 181000), true);
        FieldUtils.writeDeclaredField(followerChannel, "lastMetadataSessionEventTimestamp", Long.valueOf(System.currentTimeMillis() - 181000), true);
        ((PulsarAdmin) Mockito.doReturn(this.brokers).when(this.pulsarAdmin)).brokers();
        leaderChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted);
        followerChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted);
        leaderChannel.handleBrokerRegistrationEvent(this.brokerId2, NotificationType.Deleted);
        followerChannel.handleBrokerRegistrationEvent(this.brokerId2, NotificationType.Deleted);
        waitUntilNewOwner(this.channel1, this.bundle1, this.brokerId2);
        waitUntilNewOwner(this.channel2, this.bundle1, this.brokerId2);
        waitUntilNewOwner(this.channel1, this.bundle2, this.brokerId2);
        waitUntilNewOwner(this.channel2, this.bundle2, this.brokerId2);
        ((ConcurrentHashMap) Mockito.verify(leaderCleanupJobs, Mockito.times(1))).computeIfAbsent((String) ArgumentMatchers.eq(broker), (Function) ArgumentMatchers.any());
        ((ConcurrentHashMap) Mockito.verify(followerCleanupJobs, Mockito.times(0))).computeIfAbsent((String) ArgumentMatchers.eq(broker), (Function) ArgumentMatchers.any());
        Awaitility.await().atMost(10L, TimeUnit.SECONDS).untilAsserted(() -> {
            AssertJUnit.assertEquals(0, leaderCleanupJobs.size());
            AssertJUnit.assertEquals(0, followerCleanupJobs.size());
        });
        validateMonitorCounters(leaderChannel, 2L, 0L, 3L, 0L, 2L, 0L, 0L);

        // Phase 2: move both bundles back to broker 1, re-establish the session (fresh timestamp)
        // and delete broker 1 again; one job is expected to stay pending in the leader's map.
        this.channel1.publishUnloadEventAsync(new Unload(this.brokerId2, this.bundle1, Optional.of(broker)));
        this.channel1.publishUnloadEventAsync(new Unload(this.brokerId2, this.bundle2, Optional.of(broker)));
        waitUntilNewOwner(this.channel1, this.bundle1, broker);
        waitUntilNewOwner(this.channel2, this.bundle1, broker);
        waitUntilNewOwner(this.channel1, this.bundle2, broker);
        waitUntilNewOwner(this.channel2, this.bundle2, broker);
        leaderChannel.handleMetadataSessionEvent(SessionEvent.SessionReestablished);
        followerChannel.handleMetadataSessionEvent(SessionEvent.SessionReestablished);
        leaderChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted);
        followerChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted);
        ((ConcurrentHashMap) Mockito.verify(leaderCleanupJobs, Mockito.times(2))).computeIfAbsent((String) ArgumentMatchers.eq(broker), (Function) ArgumentMatchers.any());
        ((ConcurrentHashMap) Mockito.verify(followerCleanupJobs, Mockito.times(0))).computeIfAbsent((String) ArgumentMatchers.eq(broker), (Function) ArgumentMatchers.any());
        Awaitility.await().atMost(10L, TimeUnit.SECONDS).untilAsserted(() -> {
            AssertJUnit.assertEquals(1, leaderCleanupJobs.size());
            AssertJUnit.assertEquals(0, followerCleanupJobs.size());
        });
        validateMonitorCounters(leaderChannel, 2L, 0L, 3L, 0L, 3L, 0L, 0L);

        // Phase 3: the broker comes back; the pending job is expected to disappear without a new one.
        Mockito.reset(new PulsarAdmin[]{this.pulsarAdmin});
        leaderChannel.handleBrokerRegistrationEvent(broker, NotificationType.Created);
        followerChannel.handleBrokerRegistrationEvent(broker, NotificationType.Created);
        ((ConcurrentHashMap) Mockito.verify(leaderCleanupJobs, Mockito.times(2))).computeIfAbsent((String) ArgumentMatchers.eq(broker), (Function) ArgumentMatchers.any());
        ((ConcurrentHashMap) Mockito.verify(followerCleanupJobs, Mockito.times(0))).computeIfAbsent((String) ArgumentMatchers.eq(broker), (Function) ArgumentMatchers.any());
        Awaitility.await().atMost(10L, TimeUnit.SECONDS).untilAsserted(() -> {
            AssertJUnit.assertEquals(0, leaderCleanupJobs.size());
            AssertJUnit.assertEquals(0, followerCleanupJobs.size());
        });
        validateMonitorCounters(leaderChannel, 2L, 0L, 3L, 0L, 3L, 0L, 1L);

        // Phase 4: with maxCleanupDelayTimeInSecs lowered to 3, delete broker 1 once more; a job is
        // scheduled first and, once it has run, both bundles are expected on broker 2.
        ((PulsarAdmin) Mockito.doReturn(this.brokers).when(this.pulsarAdmin)).brokers();
        FieldUtils.writeDeclaredField(leaderChannel, "maxCleanupDelayTimeInSecs", 3, true);
        leaderChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted);
        followerChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted);
        ((ConcurrentHashMap) Mockito.verify(leaderCleanupJobs, Mockito.times(3))).computeIfAbsent((String) ArgumentMatchers.eq(broker), (Function) ArgumentMatchers.any());
        ((ConcurrentHashMap) Mockito.verify(followerCleanupJobs, Mockito.times(0))).computeIfAbsent((String) ArgumentMatchers.eq(broker), (Function) ArgumentMatchers.any());
        Awaitility.await().atMost(10L, TimeUnit.SECONDS).untilAsserted(() -> {
            AssertJUnit.assertEquals(1, leaderCleanupJobs.size());
            AssertJUnit.assertEquals(0, followerCleanupJobs.size());
        });
        validateMonitorCounters(leaderChannel, 2L, 0L, 3L, 0L, 4L, 0L, 1L);
        waitUntilNewOwner(this.channel1, this.bundle1, this.brokerId2);
        waitUntilNewOwner(this.channel2, this.bundle1, this.brokerId2);
        waitUntilNewOwner(this.channel1, this.bundle2, this.brokerId2);
        waitUntilNewOwner(this.channel2, this.bundle2, this.brokerId2);
        ((ConcurrentHashMap) Mockito.verify(leaderCleanupJobs, Mockito.times(3))).computeIfAbsent((String) ArgumentMatchers.eq(broker), (Function) ArgumentMatchers.any());
        ((ConcurrentHashMap) Mockito.verify(followerCleanupJobs, Mockito.times(0))).computeIfAbsent((String) ArgumentMatchers.eq(broker), (Function) ArgumentMatchers.any());
        Awaitility.await().atMost(10L, TimeUnit.SECONDS).untilAsserted(() -> {
            AssertJUnit.assertEquals(0, leaderCleanupJobs.size());
            AssertJUnit.assertEquals(0, followerCleanupJobs.size());
        });
        validateMonitorCounters(leaderChannel, 3L, 0L, 5L, 0L, 4L, 0L, 1L);

        // Phase 5: after a SessionLost event a deletion must not schedule any additional job.
        Mockito.reset(new PulsarAdmin[]{this.pulsarAdmin});
        this.channel1.publishUnloadEventAsync(new Unload(this.brokerId2, this.bundle1, Optional.of(broker)));
        this.channel1.publishUnloadEventAsync(new Unload(this.brokerId2, this.bundle2, Optional.of(broker)));
        waitUntilNewOwner(this.channel1, this.bundle1, broker);
        waitUntilNewOwner(this.channel2, this.bundle1, broker);
        waitUntilNewOwner(this.channel1, this.bundle2, broker);
        waitUntilNewOwner(this.channel2, this.bundle2, broker);
        leaderChannel.handleMetadataSessionEvent(SessionEvent.SessionLost);
        followerChannel.handleMetadataSessionEvent(SessionEvent.SessionLost);
        leaderChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted);
        followerChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted);
        ((ConcurrentHashMap) Mockito.verify(leaderCleanupJobs, Mockito.times(3))).computeIfAbsent((String) ArgumentMatchers.eq(broker), (Function) ArgumentMatchers.any());
        ((ConcurrentHashMap) Mockito.verify(followerCleanupJobs, Mockito.times(0))).computeIfAbsent((String) ArgumentMatchers.eq(broker), (Function) ArgumentMatchers.any());
        Awaitility.await().atMost(10L, TimeUnit.SECONDS).untilAsserted(() -> {
            AssertJUnit.assertEquals(0, leaderCleanupJobs.size());
            AssertJUnit.assertEquals(0, followerCleanupJobs.size());
        });
        validateMonitorCounters(leaderChannel, 3L, 0L, 5L, 0L, 4L, 1L, 1L);

        // Undo the reflective overrides so later tests see the real maps and the default delay.
        FieldUtils.writeDeclaredField(leaderChannel, "maxCleanupDelayTimeInSecs", 180, true);
        FieldUtils.writeDeclaredField(this.channel1, "cleanupJobs", cleanupJobs1, true);
        FieldUtils.writeDeclaredField(this.channel2, "cleanupJobs", cleanupJobs2, true);
    }

    /**
     * Verifies that a conflicting assignment does not change an existing owner and, for table view
     * implementations other than {@code ServiceUnitStateMetadataStoreTableViewImpl}, that the
     * ownership is still visible to a newly started channel after the topic has been compacted.
     *
     * <p>All temporary overrides (channel2's in-flight wait time, the compaction threshold and the
     * extra channel) are undone in {@code finally} blocks, including on the early-return path.
     *
     * @throws Exception if reflection access, admin calls or an awaited condition fails
     */
    @org.testng.annotations.Test(priority = 2000)
    public void conflictAndCompactionTest() throws Exception {
        String conflictBundle = String.format("%s/%s", "public/default", "0x0000000a_0xffffffff");
        CompletableFuture ownerAsync = this.channel1.getOwnerAsync(conflictBundle);
        CompletableFuture ownerAsync2 = this.channel2.getOwnerAsync(conflictBundle);
        Assert.assertTrue(((Optional) ownerAsync.get()).isEmpty());
        Assert.assertTrue(((Optional) ownerAsync2.get()).isEmpty());

        // Broker 1 takes ownership through channel1.
        CompletableFuture firstAssignment = this.channel1.publishAssignEventAsync(conflictBundle, this.brokerId1);
        AssertJUnit.assertNotNull(firstAssignment);
        waitUntilNewOwner(this.channel1, conflictBundle, this.brokerId1);
        waitUntilNewOwner(this.channel2, conflictBundle, this.brokerId1);
        AssertJUnit.assertEquals(this.brokerId1, (String) firstAssignment.get(5L, TimeUnit.SECONDS));

        FieldUtils.writeDeclaredField(this.channel2, "inFlightStateWaitingTimeInMillis", 3000, true);
        try {
            // A competing assignment to broker 2 must complete normally but leave broker 1 as owner.
            CompletableFuture conflictingAssignment = this.channel2.publishAssignEventAsync(conflictBundle, this.brokerId2);
            AssertJUnit.assertNotNull(conflictingAssignment);
            CompletionException completionException = null;
            try {
                conflictingAssignment.join();
            } catch (CompletionException e) {
                completionException = e;
            }
            AssertJUnit.assertNull(completionException);
            AssertJUnit.assertEquals(Optional.of(this.brokerId1), this.channel2.getOwnerAsync(conflictBundle).get());
            AssertJUnit.assertEquals(Optional.of(this.brokerId1), this.channel1.getOwnerAsync(conflictBundle).get());
            if (this.serviceUnitStateTableViewClassName.equals(ServiceUnitStateMetadataStoreTableViewImpl.class.getCanonicalName())) {
                // The compaction part of the test is skipped for this table view implementation.
                return;
            }

            // Install one spied compactor on both brokers so compaction calls can be verified.
            StrategicTwoPhaseCompactor strategicTwoPhaseCompactor = (StrategicTwoPhaseCompactor) Mockito.spy(this.pulsar1.getStrategicCompactor());
            Field declaredField = FieldUtils.getDeclaredField(PulsarService.class, "strategicCompactor", true);
            FieldUtils.writeField(declaredField, this.pulsar1, strategicTwoPhaseCompactor, true);
            FieldUtils.writeField(declaredField, this.pulsar2, strategicTwoPhaseCompactor, true);

            Long compactionThreshold = this.admin.topicPolicies().getCompactionThreshold(ServiceUnitStateTableViewImpl.TOPIC);
            this.admin.topicPolicies().setCompactionThreshold(ServiceUnitStateTableViewImpl.TOPIC, 0L);
            try {
                // Keep publishing until exactly one compaction of the channel topic has been observed.
                Awaitility.await().pollInterval(200L, TimeUnit.MILLISECONDS).atMost(140L, TimeUnit.SECONDS).untilAsserted(() -> {
                    this.channel1.publishAssignEventAsync(conflictBundle, this.brokerId1);
                    ((StrategicTwoPhaseCompactor) Mockito.verify(strategicTwoPhaseCompactor, Mockito.times(1))).compact((String) ArgumentMatchers.eq(ServiceUnitStateTableViewImpl.TOPIC), (TopicCompactionStrategy) ArgumentMatchers.any());
                });

                // A channel started after compaction must still report broker 1 as the owner.
                ServiceUnitStateChannelImpl freshChannel = createChannel(this.pulsar);
                freshChannel.start();
                try {
                    Awaitility.await().pollInterval(200L, TimeUnit.MILLISECONDS).atMost(5L, TimeUnit.SECONDS).untilAsserted(() -> {
                        AssertJUnit.assertEquals(freshChannel.getOwnerAsync(conflictBundle).get(), Optional.of(this.brokerId1));
                    });
                } finally {
                    freshChannel.close();
                }
            } finally {
                if (compactionThreshold != null) {
                    this.admin.topicPolicies().setCompactionThreshold(ServiceUnitStateTableViewImpl.TOPIC, compactionThreshold.longValue());
                }
            }
        } finally {
            FieldUtils.writeDeclaredField(this.channel2, "inFlightStateWaitingTimeInMillis", 30000, true);
        }
    }

    /**
     * Forces {@code channel1}'s table view entry for the bundle through a sequence of states and
     * performs a fixed number of owner lookups in each of them, then checks the owner-lookup
     * counters against those numbers.
     *
     * <p>Lookups per state: Assigning 2, Owned 3, Releasing 2, Splitting 1, Free 1, Deleted 2,
     * no entry 3. The channels are disabled while the table view is overridden and re-enabled in
     * the {@code finally} block.
     */
    @org.testng.annotations.Test(priority = 11)
    public void ownerLookupCountTests() throws IllegalAccessException {
        try {
            disableChannels();

            overrideTableView(channel1, bundle, new ServiceUnitStateData(ServiceUnitState.Assigning, "b1", 1L));
            for (int lookup = 0; lookup < 2; lookup++) {
                channel1.getOwnerAsync(bundle);
            }

            overrideTableView(channel1, bundle, new ServiceUnitStateData(ServiceUnitState.Owned, "b1", 1L));
            for (int lookup = 0; lookup < 3; lookup++) {
                channel1.getOwnerAsync(bundle);
            }

            overrideTableView(channel1, bundle, new ServiceUnitStateData(ServiceUnitState.Releasing, "b1", 1L));
            for (int lookup = 0; lookup < 2; lookup++) {
                channel1.getOwnerAsync(bundle);
            }

            overrideTableView(channel1, bundle, new ServiceUnitStateData(ServiceUnitState.Splitting, (String) null, "b1", 1L));
            channel1.getOwnerAsync(bundle);

            overrideTableView(channel1, bundle, new ServiceUnitStateData(ServiceUnitState.Free, "b1", 1L));
            channel1.getOwnerAsync(bundle);

            overrideTableView(channel1, bundle, new ServiceUnitStateData(ServiceUnitState.Deleted, "b1", 1L));
            for (int lookup = 0; lookup < 2; lookup++) {
                channel1.getOwnerAsync(bundle);
            }

            // No table view entry at all.
            overrideTableView(channel1, bundle, null);
            for (int lookup = 0; lookup < 3; lookup++) {
                channel1.getOwnerAsync(bundle);
            }

            validateOwnerLookUpCounters(channel1, 2L, 3L, 2L, 1L, 1L, 2L, 3L);
        } finally {
            enableChannels();
        }
    }

    /**
     * Assigns the bundle to each broker in turn, unloads it without a destination, and checks that
     * both channels see it in the {@code Free} state with no owner. Finally, with the in-flight wait
     * and tombstone delay shortened to 1 ms, runs the ownership monitor on both channels and expects
     * the bundle to reach the {@code Init} state.
     *
     * <p>The shortened timings are reset in a {@code finally} block to 30000 ms (in-flight wait) and
     * 300000 ms (tombstone delay) on both channels, the same values the other tests in this class
     * restore.
     */
    @org.testng.annotations.Test(priority = 12)
    public void unloadTest() throws ExecutionException, InterruptedException, IllegalAccessException {
        // Round 1: broker 1 owns the bundle, then it is unloaded via channel1.
        this.channel1.publishAssignEventAsync(this.bundle, this.brokerId1);
        waitUntilNewOwner(this.channel1, this.bundle, this.brokerId1);
        waitUntilNewOwner(this.channel2, this.bundle, this.brokerId1);
        Optional optional = (Optional) this.channel1.getOwnerAsync(this.bundle).get();
        AssertJUnit.assertEquals(optional, (Optional) this.channel2.getOwnerAsync(this.bundle).get());
        AssertJUnit.assertEquals(optional, Optional.of(this.brokerId1));
        this.channel1.publishUnloadEventAsync(new Unload(this.brokerId1, this.bundle, Optional.empty()));
        waitUntilState(this.channel1, this.bundle, ServiceUnitState.Free);
        waitUntilState(this.channel2, this.bundle, ServiceUnitState.Free);
        CompletableFuture ownerAsync = this.channel1.getOwnerAsync(this.bundle);
        CompletableFuture ownerAsync2 = this.channel2.getOwnerAsync(this.bundle);
        AssertJUnit.assertEquals(Optional.empty(), ownerAsync.get());
        AssertJUnit.assertEquals(Optional.empty(), ownerAsync2.get());

        // Round 2: broker 2 owns the bundle, then it is unloaded via channel2.
        this.channel2.publishAssignEventAsync(this.bundle, this.brokerId2);
        waitUntilNewOwner(this.channel1, this.bundle, this.brokerId2);
        waitUntilNewOwner(this.channel2, this.bundle, this.brokerId2);
        Optional optional2 = (Optional) this.channel1.getOwnerAsync(this.bundle).get();
        AssertJUnit.assertEquals(optional2, (Optional) this.channel2.getOwnerAsync(this.bundle).get());
        AssertJUnit.assertEquals(optional2, Optional.of(this.brokerId2));
        this.channel2.publishUnloadEventAsync(new Unload(this.brokerId2, this.bundle, Optional.empty()));
        waitUntilState(this.channel1, this.bundle, ServiceUnitState.Free);
        waitUntilState(this.channel2, this.bundle, ServiceUnitState.Free);

        try {
            // Shorten the timings so the monitor acts on the Free entry right away.
            FieldUtils.writeDeclaredField(this.channel1, "inFlightStateWaitingTimeInMillis", 1, true);
            FieldUtils.writeDeclaredField(this.channel1, "stateTombstoneDelayTimeInMillis", 1, true);
            FieldUtils.writeDeclaredField(this.channel2, "inFlightStateWaitingTimeInMillis", 1, true);
            FieldUtils.writeDeclaredField(this.channel2, "stateTombstoneDelayTimeInMillis", 1, true);
            this.channel1.monitorOwnerships(List.of(this.brokerId1, this.brokerId2));
            this.channel2.monitorOwnerships(List.of(this.brokerId1, this.brokerId2));
            waitUntilState(this.channel1, this.bundle, ServiceUnitState.Init);
            waitUntilState(this.channel2, this.bundle, ServiceUnitState.Init);
            // Counters are validated on whichever channel currently owns the channel.
            validateMonitorCounters(((Boolean) this.channel1.isChannelOwnerAsync().get()).booleanValue() ? this.channel1 : this.channel2, 0L, 1L, 0L, 0L, 0L, 0L, 0L);
        } finally {
            // Restore the same values on both channels, even when an assertion above failed.
            FieldUtils.writeDeclaredField(this.channel1, "inFlightStateWaitingTimeInMillis", 30000, true);
            FieldUtils.writeDeclaredField(this.channel1, "stateTombstoneDelayTimeInMillis", 300000, true);
            FieldUtils.writeDeclaredField(this.channel2, "inFlightStateWaitingTimeInMillis", 30000, true);
            FieldUtils.writeDeclaredField(this.channel2, "stateTombstoneDelayTimeInMillis", 300000, true);
        }
    }

    /**
     * Simulates a failing table view {@code put} on channel2 while the bundle is being assigned to
     * broker 2, and checks that the ownership monitor later completes the assignment.
     *
     * <p>While the failing table view is installed, the lookup on channel2 is expected to be pending
     * at first and then to complete exceptionally within ten seconds. After the table view is put
     * back and the in-flight wait is shortened to 1 ms, running the monitor on both channels is
     * expected to make broker 2 the owner.
     */
    @org.testng.annotations.Test(priority = 13)
    public void assignTestWhenDestBrokerProducerFails() throws ExecutionException, InterruptedException, IllegalAccessException {
        // Start from a Free bundle with no owner on either channel.
        channel1.publishUnloadEventAsync(new Unload(brokerId1, bundle, Optional.empty()));
        waitUntilState(channel1, bundle, ServiceUnitState.Free);
        waitUntilState(channel2, bundle, ServiceUnitState.Free);
        AssertJUnit.assertEquals(Optional.empty(), channel1.getOwnerAsync(bundle).get());
        AssertJUnit.assertEquals(Optional.empty(), channel2.getOwnerAsync(bundle).get());

        // NOTE(review): the spy wraps channel1's table view but is installed on channel2, and channel2
        // is later "restored" to channel1's view rather than its own - confirm this is intended.
        ServiceUnitStateTableView realTableView = getTableView(channel1);
        ServiceUnitStateTableView failingTableView = (ServiceUnitStateTableView) Mockito.spy(realTableView);
        ((ServiceUnitStateTableView) Mockito.doReturn(CompletableFuture.failedFuture(new RuntimeException())).when(failingTableView)).put((String) ArgumentMatchers.any(), (ServiceUnitStateData) ArgumentMatchers.any());
        setTableView(channel2, failingTableView);

        FieldUtils.writeDeclaredField(channel1, "inFlightStateWaitingTimeInMillis", 3000, true);
        FieldUtils.writeDeclaredField(channel2, "inFlightStateWaitingTimeInMillis", 3000, true);
        ((ExtensibleLoadManagerImpl) Mockito.doReturn(CompletableFuture.completedFuture(Optional.of(brokerId2))).when(loadManager)).selectAsync((ServiceUnitId) ArgumentMatchers.any(), (Set) ArgumentMatchers.any(), (LookupOptions) ArgumentMatchers.any());

        channel1.publishAssignEventAsync(bundle, brokerId2);
        waitUntilState(channel1, bundle);
        waitUntilState(channel2, bundle);

        CompletableFuture lookupOnChannel1 = channel1.getOwnerAsync(bundle);
        CompletableFuture lookupOnChannel2 = channel2.getOwnerAsync(bundle);
        Assert.assertTrue(lookupOnChannel1.isDone());
        Assert.assertFalse(lookupOnChannel2.isDone());
        Awaitility.await()
                .atMost(10L, TimeUnit.SECONDS)
                .untilAsserted(() -> Assert.assertTrue(lookupOnChannel2.isCompletedExceptionally()));

        // Remove the failing table view and let the monitor finish the assignment.
        setTableView(channel2, realTableView);
        FieldUtils.writeDeclaredField(channel1, "inFlightStateWaitingTimeInMillis", 1, true);
        FieldUtils.writeDeclaredField(channel2, "inFlightStateWaitingTimeInMillis", 1, true);
        channel1.monitorOwnerships(List.of(brokerId1, brokerId2));
        channel2.monitorOwnerships(List.of(brokerId1, brokerId2));
        waitUntilNewOwner(channel1, bundle, brokerId2);
        waitUntilNewOwner(channel2, bundle, brokerId2);

        Optional ownerSeenByChannel1 = (Optional) channel1.getOwnerAsync(bundle).get();
        AssertJUnit.assertEquals(ownerSeenByChannel1, (Optional) channel2.getOwnerAsync(bundle).get());
        AssertJUnit.assertEquals(ownerSeenByChannel1, Optional.of(brokerId2));
        validateMonitorCounters(((Boolean) channel1.isChannelOwnerAsync().get()).booleanValue() ? channel1 : channel2, 0L, 0L, 1L, 0L, 0L, 0L, 0L);

        // Back to the regular in-flight wait time.
        FieldUtils.writeDeclaredField(channel1, "inFlightStateWaitingTimeInMillis", 30000, true);
        FieldUtils.writeDeclaredField(channel2, "inFlightStateWaitingTimeInMillis", 30000, true);
    }

    /**
     * Simulates a failing table view {@code put} on channel1 while a split of a bundle owned by
     * broker 1 is published through channel2, and checks that monitoring eventually brings the
     * bundle to the {@code Init} state with no owner on either channel.
     */
    @org.testng.annotations.Test(priority = 14)
    public void splitTestWhenTableViewPutFails() throws ExecutionException, InterruptedException, IllegalAccessException {
        // Reset the bundle to Free, then make broker 1 its owner.
        channel1.publishUnloadEventAsync(new Unload(brokerId1, bundle, Optional.empty()));
        waitUntilState(channel1, bundle, ServiceUnitState.Free);
        waitUntilState(channel2, bundle, ServiceUnitState.Free);
        channel1.publishAssignEventAsync(bundle, brokerId1);
        waitUntilState(channel1, bundle, ServiceUnitState.Owned);
        waitUntilState(channel2, bundle, ServiceUnitState.Owned);
        AssertJUnit.assertEquals(brokerId1, (String) ((Optional) channel1.getOwnerAsync(bundle).get()).get());
        AssertJUnit.assertEquals(brokerId1, (String) ((Optional) channel2.getOwnerAsync(bundle).get()).get());

        // Swap channel1's table view for a spy whose put() always fails.
        ServiceUnitStateTableView realTableView = getTableView(channel1);
        ServiceUnitStateTableView failingTableView = (ServiceUnitStateTableView) Mockito.spy(realTableView);
        ((ServiceUnitStateTableView) Mockito.doReturn(CompletableFuture.failedFuture(new RuntimeException())).when(failingTableView)).put((String) ArgumentMatchers.any(), (ServiceUnitStateData) ArgumentMatchers.any());
        setTableView(channel1, failingTableView);
        FieldUtils.writeDeclaredField(channel1, "inFlightStateWaitingTimeInMillis", 3000, true);
        FieldUtils.writeDeclaredField(channel2, "inFlightStateWaitingTimeInMillis", 3000, true);

        // Publish the split into two child ranges without preset destinations.
        channel2.publishSplitEventAsync(new Split(bundle, brokerId1, Map.of(childBundle1Range, Optional.empty(), childBundle2Range, Optional.empty())));
        waitUntilState(channel1, bundle);
        waitUntilState(channel2, bundle);
        // Lookups are issued here but their results are intentionally not inspected.
        channel1.getOwnerAsync(bundle);
        channel2.getOwnerAsync(bundle);

        // Put the real table view back and shorten the in-flight wait before monitoring.
        setTableView(channel1, realTableView);
        FieldUtils.writeDeclaredField(channel1, "inFlightStateWaitingTimeInMillis", 1, true);
        FieldUtils.writeDeclaredField(channel2, "inFlightStateWaitingTimeInMillis", 1, true);
        ServiceUnitStateChannel channelOwner = ((Boolean) channel1.isChannelOwnerAsync().get()).booleanValue() ? channel1 : channel2;
        ((ExtensibleLoadManagerImpl) Mockito.doReturn(CompletableFuture.completedFuture(Optional.of(brokerId1))).when(loadManager)).selectAsync((ServiceUnitId) ArgumentMatchers.any(), (Set) ArgumentMatchers.any(), (LookupOptions) ArgumentMatchers.any());
        waitUntilStateWithMonitor(channelOwner, bundle, ServiceUnitState.Init);
        waitUntilStateWithMonitor(channel1, bundle, ServiceUnitState.Init);
        waitUntilStateWithMonitor(channel2, bundle, ServiceUnitState.Init);

        CompletableFuture lookupOnChannel1 = channel1.getOwnerAsync(bundle);
        CompletableFuture lookupOnChannel2 = channel2.getOwnerAsync(bundle);
        Assert.assertTrue(((Optional) lookupOnChannel1.get()).isEmpty());
        Assert.assertTrue(((Optional) lookupOnChannel2.get()).isEmpty());

        // Back to the regular in-flight wait time.
        FieldUtils.writeDeclaredField(channel1, "inFlightStateWaitingTimeInMillis", 30000, true);
        FieldUtils.writeDeclaredField(channel2, "inFlightStateWaitingTimeInMillis", 30000, true);
    }

    /**
     * Checks both {@code isOwner} overloads before and after the bundle is assigned to broker 1,
     * then checks {@code channel1.isOwner(bundle)} for each state forced into channel1's table view.
     *
     * <p>Per the assertions below, with broker 1 recorded in the entry only the {@code Owned} and
     * {@code Splitting} states count as owned; {@code Assigning}, {@code Releasing}, {@code Free},
     * {@code Deleted} and a missing entry do not.
     */
    @org.testng.annotations.Test(priority = 15)
    public void testIsOwner() throws IllegalAccessException {
        // Before any assignment nobody owns the bundle.
        Assert.assertFalse(channel1.isOwner(bundle));
        Assert.assertFalse(channel2.isOwner(bundle));
        Assert.assertFalse(channel1.isOwner(bundle, brokerId2));
        Assert.assertFalse(channel2.isOwner(bundle, brokerId1));

        channel1.publishAssignEventAsync(bundle, brokerId1);
        Assert.assertFalse(channel2.isOwner(bundle));
        waitUntilOwnerChanges(channel1, bundle, null);
        waitUntilOwnerChanges(channel2, bundle, null);

        // After the assignment: each channel's own view of ownership ...
        Assert.assertTrue(channel1.isOwner(bundle));
        Assert.assertFalse(channel2.isOwner(bundle));
        // ... each channel asked about its own broker id ...
        Assert.assertTrue(channel1.isOwner(bundle, brokerId1));
        Assert.assertFalse(channel2.isOwner(bundle, brokerId2));
        // ... and each channel asked about the other broker id.
        Assert.assertTrue(channel2.isOwner(bundle, brokerId1));
        Assert.assertFalse(channel1.isOwner(bundle, brokerId2));

        try {
            disableChannels();

            overrideTableView(channel1, bundle, new ServiceUnitStateData(ServiceUnitState.Assigning, brokerId1, 1L));
            Assert.assertFalse(channel1.isOwner(bundle));

            overrideTableView(channel1, bundle, new ServiceUnitStateData(ServiceUnitState.Owned, brokerId1, 1L));
            Assert.assertTrue(channel1.isOwner(bundle));

            overrideTableView(channel1, bundle, new ServiceUnitStateData(ServiceUnitState.Releasing, (String) null, brokerId1, 1L));
            Assert.assertFalse(channel1.isOwner(bundle));

            overrideTableView(channel1, bundle, new ServiceUnitStateData(ServiceUnitState.Splitting, (String) null, brokerId1, 1L));
            Assert.assertTrue(channel1.isOwner(bundle));

            overrideTableView(channel1, bundle, new ServiceUnitStateData(ServiceUnitState.Free, (String) null, brokerId1, 1L));
            Assert.assertFalse(channel1.isOwner(bundle));

            overrideTableView(channel1, bundle, new ServiceUnitStateData(ServiceUnitState.Deleted, (String) null, brokerId1, 1L));
            Assert.assertFalse(channel1.isOwner(bundle));

            // No entry in the table view.
            overrideTableView(channel1, bundle, null);
            Assert.assertFalse(channel1.isOwner(bundle));
        } finally {
            enableChannels();
        }
    }

    /**
     * Checks what {@code channel1.getOwnerAsync} yields for each service-unit state written
     * directly into channel1's table view between {@code disableChannels()} and
     * {@code enableChannels()}.
     */
    @org.testng.annotations.Test(priority = 16)
    public void testGetOwnerAsync() throws Exception {
        try {
            disableChannels();

            // Owned: the owner is the broker named in the state data.
            overrideTableView(channel1, bundle,
                    new ServiceUnitStateData(ServiceUnitState.Owned, brokerId1, 1L));
            AssertJUnit.assertEquals(brokerId1,
                    (String) ((Optional) channel1.getOwnerAsync(bundle).get()).get());
            overrideTableView(channel1, bundle,
                    new ServiceUnitStateData(ServiceUnitState.Owned, brokerId2, 1L));
            AssertJUnit.assertEquals(brokerId2,
                    (String) ((Optional) channel1.getOwnerAsync(bundle).get()).get());

            // Assigning: pending for brokerId1, resolved immediately for brokerId2.
            overrideTableView(channel1, bundle,
                    new ServiceUnitStateData(ServiceUnitState.Assigning, brokerId1, 1L));
            Assert.assertFalse(channel1.getOwnerAsync(bundle).isDone());
            overrideTableView(channel1, bundle,
                    new ServiceUnitStateData(ServiceUnitState.Assigning, brokerId2, 1L));
            AssertJUnit.assertEquals(brokerId2,
                    (String) ((Optional) channel1.getOwnerAsync(bundle).get()).get());

            // Releasing (three-argument form): same pattern as Assigning.
            overrideTableView(channel1, bundle,
                    new ServiceUnitStateData(ServiceUnitState.Releasing, brokerId1, 1L));
            Assert.assertFalse(channel1.getOwnerAsync(bundle).isDone());
            overrideTableView(channel1, bundle,
                    new ServiceUnitStateData(ServiceUnitState.Releasing, brokerId2, 1L));
            AssertJUnit.assertEquals(brokerId2,
                    (String) ((Optional) channel1.getOwnerAsync(bundle).get()).get());

            // Releasing with a null second argument: no owner.
            overrideTableView(channel1, bundle,
                    new ServiceUnitStateData(ServiceUnitState.Releasing, (String) null, brokerId1, 1L));
            AssertJUnit.assertEquals(Optional.empty(), channel1.getOwnerAsync(bundle).get());

            // Splitting: the broker passed as the third argument is reported as owner.
            overrideTableView(channel1, bundle,
                    new ServiceUnitStateData(ServiceUnitState.Splitting, (String) null, brokerId1, 1L));
            AssertJUnit.assertEquals(brokerId1,
                    (String) ((Optional) channel1.getOwnerAsync(bundle).get()).get());
            overrideTableView(channel1, bundle,
                    new ServiceUnitStateData(ServiceUnitState.Splitting, (String) null, brokerId2, 1L));
            AssertJUnit.assertEquals(brokerId2,
                    (String) ((Optional) channel1.getOwnerAsync(bundle).get()).get());

            // Free and missing state data: no owner.
            overrideTableView(channel1, bundle,
                    new ServiceUnitStateData(ServiceUnitState.Free, (String) null, brokerId1, 1L));
            AssertJUnit.assertEquals(Optional.empty(), channel1.getOwnerAsync(bundle).get());
            overrideTableView(channel1, bundle, null);
            AssertJUnit.assertEquals(Optional.empty(), channel1.getOwnerAsync(bundle).get());

            // Deleted (on bundle1): the returned future is completed exceptionally.
            overrideTableView(channel1, bundle1,
                    new ServiceUnitStateData(ServiceUnitState.Deleted, (String) null, brokerId1, 1L));
            Assert.assertTrue(channel1.getOwnerAsync(bundle1).isCompletedExceptionally());
        } finally {
            enableChannels();
        }
    }

    /**
     * Assigns {@code bundle3} to broker 1, makes {@code updateNamespaceBundles} fail with
     * {@code BadVersionException} for the first nine invocations, publishes a split event and
     * verifies that, after the channel-owner's ownership monitor runs, the parent bundle is back
     * in {@code Init} and both child bundles are owned by broker 1 on both channels.
     *
     * <p>The shortened in-flight/tombstone timings are restored in a {@code finally} block so a
     * failed assertion does not leak 1 ms timings into the tests that run after this one.
     */
    @org.testng.annotations.Test(priority = 17)
    public void splitAndRetryFailureTest() throws Exception {
        channel1.publishAssignEventAsync(bundle3, brokerId1);
        waitUntilNewOwner(channel1, bundle3, brokerId1);
        waitUntilNewOwner(channel2, bundle3, brokerId1);
        Optional ownerSeenByChannel1 = (Optional) channel1.getOwnerAsync(bundle3).get();
        Optional ownerSeenByChannel2 = (Optional) channel2.getOwnerAsync(bundle3).get();
        AssertJUnit.assertEquals(ownerSeenByChannel1, Optional.of(brokerId1));
        AssertJUnit.assertEquals(ownerSeenByChannel2, Optional.of(brokerId1));
        Assert.assertTrue(ownerSeenByChannel1.isPresent());

        // Stub updateNamespaceBundles: the counter starts at 10 and is decremented per call, so
        // calls 1..9 return the failed future and the 10th call falls through to the real method.
        NamespaceService namespaceService = pulsar1.getNamespaceService();
        CompletableFuture<Object> badVersionFuture = new CompletableFuture<>();
        AtomicInteger remainingFailures = new AtomicInteger(10);
        badVersionFuture.completeExceptionally(new MetadataStoreException.BadVersionException("BadVersion"));
        ((NamespaceService) Mockito.doAnswer(invocation -> {
            return remainingFailures.decrementAndGet() > 0 ? badVersionFuture : invocation.callRealMethod();
        }).when(namespaceService)).updateNamespaceBundles((NamespaceName) ArgumentMatchers.any(), (NamespaceBundles) ArgumentMatchers.any());
        ((PulsarService) Mockito.doReturn(namespaceService).when(pulsar1)).getNamespaceService();
        ((NamespaceService) Mockito.doReturn(CompletableFuture.completedFuture(List.of("test-topic-1", "test-topic-2"))).when(namespaceService)).getOwnedTopicListForNamespaceBundle((NamespaceBundle) ArgumentMatchers.any());

        channel1.publishSplitEventAsync(new Split(bundle3, (String) ownerSeenByChannel1.get(),
                Map.of(childBundle1Range, Optional.empty(), childBundle2Range, Optional.empty())));

        try {
            FieldUtils.writeDeclaredField(channel1, "inFlightStateWaitingTimeInMillis", 1, true);
            FieldUtils.writeDeclaredField(channel2, "inFlightStateWaitingTimeInMillis", 1, true);

            // Wait until the counter has dropped from 10 to 3 (seven stubbed invocations).
            Awaitility.await().pollInterval(200L, TimeUnit.MILLISECONDS).atMost(10L, TimeUnit.SECONDS).untilAsserted(() -> {
                AssertJUnit.assertEquals(3, remainingFailures.get());
            });

            // Run the ownership monitor on whichever channel currently owns the channel.
            ServiceUnitStateChannelImpl leaderChannel = (ServiceUnitStateChannelImpl)
                    (((Boolean) channel1.isChannelOwnerAsync().get()).booleanValue() ? channel1 : channel2);
            ((ExtensibleLoadManagerImpl) Mockito.doReturn(CompletableFuture.completedFuture(Optional.of(brokerId1))).when(loadManager)).selectAsync((ServiceUnitId) ArgumentMatchers.any(), (Set) ArgumentMatchers.any(), (LookupOptions) ArgumentMatchers.any());
            leaderChannel.monitorOwnerships(List.of(brokerId1, brokerId2));

            waitUntilState(leaderChannel, bundle3, ServiceUnitState.Init);
            waitUntilState(channel1, bundle3, ServiceUnitState.Init);
            waitUntilState(channel2, bundle3, ServiceUnitState.Init);
            waitUntilNewOwner(channel1, childBundle31, brokerId1);
            waitUntilNewOwner(channel1, childBundle32, brokerId1);
            waitUntilNewOwner(channel2, childBundle31, brokerId1);
            waitUntilNewOwner(channel2, childBundle32, brokerId1);
            AssertJUnit.assertEquals(Optional.of(brokerId1), channel1.getOwnerAsync(childBundle31).get());
            AssertJUnit.assertEquals(Optional.of(brokerId1), channel1.getOwnerAsync(childBundle32).get());
            AssertJUnit.assertEquals(Optional.of(brokerId1), channel2.getOwnerAsync(childBundle31).get());
            AssertJUnit.assertEquals(Optional.of(brokerId1), channel2.getOwnerAsync(childBundle32).get());

            validateHandlerCounters(channel1, 1L, 0L, 3L, 0L, 0L, 0L, 2L, 1L, 0L, 0L, 1L, 0L, 1L, 0L);
            validateHandlerCounters(channel2, 1L, 0L, 3L, 0L, 0L, 0L, 2L, 0L, 0L, 0L, 1L, 0L, 1L, 0L);
            validateEventCounters(channel1, 1L, 0L, 1L, 0L, 0L, 0L);
            validateEventCounters(channel2, 0L, 0L, 0L, 0L, 0L, 0L);

            // Shorten the tombstone delay and run the monitor on both channels once more.
            FieldUtils.writeDeclaredField(channel1, "stateTombstoneDelayTimeInMillis", 1, true);
            FieldUtils.writeDeclaredField(channel2, "stateTombstoneDelayTimeInMillis", 1, true);
            // Explicit casts: monitorOwnerships is invoked through the Impl type elsewhere in this file.
            ((ServiceUnitStateChannelImpl) channel1).monitorOwnerships(List.of(brokerId1, brokerId2));
            ((ServiceUnitStateChannelImpl) channel2).monitorOwnerships(List.of(brokerId1, brokerId2));
            waitUntilState(channel1, bundle3, ServiceUnitState.Init);
            waitUntilState(channel2, bundle3, ServiceUnitState.Init);
            validateMonitorCounters(leaderChannel, 0L, 0L, 1L, 0L, 0L, 0L, 0L);

            cleanTableViews();
        } finally {
            // Always put the timings back, even when an assertion above failed.
            FieldUtils.writeDeclaredField(channel1, "inFlightStateWaitingTimeInMillis", 30000, true);
            FieldUtils.writeDeclaredField(channel1, "stateTombstoneDelayTimeInMillis", 300000, true);
            FieldUtils.writeDeclaredField(channel2, "inFlightStateWaitingTimeInMillis", 30000, true);
            FieldUtils.writeDeclaredField(channel2, "stateTombstoneDelayTimeInMillis", 300000, true);
        }
    }

    /**
     * Seeds the table views with state data that references broker 1 in each service-unit state,
     * reports broker 1's registration as deleted to both channels, and verifies on channel2 that
     * the releasing/assigning/owned bundles and the split children end up owned by broker 2 (the
     * broker returned by the stubbed {@code selectAsync}), while the free, deleted and split-parent
     * bundles have no owner.
     *
     * <p>Channels are re-enabled in a {@code finally} scoped to the seeding section, and the
     * {@code pulsarAdmin} stub is reset in a {@code finally} so it does not leak into later tests
     * when an assertion fails.
     */
    @org.testng.annotations.Test(priority = 18)
    public void testOverrideInactiveBrokerStateData() throws IllegalAccessException, ExecutionException, InterruptedException, TimeoutException {
        String leader = (String) ((Optional) channel1.getChannelOwnerAsync().get(2L, TimeUnit.SECONDS)).get();
        AssertJUnit.assertEquals(leader, (String) ((Optional) channel2.getChannelOwnerAsync().get(2L, TimeUnit.SECONDS)).get());

        // Explicit casts keep this compiling whether the fields are declared as the interface or the Impl.
        boolean leaderIsBroker2 = leader.equals(brokerId2);
        ServiceUnitStateChannelImpl leaderChannel =
                (ServiceUnitStateChannelImpl) (leaderIsBroker2 ? channel2 : channel1);
        ServiceUnitStateChannelImpl followerChannel =
                (ServiceUnitStateChannelImpl) (leaderIsBroker2 ? channel1 : channel2);

        String inactiveBroker = brokerId1;
        String splittingBundle = bundle;
        String releasingBundle = "public/releasing/0xfffffff0_0xffffffff";
        String assigningBundle = "public/assigning/0xfffffff0_0xffffffff";
        String freeBundle = "public/free/0xfffffff0_0xffffffff";
        String deletedBundle = "public/deleted/0xfffffff0_0xffffffff";
        String ownedBundle = "public/owned/0xfffffff0_0xffffffff";

        try {
            disableChannels();
            overrideTableViews(releasingBundle,
                    new ServiceUnitStateData(ServiceUnitState.Releasing, (String) null, inactiveBroker, 1L));
            overrideTableViews(splittingBundle,
                    new ServiceUnitStateData(ServiceUnitState.Splitting, (String) null, inactiveBroker,
                            Map.of(childBundle1Range, Optional.empty(), childBundle2Range, Optional.empty()), 1L));
            overrideTableViews(assigningBundle,
                    new ServiceUnitStateData(ServiceUnitState.Assigning, inactiveBroker, (String) null, 1L));
            overrideTableViews(freeBundle,
                    new ServiceUnitStateData(ServiceUnitState.Free, (String) null, inactiveBroker, 1L));
            overrideTableViews(deletedBundle,
                    new ServiceUnitStateData(ServiceUnitState.Deleted, (String) null, inactiveBroker, 1L));
            overrideTableViews(ownedBundle,
                    new ServiceUnitStateData(ServiceUnitState.Owned, inactiveBroker, (String) null, 1L));
        } finally {
            enableChannels();
        }

        try {
            ((ExtensibleLoadManagerImpl) Mockito.doReturn(CompletableFuture.completedFuture(Optional.of(brokerId2))).when(loadManager)).selectAsync((ServiceUnitId) ArgumentMatchers.any(), (Set) ArgumentMatchers.any(), (LookupOptions) ArgumentMatchers.any());
            leaderChannel.handleMetadataSessionEvent(SessionEvent.SessionReestablished);
            followerChannel.handleMetadataSessionEvent(SessionEvent.SessionReestablished);
            // Back-date the last session event by 181 s.
            // NOTE(review): presumably this pushes it past the channel's max cleanup delay (180 s) - confirm.
            FieldUtils.writeDeclaredField(leaderChannel, "lastMetadataSessionEventTimestamp", Long.valueOf(System.currentTimeMillis() - 181000), true);
            FieldUtils.writeDeclaredField(followerChannel, "lastMetadataSessionEventTimestamp", Long.valueOf(System.currentTimeMillis() - 181000), true);

            ((PulsarAdmin) Mockito.doReturn(brokers).when(pulsarAdmin)).brokers();
            leaderChannel.handleBrokerRegistrationEvent(inactiveBroker, NotificationType.Deleted);
            followerChannel.handleBrokerRegistrationEvent(inactiveBroker, NotificationType.Deleted);

            waitUntilNewOwner(channel2, releasingBundle, brokerId2);
            waitUntilNewOwner(channel2, childBundle11, brokerId2);
            waitUntilNewOwner(channel2, childBundle12, brokerId2);
            waitUntilNewOwner(channel2, assigningBundle, brokerId2);
            waitUntilNewOwner(channel2, ownedBundle, brokerId2);
            AssertJUnit.assertEquals(Optional.empty(), channel2.getOwnerAsync(freeBundle).get());
            Assert.assertTrue(channel2.getOwnerAsync(deletedBundle).isCompletedExceptionally());
            Assert.assertTrue(((Optional) channel2.getOwnerAsync(splittingBundle).get()).isEmpty());

            FieldUtils.writeDeclaredField(leaderChannel, "maxCleanupDelayTimeInSecs", 180, true);
            cleanTableViews();
        } finally {
            // Drop the brokers() stub even when an assertion above failed.
            Mockito.reset(pulsarAdmin);
        }
    }

    /**
     * Seeds the table views with a mix of state data (in-flight states, owned bundles naming
     * {@code "broker-inactive-1"}, free and deleted bundles), runs the channel-owner's ownership
     * monitor with the in-flight waiting time set to {@code -1}, and verifies the resulting owners
     * as seen by channel2.
     *
     * <p>Channels are re-enabled in a {@code finally} scoped to the seeding section, and the
     * in-flight waiting time is restored in a {@code finally} so a failed assertion does not leave
     * {@code -1} configured for the tests that run after this one.
     */
    @org.testng.annotations.Test(priority = 19)
    public void testOverrideOrphanStateData() throws IllegalAccessException, ExecutionException, InterruptedException, TimeoutException {
        String leader = (String) ((Optional) channel1.getChannelOwnerAsync().get(2L, TimeUnit.SECONDS)).get();
        AssertJUnit.assertEquals(leader, (String) ((Optional) channel2.getChannelOwnerAsync().get(2L, TimeUnit.SECONDS)).get());

        boolean leaderIsBroker2 = leader.equals(brokerId2);
        ServiceUnitStateChannel leaderChannel = leaderIsBroker2 ? channel2 : channel1;
        ServiceUnitStateChannel followerChannel = leaderIsBroker2 ? channel1 : channel2;

        String broker = brokerId1;
        String splittingBundle = bundle;
        String releasingBundle1 = "public/releasing1/0xfffffff0_0xffffffff";
        String releasingBundle2 = "public/releasing2/0xfffffff0_0xffffffff";
        String assigningBundle1 = "public/assigning1/0xfffffff0_0xffffffff";
        String assigningBundle2 = "public/assigning2/0xfffffff0_0xffffffff";
        String freeBundle = "public/free/0xfffffff0_0xffffffff";
        String deletedBundle = "public/deleted/0xfffffff0_0xffffffff";
        String ownedBundle1 = "public/owned1/0xfffffff0_0xffffffff";
        String ownedBundle2 = "public/owned2SourceBundle/0xfffffff0_0xffffffff";
        String ownedBundle3 = "public/owned3/0xfffffff0_0xffffffff";
        String inactiveBroker = "broker-inactive-1";

        try {
            disableChannels();
            overrideTableViews(releasingBundle1,
                    new ServiceUnitStateData(ServiceUnitState.Releasing, broker, brokerId2, 1L));
            overrideTableViews(releasingBundle2,
                    new ServiceUnitStateData(ServiceUnitState.Releasing, brokerId2, brokerId3, 1L));
            overrideTableViews(splittingBundle,
                    new ServiceUnitStateData(ServiceUnitState.Splitting, (String) null, broker,
                            Map.of(childBundle1Range, Optional.empty(), childBundle2Range, Optional.empty()), 1L));
            overrideTableViews(assigningBundle1,
                    new ServiceUnitStateData(ServiceUnitState.Assigning, broker, (String) null, 1L));
            overrideTableViews(assigningBundle2,
                    new ServiceUnitStateData(ServiceUnitState.Assigning, broker, brokerId2, 1L));
            overrideTableViews(freeBundle,
                    new ServiceUnitStateData(ServiceUnitState.Free, (String) null, broker, 1L));
            overrideTableViews(deletedBundle,
                    new ServiceUnitStateData(ServiceUnitState.Deleted, (String) null, broker, 1L));
            overrideTableViews(ownedBundle1,
                    new ServiceUnitStateData(ServiceUnitState.Owned, broker, (String) null, 1L));
            overrideTableViews(ownedBundle2,
                    new ServiceUnitStateData(ServiceUnitState.Owned, broker, inactiveBroker, 1L));
            overrideTableViews(ownedBundle3,
                    new ServiceUnitStateData(ServiceUnitState.Owned, inactiveBroker, broker, 1L));
        } finally {
            enableChannels();
        }

        ((ExtensibleLoadManagerImpl) Mockito.doReturn(CompletableFuture.completedFuture(Optional.of(brokerId2))).when(loadManager)).selectAsync((ServiceUnitId) ArgumentMatchers.any(), (Set) ArgumentMatchers.any(), (LookupOptions) ArgumentMatchers.any());

        try {
            FieldUtils.writeDeclaredField(leaderChannel, "inFlightStateWaitingTimeInMillis", -1, true);
            FieldUtils.writeDeclaredField(followerChannel, "inFlightStateWaitingTimeInMillis", -1, true);
            ((ServiceUnitStateChannelImpl) leaderChannel).monitorOwnerships(List.of(brokerId1, brokerId2, "broker-3"));

            // Wait for the leader's cleanup jobs map to drain before checking owners.
            Awaitility.await().atMost(5L, TimeUnit.SECONDS)
                    .until(() -> getCleanupJobs(leaderChannel).isEmpty());

            waitUntilNewOwner(channel2, releasingBundle1, brokerId2);
            waitUntilNewOwner(channel2, releasingBundle2, brokerId2);
            Assert.assertTrue(((Optional) channel2.getOwnerAsync(splittingBundle).get()).isEmpty());
            waitUntilNewOwner(channel2, childBundle11, brokerId2);
            waitUntilNewOwner(channel2, childBundle12, brokerId2);
            waitUntilNewOwner(channel2, assigningBundle1, brokerId2);
            waitUntilNewOwner(channel2, assigningBundle2, brokerId2);
            Assert.assertTrue(((Optional) channel2.getOwnerAsync(freeBundle).get()).isEmpty());
            Assert.assertTrue(channel2.getOwnerAsync(deletedBundle).isCompletedExceptionally());
            waitUntilNewOwner(channel2, ownedBundle1, broker);
            waitUntilNewOwner(channel2, ownedBundle2, broker);
            waitUntilNewOwner(channel2, ownedBundle3, brokerId2);
            validateMonitorCounters(leaderChannel, 1L, 0L, 6L, 0L, 1L, 0L, 0L);
        } finally {
            // Always restore the waiting time, even when an assertion above failed.
            FieldUtils.writeDeclaredField(channel1, "inFlightStateWaitingTimeInMillis", 30000, true);
            FieldUtils.writeDeclaredField(channel2, "inFlightStateWaitingTimeInMillis", 30000, true);
        }
        cleanTableViews();
    }

    /**
     * Walks {@code channel1.getOwnerAsync} through a series of scenarios for the bundle
     * {@code public/owned/0xfffffff0_0xffffffff}: state data written directly into the table
     * views, a delayed broker-registry lookup, a lookup that finds no broker, an unregistered
     * registry, and two cleanups triggered by reporting broker 2's registration as deleted (one
     * where the stubbed {@code selectAsync} returns no broker, one where it returns broker 1).
     *
     * <p>Cleanup that must run on failure lives in {@code finally} blocks: channels are always
     * re-enabled after {@code disableChannels()}, the registry is always re-registered after
     * {@code unregister()}, and channel1's in-flight waiting time is always restored.
     */
    @org.testng.annotations.Test(priority = 20)
    public void testActiveGetOwner() throws Exception {
        String broker = brokerId2;
        String ownedBundle = "public/owned/0xfffffff0_0xffffffff";

        try {
            disableChannels();

            // No state data: empty owner.
            overrideTableViews(ownedBundle, null);
            AssertJUnit.assertEquals(Optional.empty(), channel1.getOwnerAsync(ownedBundle).get());

            // Releasing with broker 2 as the second argument: broker 2 is returned.
            overrideTableViews(ownedBundle,
                    new ServiceUnitStateData(ServiceUnitState.Releasing, broker, brokerId1, 1L));
            AssertJUnit.assertEquals(Optional.of(broker), channel1.getOwnerAsync(ownedBundle).get());

            // Assigning with broker 1 as the second argument: the request stays pending.
            overrideTableViews(ownedBundle,
                    new ServiceUnitStateData(ServiceUnitState.Assigning, brokerId1, brokerId2, 1L));
            Assert.assertFalse(channel1.getOwnerAsync(ownedBundle).isDone());

            // Owned by broker 2: resolved within the 5 s timeout.
            overrideTableViews(ownedBundle,
                    new ServiceUnitStateData(ServiceUnitState.Owned, broker, (String) null, 1L));
            AssertJUnit.assertEquals(
                    (String) ((Optional) channel1.getOwnerAsync(ownedBundle).get(5L, TimeUnit.SECONDS)).get(),
                    broker);
        } finally {
            enableChannels();
        }

        FieldUtils.writeDeclaredField(channel1, "inFlightStateWaitingTimeInMillis", 1000, true);
        try {
            // The registry lookup completes after ~500 ms; the owner must arrive in under 1 s.
            CompletableFuture<Object> delayedLookup = new CompletableFuture<>();
            ((BrokerRegistryImpl) Mockito.doReturn(delayedLookup).when(registry)).lookupAsync((String) ArgumentMatchers.eq(broker));
            CompletableFuture.runAsync(() -> {
                try {
                    Thread.sleep(500L);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                delayedLookup.complete(Optional.of(broker));
            });
            long start = System.currentTimeMillis();
            AssertJUnit.assertEquals(broker, (String) ((Optional) channel1.getOwnerAsync(ownedBundle).get()).get());
            Assert.assertTrue(System.currentTimeMillis() - start < 1000);

            // The registry lookup finds nothing: the request fails with IllegalStateException
            // after at least the 1 s in-flight waiting time.
            ((BrokerRegistryImpl) Mockito.doReturn(CompletableFuture.completedFuture(Optional.empty())).when(registry)).lookupAsync((String) ArgumentMatchers.eq(broker));
            start = System.currentTimeMillis();
            ExecutionException lookupFailure = Assert.expectThrows(ExecutionException.class, () -> {
                channel1.getOwnerAsync(ownedBundle).get();
            });
            Assert.assertTrue(lookupFailure.getCause() instanceof IllegalStateException);
            Assert.assertTrue(System.currentTimeMillis() - start >= 1000);

            // With the registry unregistered, the owner is returned in under 1 s.
            try {
                registry.unregister();
                start = System.currentTimeMillis();
                AssertJUnit.assertEquals(broker, (String) ((Optional) channel1.getOwnerAsync(ownedBundle).get()).get());
                Assert.assertTrue(System.currentTimeMillis() - start < 1000);
            } finally {
                registry.registerAsync().join();
            }

            // Cleanup with no new owner selected: the bundle ends up with no owner and in Init.
            ((ExtensibleLoadManagerImpl) Mockito.doReturn(CompletableFuture.completedFuture(Optional.empty())).when(loadManager)).selectAsync((ServiceUnitId) ArgumentMatchers.any(), (Set) ArgumentMatchers.any(), (LookupOptions) ArgumentMatchers.any());
            String leader = (String) ((Optional) channel1.getChannelOwnerAsync().get(2L, TimeUnit.SECONDS)).get();
            AssertJUnit.assertEquals(leader, (String) ((Optional) channel2.getChannelOwnerAsync().get(2L, TimeUnit.SECONDS)).get());
            ServiceUnitStateChannelImpl leaderChannel =
                    (ServiceUnitStateChannelImpl) (leader.equals(brokerId2) ? channel2 : channel1);
            leaderChannel.handleMetadataSessionEvent(SessionEvent.SessionReestablished);
            // Back-date the last session event by 181 s.
            // NOTE(review): presumably this pushes it past the channel's max cleanup delay - confirm.
            FieldUtils.writeDeclaredField(leaderChannel, "lastMetadataSessionEventTimestamp", Long.valueOf(System.currentTimeMillis() - 181000), true);
            ((PulsarAdmin) Mockito.doReturn(brokers).when(pulsarAdmin)).brokers();
            leaderChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted);
            FieldUtils.writeDeclaredField(channel1, "inFlightStateWaitingTimeInMillis", 20000, true);
            start = System.currentTimeMillis();
            Assert.assertTrue(((Optional) channel1.getOwnerAsync(ownedBundle).get()).isEmpty());
            waitUntilState(channel1, ownedBundle, ServiceUnitState.Init);
            waitUntilState(channel2, ownedBundle, ServiceUnitState.Init);
            Assert.assertTrue(System.currentTimeMillis() - start < 20000);
            Mockito.reset(pulsarAdmin);

            // Cleanup with broker 1 selected as the new owner.
            try {
                disableChannels();
                overrideTableViews(ownedBundle,
                        new ServiceUnitStateData(ServiceUnitState.Owned, broker, (String) null, 1L));
            } finally {
                enableChannels();
            }
            ((ExtensibleLoadManagerImpl) Mockito.doReturn(CompletableFuture.completedFuture(Optional.of(brokerId1))).when(loadManager)).selectAsync((ServiceUnitId) ArgumentMatchers.any(), (Set) ArgumentMatchers.any(), (LookupOptions) ArgumentMatchers.any());
            leaderChannel.handleMetadataSessionEvent(SessionEvent.SessionReestablished);
            FieldUtils.writeDeclaredField(leaderChannel, "lastMetadataSessionEventTimestamp", Long.valueOf(System.currentTimeMillis() - 181000), true);
            getCleanupJobs(leaderChannel).clear();
            ((PulsarAdmin) Mockito.doReturn(brokers).when(pulsarAdmin)).brokers();
            leaderChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted);
            start = System.currentTimeMillis();
            AssertJUnit.assertEquals(brokerId1, (String) ((Optional) channel1.getOwnerAsync(ownedBundle).get()).get());
            Assert.assertTrue(System.currentTimeMillis() - start < 20000);
        } finally {
            // Always restore the waiting time, even when an assertion above failed.
            FieldUtils.writeDeclaredField(channel1, "inFlightStateWaitingTimeInMillis", 30000, true);
        }
        cleanTableViews();
        Mockito.reset(pulsarAdmin);
    }

    /**
     * Verifies that calling {@code getOwnershipEntrySet()} on a freshly constructed channel
     * fails with an {@link IllegalStateException} carrying the message
     * {@code "Invalid channel state:Constructed"}.
     */
    @org.testng.annotations.Test(priority = 21)
    public void testGetOwnershipEntrySetBeforeChannelStart() {
        // expectThrows fails the test if nothing is thrown or if the exception has another type.
        IllegalStateException thrown = Assert.expectThrows(IllegalStateException.class, () -> {
            new ServiceUnitStateChannelImpl(pulsar1).getOwnershipEntrySet();
        });
        AssertJUnit.assertEquals("Invalid channel state:Constructed", thrown.getMessage());
    }

    /**
     * Puts the bundle of a test topic into {@code Releasing} state in channel1's table view,
     * tries to load the topic on {@code pulsar1} (accepting only a
     * {@code ServiceUnitNotReadyException} whose wrapper message contains
     * "Please redo the lookup"), and then unloads the bundle, expecting the unload to finish
     * within two seconds.
     */
    @org.testng.annotations.Test(priority = 22)
    public void unloadTimeoutCheckTest() throws Exception {
        String topicName = "persistent://" + namespaceName + "/test-topic";
        NamespaceBundle namespaceBundle = pulsar.getNamespaceService().getBundle(TopicName.get(topicName));
        ServiceUnitStateData releasingData = new ServiceUnitStateData(
                ServiceUnitState.Releasing, pulsar2.getBrokerId(), pulsar1.getBrokerId(), 1L);
        try {
            try {
                disableChannels();
                overrideTableView(channel1, namespaceBundle.toString(), releasingData);
                pulsar1.getBrokerService().getOrCreateTopic(topicName).get(1L, TimeUnit.SECONDS);
                enableChannels();
            } catch (Exception e) {
                // Only the "not ready, redo the lookup" failure is tolerated here.
                boolean expectedFailure =
                        e.getCause() instanceof BrokerServiceException.ServiceUnitNotReadyException
                                && e.getMessage().contains("Please redo the lookup");
                if (!expectedFailure) {
                    Assert.fail();
                }
                enableChannels();
            }
            pulsar1.getBrokerService()
                    .unloadServiceUnit(namespaceBundle, true, true, 5L, TimeUnit.SECONDS)
                    .get(2L, TimeUnit.SECONDS);
        } catch (Throwable failure) {
            enableChannels();
            throw failure;
        }
    }

    /**
     * Reads the {@code getOwnerRequests} field declared on the channel's own class via reflection.
     *
     * @param serviceUnitStateChannel channel to inspect
     * @return the field value, cast to a map of pending owner futures
     * @throws IllegalAccessException if the field cannot be accessed
     */
    private static ConcurrentHashMap<String, CompletableFuture<Optional<String>>> getOwnerRequests(ServiceUnitStateChannel serviceUnitStateChannel) throws IllegalAccessException {
        Object ownerRequests = FieldUtils.readDeclaredField(serviceUnitStateChannel, "getOwnerRequests", true);
        return (ConcurrentHashMap) ownerRequests;
    }

    /**
     * Reads the channel's {@code lastMetadataSessionEvent} field via reflection.
     *
     * @param serviceUnitStateChannel channel to inspect
     * @return the field value as a {@code SessionEvent}
     * @throws IllegalAccessException if the field cannot be accessed
     */
    private static SessionEvent getLastMetadataSessionEvent(ServiceUnitStateChannel serviceUnitStateChannel) throws IllegalAccessException {
        Object lastEvent = FieldUtils.readField(serviceUnitStateChannel, "lastMetadataSessionEvent", true);
        return (SessionEvent) lastEvent;
    }

    /**
     * Reads the channel's {@code lastMetadataSessionEventTimestamp} field via reflection.
     *
     * @param serviceUnitStateChannel channel to inspect
     * @return the field value unboxed to a primitive {@code long}
     * @throws IllegalAccessException if the field cannot be accessed
     */
    private static long getLastMetadataSessionEventTimestamp(ServiceUnitStateChannel serviceUnitStateChannel) throws IllegalAccessException {
        Long timestamp = (Long) FieldUtils.readField(serviceUnitStateChannel, "lastMetadataSessionEventTimestamp", true);
        return timestamp.longValue();
    }

    /**
     * Reads the channel's {@code cleanupJobs} field via reflection.
     *
     * @param serviceUnitStateChannel channel to inspect
     * @return the field value, cast to a map of cleanup futures
     * @throws IllegalAccessException if the field cannot be accessed
     */
    private static ConcurrentHashMap<String, CompletableFuture<Void>> getCleanupJobs(ServiceUnitStateChannel serviceUnitStateChannel) throws IllegalAccessException {
        Object cleanupJobs = FieldUtils.readField(serviceUnitStateChannel, "cleanupJobs", true);
        return (ConcurrentHashMap) cleanupJobs;
    }

    /**
     * Polls every 200 ms, for at most 10 s, until the channel-owner future is done and the
     * channel owner differs from {@code str} (an empty owner is compared as {@code null}).
     *
     * @param serviceUnitStateChannel channel whose channel owner is polled
     * @param str the channel owner to move away from
     */
    private static void waitUntilNewChannelOwner(ServiceUnitStateChannel serviceUnitStateChannel, String str) {
        Awaitility.await()
                .pollInterval(200L, TimeUnit.MILLISECONDS)
                .atMost(10L, TimeUnit.SECONDS)
                .until(() -> {
                    CompletableFuture pending = serviceUnitStateChannel.getChannelOwnerAsync();
                    if (!pending.isDone()) {
                        return false;
                    }
                    String currentOwner = (String) ((Optional) pending.get()).orElse(null);
                    return !StringUtils.equals(str, currentOwner);
                });
    }

    /**
     * Polls every 200 ms, for at most 10 s, until the owner future of service unit {@code str}
     * is done and the owner differs from {@code str2} (an empty owner is compared as
     * {@code null}).
     *
     * @param serviceUnitStateChannel channel to query
     * @param str service unit whose owner is polled
     * @param str2 the owner to move away from; may be {@code null}
     */
    private static void waitUntilOwnerChanges(ServiceUnitStateChannel serviceUnitStateChannel, String str, String str2) {
        Awaitility.await()
                .pollInterval(200L, TimeUnit.MILLISECONDS)
                .atMost(10L, TimeUnit.SECONDS)
                .until(() -> {
                    CompletableFuture pending = serviceUnitStateChannel.getOwnerAsync(str);
                    if (!pending.isDone()) {
                        return false;
                    }
                    String currentOwner = (String) ((Optional) pending.get()).orElse(null);
                    return !StringUtils.equals(str2, currentOwner);
                });
    }

    /**
     * Polls every 200 ms, for at most 15 s, until the owner future of service unit {@code str}
     * is done and its owner equals {@code str2} (an empty owner is compared as {@code null}).
     * Any exception raised while polling counts as "not yet".
     *
     * @param serviceUnitStateChannel channel to query
     * @param str service unit whose owner is polled
     * @param str2 the expected owner
     */
    private static void waitUntilNewOwner(ServiceUnitStateChannel serviceUnitStateChannel, String str, String str2) {
        Awaitility.await()
                .pollInterval(200L, TimeUnit.MILLISECONDS)
                .atMost(15L, TimeUnit.SECONDS)
                .until(() -> {
                    try {
                        CompletableFuture pending = serviceUnitStateChannel.getOwnerAsync(str);
                        if (!pending.isDone()) {
                            return false;
                        }
                        String currentOwner = (String) ((Optional) pending.get()).orElse(null);
                        return StringUtils.equals(str2, currentOwner);
                    } catch (Exception ignored) {
                        // A failed lookup is treated as "owner not there yet"; keep polling.
                        return false;
                    }
                });
    }

    /**
     * Reads the channel's {@code tableview} field via reflection.
     *
     * @param serviceUnitStateChannel channel to inspect
     * @return the field value as a {@code ServiceUnitStateTableView}
     * @throws IllegalAccessException if the field cannot be accessed
     */
    private static ServiceUnitStateTableView getTableView(ServiceUnitStateChannel serviceUnitStateChannel) throws IllegalAccessException {
        Object view = FieldUtils.readField(serviceUnitStateChannel, "tableview", true);
        return (ServiceUnitStateTableView) view;
    }

    /**
     * Overwrites the channel's {@code tableview} field via reflection.
     *
     * @param serviceUnitStateChannel channel to modify
     * @param serviceUnitStateTableView value to store in the field
     * @throws IllegalAccessException if the field cannot be accessed
     */
    private static void setTableView(ServiceUnitStateChannel serviceUnitStateChannel, ServiceUnitStateTableView serviceUnitStateTableView) throws IllegalAccessException {
        final String fieldName = "tableview";
        FieldUtils.writeField(serviceUnitStateChannel, fieldName, serviceUnitStateTableView, true);
    }

    /**
     * Polls the channel's table view every 200 ms, for at most 10 s, until service unit
     * {@code str} either has no state data or is in a state other than {@code Owned}.
     *
     * @param serviceUnitStateChannel channel whose table view is polled
     * @param str service unit to watch
     * @throws IllegalAccessException if the table view field cannot be read
     */
    private static void waitUntilState(ServiceUnitStateChannel serviceUnitStateChannel, String str) throws IllegalAccessException {
        ServiceUnitStateTableView view = getTableView(serviceUnitStateChannel);
        Awaitility.await()
                .pollInterval(200L, TimeUnit.MILLISECONDS)
                .atMost(10L, TimeUnit.SECONDS)
                .until(() -> {
                    ServiceUnitStateData current = view.get(str);
                    return current == null || current.state() != ServiceUnitState.Owned;
                });
    }

    /**
     * Polls the channel's table view every 200 ms, for at most 10 s, until the state that
     * {@code ServiceUnitStateData.state(...)} derives for service unit {@code str} is
     * {@code serviceUnitState}.
     *
     * @param serviceUnitStateChannel channel whose table view is polled
     * @param str service unit to watch
     * @param serviceUnitState state to wait for
     * @throws IllegalAccessException if the table view field cannot be read
     */
    private static void waitUntilState(ServiceUnitStateChannel serviceUnitStateChannel, String str, ServiceUnitState serviceUnitState) throws IllegalAccessException {
        ServiceUnitStateTableView view = getTableView(serviceUnitStateChannel);
        Awaitility.await()
                .pollInterval(200L, TimeUnit.MILLISECONDS)
                .atMost(10L, TimeUnit.SECONDS)
                .until(() -> {
                    ServiceUnitState current = ServiceUnitStateData.state(view.get(str));
                    return current == serviceUnitState;
                });
    }

    /**
     * Waits for a service unit to reach the given state, running the ownership monitor on every poll.
     *
     * <p>Each poll (every 200 ms, at most 10 seconds) first calls {@code monitorOwnerships} with
     * {@code brokerId1} and {@code brokerId2}, then checks the state in the table view.
     *
     * @param serviceUnitStateChannel channel to monitor; must be a {@link ServiceUnitStateChannelImpl}
     * @param str service unit key to look up
     * @param serviceUnitState state to wait for
     * @throws IllegalAccessException if the table view cannot be read reflectively
     */
    private void waitUntilStateWithMonitor(ServiceUnitStateChannel serviceUnitStateChannel, String str, ServiceUnitState serviceUnitState) throws IllegalAccessException {
        ServiceUnitStateTableView stateTableView = getTableView(serviceUnitStateChannel);
        Awaitility.await()
                .pollInterval(200L, TimeUnit.MILLISECONDS)
                .atMost(10L, TimeUnit.SECONDS)
                .until(() -> {
                    ServiceUnitStateChannelImpl channelImpl = (ServiceUnitStateChannelImpl) serviceUnitStateChannel;
                    channelImpl.monitorOwnerships(List.of(this.brokerId1, this.brokerId2));
                    ServiceUnitState observed = ServiceUnitStateData.state(stateTableView.get(str));
                    return observed == serviceUnitState;
                });
    }

    /**
     * Runs {@code cleanTableView} on both channels under test, {@code channel1} first.
     *
     * @throws IllegalAccessException if a reflective field access is denied
     */
    private void cleanTableViews() throws IllegalAccessException {
        cleanTableView(channel1);
        cleanTableView(channel2);
    }

    /**
     * Empties the channel's pending owner requests and its table view contents.
     *
     * <p>When the configured table view class is {@code ServiceUnitStateTableViewImpl}, the inner
     * table view's {@code data} map is cleared directly. Otherwise every entry is deleted through
     * the {@code MetadataStoreTableView}, and the method then waits (up to 5 seconds) for the
     * {@code Init} handler counter to reflect each successful delete.
     *
     * @param serviceUnitStateChannel channel to clean
     * @throws IllegalAccessException if a reflective field access is denied
     * @throws RuntimeException wrapping any exception raised while deleting entries or waiting
     */
    private void cleanTableView(ServiceUnitStateChannel serviceUnitStateChannel) throws IllegalAccessException {
        Map<?, ?> pendingOwnerRequests = (Map<?, ?>) FieldUtils.readField(serviceUnitStateChannel, "getOwnerRequests", true);
        pendingOwnerRequests.clear();

        ServiceUnitStateTableView stateTableView = getTableView(serviceUnitStateChannel);
        String topicTableViewClassName = ServiceUnitStateTableViewImpl.class.getCanonicalName();
        if (this.serviceUnitStateTableViewClassName.equals(topicTableViewClassName)) {
            TableView innerView = (TableView) FieldUtils.readField(stateTableView, "tableview", true);
            ConcurrentMap<?, ?> cachedData = (ConcurrentMap<?, ?>) FieldUtils.readField(innerView, "data", true);
            cachedData.clear();
        } else {
            MetadataStoreTableView storeView = (MetadataStoreTableView) FieldUtils.readField(stateTableView, "tableview", true);
            Map<?, ?> handlerCounters = (Map<?, ?>) FieldUtils.readDeclaredField(serviceUnitStateChannel, "handlerCounters", true);
            ServiceUnitStateChannelImpl.Counters initCounters = (ServiceUnitStateChannelImpl.Counters) handlerCounters.get(ServiceUnitState.Init);
            AtomicLong observedInitTotal = initCounters.getTotal();
            // Start from the current total; each successful delete below raises the expectation by one.
            AtomicLong expectedInitTotal = new AtomicLong(observedInitTotal.get());
            try {
                for (Object entry : storeView.entrySet()) {
                    String key = (String) ((Map.Entry<?, ?>) entry).getKey();
                    try {
                        storeView.delete(key).join();
                        expectedInitTotal.incrementAndGet();
                    } catch (CompletionException e) {
                        // An entry that is already gone is fine; anything else is a real failure.
                        boolean alreadyDeleted = e.getCause() instanceof MetadataStoreException.NotFoundException;
                        if (!alreadyDeleted) {
                            throw e;
                        }
                    }
                }
                Awaitility.await()
                        .ignoreNoExceptions()
                        .atMost(5L, TimeUnit.SECONDS)
                        .untilAsserted(() -> AssertJUnit.assertEquals(observedInitTotal.get(), expectedInitTotal.get()));
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Applies {@code overrideTableView} with the same key and value to both channels under test,
     * {@code channel1} first.
     *
     * @param str service unit key to override
     * @param serviceUnitStateData value to install; forwarded unchanged to {@code overrideTableView}
     * @throws IllegalAccessException if a reflective field access is denied
     */
    private void overrideTableViews(String str, ServiceUnitStateData serviceUnitStateData) throws IllegalAccessException {
        overrideTableView(channel1, str, serviceUnitStateData);
        overrideTableView(channel2, str, serviceUnitStateData);
    }

    /**
     * Replaces the table view entry for a service unit and waits for the channel to observe it.
     *
     * <p>Pending owner requests are cleared first. If an entry already exists it is deleted, and
     * the method waits (up to 3 seconds) until the {@code Init} handler counter has grown by one
     * and the entry is gone. If a new value is supplied it is then written, and the method waits
     * until the handler counter for that value's state has grown by one and the table view
     * returns the value.
     *
     * @param serviceUnitStateChannel channel whose table view is modified
     * @param str service unit key to override
     * @param serviceUnitStateData value to install, or {@code null} to only remove the entry
     * @throws IllegalAccessException if a reflective field access is denied
     */
    @org.testng.annotations.Test(enabled = false)
    public static void overrideTableView(ServiceUnitStateChannel serviceUnitStateChannel, String str, ServiceUnitStateData serviceUnitStateData) throws IllegalAccessException {
        Map<?, ?> pendingOwnerRequests = (Map<?, ?>) FieldUtils.readField(serviceUnitStateChannel, "getOwnerRequests", true);
        pendingOwnerRequests.clear();

        ServiceUnitStateTableView stateTableView = getTableView(serviceUnitStateChannel);
        Map<?, ?> handlerCounters = (Map<?, ?>) FieldUtils.readDeclaredField(serviceUnitStateChannel, "handlerCounters", true);

        if (stateTableView.get(str) != null) {
            ServiceUnitStateChannelImpl.Counters initBefore = (ServiceUnitStateChannelImpl.Counters) handlerCounters.get(ServiceUnitState.Init);
            long initTotalBefore = initBefore.getTotal().get();
            AtomicLong deletions = new AtomicLong(0L);
            stateTableView.delete(str).join();
            deletions.incrementAndGet();
            Awaitility.await()
                    .pollInterval(200L, TimeUnit.MILLISECONDS)
                    .atMost(3L, TimeUnit.SECONDS)
                    .untilAsserted(() -> {
                        ServiceUnitStateChannelImpl.Counters initNow = (ServiceUnitStateChannelImpl.Counters) handlerCounters.get(ServiceUnitState.Init);
                        AssertJUnit.assertEquals(initNow.getTotal().get() - initTotalBefore, deletions.get());
                        AssertJUnit.assertNull(stateTableView.get(str));
                    });
        }

        if (serviceUnitStateData != null) {
            ServiceUnitStateChannelImpl.Counters stateBefore = (ServiceUnitStateChannelImpl.Counters) handlerCounters.get(ServiceUnitStateData.state(serviceUnitStateData));
            long stateTotalBefore = stateBefore.getTotal().get();
            AtomicLong insertions = new AtomicLong(0L);
            stateTableView.put(str, serviceUnitStateData).join();
            insertions.incrementAndGet();
            Awaitility.await()
                    .pollInterval(200L, TimeUnit.MILLISECONDS)
                    .atMost(3L, TimeUnit.SECONDS)
                    .untilAsserted(() -> {
                        ServiceUnitStateChannelImpl.Counters stateNow = (ServiceUnitStateChannelImpl.Counters) handlerCounters.get(ServiceUnitStateData.state(serviceUnitStateData));
                        AssertJUnit.assertEquals(stateNow.getTotal().get() - stateTotalBefore, insertions.get());
                        AssertJUnit.assertEquals(serviceUnitStateData, stateTableView.get(str));
                    });
        }
    }

    /**
     * Resets every failure and total counter held in the channel's operation counter maps.
     *
     * <p>The maps are read reflectively from the {@code handlerCounters}, {@code eventCounters}
     * and {@code ownerLookUpCounters} fields, in that order, and each
     * {@link ServiceUnitStateChannelImpl.Counters} value has its failure and total set to zero.
     *
     * @param serviceUnitStateChannel channel whose counters are reset
     * @throws IllegalAccessException if a reflective field access is denied
     */
    private static void cleanOpsCounters(ServiceUnitStateChannel serviceUnitStateChannel) throws IllegalAccessException {
        for (String counterField : List.of("handlerCounters", "eventCounters", "ownerLookUpCounters")) {
            Map<?, ?> counterMap = (Map<?, ?>) FieldUtils.readDeclaredField(serviceUnitStateChannel, counterField, true);
            // The field is only known reflectively, so iterate as Object and cast each value explicitly;
            // a typed loop variable over an untyped values() view does not compile.
            for (Object value : counterMap.values()) {
                ServiceUnitStateChannelImpl.Counters counters = (ServiceUnitStateChannelImpl.Counters) value;
                counters.getFailure().set(0L);
                counters.getTotal().set(0L);
            }
        }
    }

    /**
     * Resets the channel's ownership-monitor cleanup metrics by writing its fields reflectively.
     *
     * <p>{@code totalCleanupErrorCnt} is replaced with a fresh {@code AtomicLong(0)}; the other
     * six fields are written with {@code 0}.
     *
     * @param serviceUnitStateChannel channel whose metric fields are reset
     * @throws IllegalAccessException if a reflective field write is denied
     */
    private void cleanOwnershipMonitorCounters(ServiceUnitStateChannel serviceUnitStateChannel) throws IllegalAccessException {
        String[] countersBeforeErrorCnt = {
            "totalInactiveBrokerCleanupCnt",
            "totalServiceUnitTombstoneCleanupCnt",
            "totalOrphanServiceUnitCleanupCnt",
        };
        String[] countersAfterErrorCnt = {
            "totalInactiveBrokerCleanupScheduledCnt",
            "totalInactiveBrokerCleanupIgnoredCnt",
            "totalInactiveBrokerCleanupCancelledCnt",
        };

        for (String fieldName : countersBeforeErrorCnt) {
            FieldUtils.writeDeclaredField(serviceUnitStateChannel, fieldName, 0, true);
        }
        FieldUtils.writeDeclaredField(serviceUnitStateChannel, "totalCleanupErrorCnt", new AtomicLong(0L), true);
        for (String fieldName : countersAfterErrorCnt) {
            FieldUtils.writeDeclaredField(serviceUnitStateChannel, fieldName, 0, true);
        }
    }

    /**
     * Feeds the channel a {@code SessionReestablished} metadata session event and then zeroes its
     * {@code lastMetadataSessionEventTimestamp} field.
     *
     * @param serviceUnitStateChannel channel to reset; must be a {@link ServiceUnitStateChannelImpl}
     * @throws IllegalAccessException if the reflective field write is denied
     */
    private void cleanMetadataState(ServiceUnitStateChannel serviceUnitStateChannel) throws IllegalAccessException {
        ServiceUnitStateChannelImpl channelImpl = (ServiceUnitStateChannelImpl) serviceUnitStateChannel;
        channelImpl.handleMetadataSessionEvent(SessionEvent.SessionReestablished);
        // Written after the event is handled, so the stored timestamp ends up as 0.
        FieldUtils.writeDeclaredField(serviceUnitStateChannel, "lastMetadataSessionEventTimestamp", 0L, true);
    }

    /**
     * Reads a numeric metric field from the channel by name.
     *
     * @param serviceUnitStateChannel channel to read from
     * @param str name of the declared field holding the metric
     * @return the field's value; an {@code AtomicLong} is unwrapped with {@code get()}, any other
     *         value is cast to {@code Long}
     * @throws IllegalAccessException if the reflective read is denied
     */
    private static long getCleanupMetric(ServiceUnitStateChannel serviceUnitStateChannel, String str) throws IllegalAccessException {
        Object metricValue = FieldUtils.readDeclaredField(serviceUnitStateChannel, str, true);
        if (metricValue instanceof AtomicLong) {
            return ((AtomicLong) metricValue).get();
        }
        return ((Long) metricValue).longValue();
    }

    /**
     * Waits until the channel's {@code handlerCounters} match the expected totals and failures.
     *
     * <p>Assertions are retried every 200 ms for at most 10 seconds. States are checked in the
     * order Assigning, Owned, Releasing, Splitting, Free, Init, Deleted, total before failure.
     *
     * @param serviceUnitStateChannel channel whose counters are inspected
     * @param j expected Assigning total
     * @param j2 expected Assigning failures
     * @param j3 expected Owned total
     * @param j4 expected Owned failures
     * @param j5 expected Releasing total
     * @param j6 expected Releasing failures
     * @param j7 expected Splitting total
     * @param j8 expected Splitting failures
     * @param j9 expected Free total
     * @param j10 expected Free failures
     * @param j11 expected Init total
     * @param j12 expected Init failures
     * @param j13 expected Deleted total
     * @param j14 expected Deleted failures
     * @throws IllegalAccessException if the counters field cannot be read reflectively
     */
    private static void validateHandlerCounters(ServiceUnitStateChannel serviceUnitStateChannel, long j, long j2, long j3, long j4, long j5, long j6, long j7, long j8, long j9, long j10, long j11, long j12, long j13, long j14) throws IllegalAccessException {
        Map<?, ?> handlerCounters = (Map<?, ?>) FieldUtils.readDeclaredField(serviceUnitStateChannel, "handlerCounters", true);
        ServiceUnitState[] states = {
            ServiceUnitState.Assigning,
            ServiceUnitState.Owned,
            ServiceUnitState.Releasing,
            ServiceUnitState.Splitting,
            ServiceUnitState.Free,
            ServiceUnitState.Init,
            ServiceUnitState.Deleted,
        };
        // One {total, failure} pair per entry of states, in the same order.
        long[][] expected = {{j, j2}, {j3, j4}, {j5, j6}, {j7, j8}, {j9, j10}, {j11, j12}, {j13, j14}};
        Awaitility.await()
                .pollInterval(200L, TimeUnit.MILLISECONDS)
                .atMost(10L, TimeUnit.SECONDS)
                .untilAsserted(() -> {
                    for (int i = 0; i < states.length; i++) {
                        ServiceUnitStateChannelImpl.Counters counters = (ServiceUnitStateChannelImpl.Counters) handlerCounters.get(states[i]);
                        AssertJUnit.assertEquals(expected[i][0], counters.getTotal().get());
                        AssertJUnit.assertEquals(expected[i][1], counters.getFailure().get());
                    }
                });
    }

    /**
     * Waits until the channel's {@code eventCounters} match the expected totals and failures.
     *
     * <p>Assertions are retried every 200 ms for at most 10 seconds. Event types are checked in
     * the order Assign, Split, Unload, total before failure.
     *
     * @param serviceUnitStateChannel channel whose counters are inspected
     * @param j expected Assign total
     * @param j2 expected Assign failures
     * @param j3 expected Split total
     * @param j4 expected Split failures
     * @param j5 expected Unload total
     * @param j6 expected Unload failures
     * @throws IllegalAccessException if the counters field cannot be read reflectively
     */
    private static void validateEventCounters(ServiceUnitStateChannel serviceUnitStateChannel, long j, long j2, long j3, long j4, long j5, long j6) throws IllegalAccessException {
        Map<?, ?> eventCounters = (Map<?, ?>) FieldUtils.readDeclaredField(serviceUnitStateChannel, "eventCounters", true);
        ServiceUnitStateChannelImpl.EventType[] eventTypes = {
            ServiceUnitStateChannelImpl.EventType.Assign,
            ServiceUnitStateChannelImpl.EventType.Split,
            ServiceUnitStateChannelImpl.EventType.Unload,
        };
        // One {total, failure} pair per entry of eventTypes, in the same order.
        long[][] expected = {{j, j2}, {j3, j4}, {j5, j6}};
        Awaitility.await()
                .pollInterval(200L, TimeUnit.MILLISECONDS)
                .atMost(10L, TimeUnit.SECONDS)
                .untilAsserted(() -> {
                    for (int i = 0; i < eventTypes.length; i++) {
                        ServiceUnitStateChannelImpl.Counters counters = (ServiceUnitStateChannelImpl.Counters) eventCounters.get(eventTypes[i]);
                        AssertJUnit.assertEquals(expected[i][0], counters.getTotal().get());
                        AssertJUnit.assertEquals(expected[i][1], counters.getFailure().get());
                    }
                });
    }

    /**
     * Waits until the totals in the channel's {@code ownerLookUpCounters} match the expected values.
     *
     * <p>Assertions are retried every 200 ms for at most 10 seconds. Only totals are checked, in
     * the order Assigning, Owned, Releasing, Splitting, Free, Deleted, Init.
     *
     * @param serviceUnitStateChannel channel whose counters are inspected
     * @param j expected Assigning total
     * @param j2 expected Owned total
     * @param j3 expected Releasing total
     * @param j4 expected Splitting total
     * @param j5 expected Free total
     * @param j6 expected Deleted total
     * @param j7 expected Init total
     * @throws IllegalAccessException if the counters field cannot be read reflectively
     */
    private static void validateOwnerLookUpCounters(ServiceUnitStateChannel serviceUnitStateChannel, long j, long j2, long j3, long j4, long j5, long j6, long j7) throws IllegalAccessException {
        Map<?, ?> lookUpCounters = (Map<?, ?>) FieldUtils.readDeclaredField(serviceUnitStateChannel, "ownerLookUpCounters", true);
        ServiceUnitState[] states = {
            ServiceUnitState.Assigning,
            ServiceUnitState.Owned,
            ServiceUnitState.Releasing,
            ServiceUnitState.Splitting,
            ServiceUnitState.Free,
            ServiceUnitState.Deleted,
            ServiceUnitState.Init,
        };
        // Expected total per entry of states, in the same order.
        long[] expectedTotals = {j, j2, j3, j4, j5, j6, j7};
        Awaitility.await()
                .pollInterval(200L, TimeUnit.MILLISECONDS)
                .atMost(10L, TimeUnit.SECONDS)
                .untilAsserted(() -> {
                    for (int i = 0; i < states.length; i++) {
                        ServiceUnitStateChannelImpl.Counters counters = (ServiceUnitStateChannelImpl.Counters) lookUpCounters.get(states[i]);
                        AssertJUnit.assertEquals(expectedTotals[i], counters.getTotal().get());
                    }
                });
    }

    /**
     * Asserts the channel's ownership-monitor cleanup metrics against the expected values.
     *
     * <p>The check runs once, without polling; each metric is read with {@code getCleanupMetric}.
     *
     * @param serviceUnitStateChannel channel whose metric fields are inspected
     * @param j expected {@code totalInactiveBrokerCleanupCnt}
     * @param j2 expected {@code totalServiceUnitTombstoneCleanupCnt}
     * @param j3 expected {@code totalOrphanServiceUnitCleanupCnt}
     * @param j4 expected {@code totalCleanupErrorCnt}
     * @param j5 expected {@code totalInactiveBrokerCleanupScheduledCnt}
     * @param j6 expected {@code totalInactiveBrokerCleanupIgnoredCnt}
     * @param j7 expected {@code totalInactiveBrokerCleanupCancelledCnt}
     * @throws IllegalAccessException if a metric field cannot be read reflectively
     */
    private static void validateMonitorCounters(ServiceUnitStateChannel serviceUnitStateChannel, long j, long j2, long j3, long j4, long j5, long j6, long j7) throws IllegalAccessException {
        String[] metricFields = {
            "totalInactiveBrokerCleanupCnt",
            "totalServiceUnitTombstoneCleanupCnt",
            "totalOrphanServiceUnitCleanupCnt",
            "totalCleanupErrorCnt",
            "totalInactiveBrokerCleanupScheduledCnt",
            "totalInactiveBrokerCleanupIgnoredCnt",
            "totalInactiveBrokerCleanupCancelledCnt",
        };
        // Expected value per entry of metricFields, in the same order.
        long[] expectedValues = {j, j2, j3, j4, j5, j6, j7};
        for (int i = 0; i < metricFields.length; i++) {
            AssertJUnit.assertEquals(expectedValues[i], getCleanupMetric(serviceUnitStateChannel, metricFields[i]));
        }
    }

    /**
     * Builds a Mockito-spied {@link ServiceUnitStateChannelImpl} wired to this test's collaborators.
     *
     * <p>The real channel gets {@code ownershipMonitorDelayTimeInSecs} set to 5 before it is
     * spied. The spy returns this test's load manager context, broker registry, load manager and
     * admin client from the corresponding getters. A {@link LeaderElectionService} is created and
     * started whose listener schedules the channel's ownership monitor when the state is
     * {@code Leading} and cancels it otherwise; the spy returns that service from
     * {@code getLeaderElectionService()}.
     *
     * @param pulsarService broker used to construct the channel and the leader election service
     * @return the spied channel
     * @throws IllegalAccessException if the reflective field write is denied
     * @throws PulsarServerException declared by the method; propagated from the calls it makes
     */
    ServiceUnitStateChannelImpl createChannel(PulsarService pulsarService) throws IllegalAccessException, PulsarServerException {
        ServiceUnitStateChannelImpl realChannel = new ServiceUnitStateChannelImpl(pulsarService);
        FieldUtils.writeDeclaredField(realChannel, "ownershipMonitorDelayTimeInSecs", 5, true);

        ServiceUnitStateChannelImpl spiedChannel = Mockito.spy(realChannel);
        Mockito.doReturn(this.loadManagerContext).when(spiedChannel).getContext();
        Mockito.doReturn(this.registry).when(spiedChannel).getBrokerRegistry();
        Mockito.doReturn(this.loadManager).when(spiedChannel).getLoadManager();
        Mockito.doReturn(this.pulsarAdmin).when(spiedChannel).getPulsarAdmin();

        LeaderElectionService electionService = new LeaderElectionService(
                pulsarService.getCoordinationService(),
                pulsarService.getBrokerId(),
                pulsarService.getSafeWebServiceAddress(),
                electionState -> {
                    if (electionState != LeaderElectionState.Leading) {
                        spiedChannel.cancelOwnershipMonitor();
                        return;
                    }
                    spiedChannel.scheduleOwnershipMonitor();
                });
        electionService.start();
        Mockito.doReturn(electionService).when(spiedChannel).getLeaderElectionService();
        return spiedChannel;
    }

    /** Calls {@code disable()} on both channels under test, {@code channel1} first. */
    private void disableChannels() {
        channel1.disable();
        channel2.disable();
    }

    /** Calls {@code enable()} on both channels under test, {@code channel1} first. */
    private void enableChannels() {
        channel1.enable();
        channel2.enable();
    }
}
