package org.apache.pulsar.broker.admin;

import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.lang.reflect.Field;
import java.time.Duration;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Phaser;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.ProducerBuilderImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.TopicVersion;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.socks5.auth.DefaultPasswordAuthImpl;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups = {"broker-admin"})
/* loaded from: input_file:org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.class */
public class AdminApiHealthCheckTest extends MockedPulsarServiceBaseTest {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(AdminApiHealthCheckTest.class);
    private final ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();

    /* loaded from: input_file:org/apache/pulsar/broker/admin/AdminApiHealthCheckTest$DummyProducerBuilder.class */
    class DummyProducerBuilder<T> extends ProducerBuilderImpl<T> {
        public DummyProducerBuilder(PulsarClientImpl pulsarClientImpl, Schema schema) {
            super(pulsarClientImpl, schema);
        }

        public CompletableFuture<Producer<T>> createAsync() {
            CompletableFuture<Producer<T>> completableFuture = new CompletableFuture<>();
            super.createAsync().thenAccept((Consumer) producer -> {
                Producer producer = (Producer) Mockito.spy(producer);
                ((Producer) Mockito.doReturn(CompletableFuture.completedFuture(MessageId.earliest)).when(producer)).sendAsync(Mockito.any());
                completableFuture.complete(producer);
            }).exceptionally(th -> {
                completableFuture.completeExceptionally(th);
                return null;
            });
            return completableFuture;
        }
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeMethod
    public void setup() throws Exception {
        super.internalSetup();
        this.admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(this.pulsar.getWebServiceAddress()).build());
        TenantInfoImpl tenantInfoImpl = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test"));
        this.admin.tenants().createTenant(DefaultPasswordAuthImpl.DEFAULT_PASSWORD, tenantInfoImpl);
        this.admin.namespaces().createNamespace("pulsar/system", Set.of("test"));
        this.admin.tenants().createTenant("public", tenantInfoImpl);
        this.admin.namespaces().createNamespace("public/default", Set.of("test"));
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterMethod(alwaysRun = true)
    public void cleanup() throws Exception {
        super.internalCleanup();
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Object[], java.lang.Object[][]] */
    @DataProvider(name = "topicVersion")
    public static Object[][] topicVersions() {
        return new Object[]{new Object[]{null}, new Object[]{TopicVersion.V1}, new Object[]{TopicVersion.V2}};
    }

    @Test(dataProvider = "topicVersion")
    public void testHealthCheckup(TopicVersion topicVersion) throws Exception {
        CompletableFuture completableFuture = new CompletableFuture();
        this.pulsar.getExecutor().execute(() -> {
            for (int i = 0; i < 30; i++) {
                if (topicVersion == null) {
                    try {
                        this.admin.brokers().healthcheck();
                    } catch (PulsarAdminException e) {
                        completableFuture.completeExceptionally(e);
                        return;
                    }
                } else {
                    this.admin.brokers().healthcheck(topicVersion);
                }
            }
            completableFuture.complete(null);
        });
        for (int i = 0; i < 30; i++) {
            if (topicVersion == null) {
                this.admin.brokers().healthcheck();
            } else {
                this.admin.brokers().healthcheck(topicVersion);
            }
        }
        String brokerId = this.pulsar.getBrokerId();
        String format = String.format("persistent://%s/%s", topicVersion == TopicVersion.V2 ? NamespaceService.getHeartbeatNamespaceV2(brokerId, this.pulsar.getConfiguration()) : NamespaceService.getHeartbeatNamespace(brokerId, this.pulsar.getConfiguration()), "healthcheck");
        Awaitility.await().untilAsserted(() -> {
            Assert.assertFalse(completableFuture.isCompletedExceptionally());
        });
        Awaitility.await().untilAsserted(() -> {
            Assert.assertTrue(CollectionUtils.isEmpty((Collection) this.admin.topics().getSubscriptions(format).stream().filter(str -> {
                return !str.equals("__compaction");
            }).collect(Collectors.toList())));
        });
    }

    @Test(expectedExceptions = {PulsarAdminException.class}, expectedExceptionsMessageRegExp = ".*Deadlocked threads detected.*")
    public void testHealthCheckupDetectsDeadlock() throws Exception {
        ReentrantReadWriteLock.WriteLock writeLock = new ReentrantReadWriteLock().writeLock();
        ReentrantReadWriteLock.WriteLock writeLock2 = new ReentrantReadWriteLock().writeLock();
        Phaser phaser = new Phaser(3);
        Thread thread = new Thread(() -> {
            phaser.arriveAndAwaitAdvance();
            try {
                deadlock(writeLock, writeLock2, 1000L);
                phaser.arriveAndDeregister();
            } catch (Throwable th) {
                phaser.arriveAndDeregister();
                throw th;
            }
        }, "deadlockthread-1");
        Thread thread2 = new Thread(() -> {
            phaser.arriveAndAwaitAdvance();
            try {
                deadlock(writeLock2, writeLock, 2000L);
                phaser.arriveAndDeregister();
            } catch (Throwable th) {
                phaser.arriveAndDeregister();
                throw th;
            }
        }, "deadlockthread-2");
        thread.start();
        thread2.start();
        phaser.arriveAndAwaitAdvance();
        Thread.sleep(5000L);
        try {
            this.admin.brokers().healthcheck(TopicVersion.V2);
            thread.interrupt();
            thread2.interrupt();
            phaser.arriveAndAwaitAdvance();
            Awaitility.await().atMost(Duration.ofSeconds(10L)).until(() -> {
                return Boolean.valueOf(this.threadBean.findDeadlockedThreads() == null);
            });
        } catch (Throwable th) {
            thread.interrupt();
            thread2.interrupt();
            phaser.arriveAndAwaitAdvance();
            Awaitility.await().atMost(Duration.ofSeconds(10L)).until(() -> {
                return Boolean.valueOf(this.threadBean.findDeadlockedThreads() == null);
            });
            throw th;
        }
    }

    private void deadlock(Lock lock, Lock lock2, long j) {
        try {
            lock.lockInterruptibly();
            try {
                try {
                    Thread.sleep(j);
                    lock2.lockInterruptibly();
                    lock2.unlock();
                    lock.unlock();
                } catch (Throwable th) {
                    lock.unlock();
                    throw th;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                lock.unlock();
            }
        } catch (InterruptedException e2) {
            Thread.currentThread().interrupt();
        }
    }

    @Test(timeOut = 5000)
    public void testDeadlockDetectionOverhead() {
        for (int i = 0; i < 1000; i++) {
            Assert.assertNull(this.threadBean.findDeadlockedThreads());
        }
    }

    @Test
    public void testHealthCheckTimeOut() throws Exception {
        String format = String.format("persistent://pulsar/localhost:%s/healthcheck", this.pulsar.getConfig().getWebServicePort().get());
        PulsarClientImpl pulsarClientImpl = (PulsarClient) Mockito.spy(this.pulsar.getClient());
        ((PulsarClient) Mockito.doReturn(new DummyProducerBuilder(pulsarClientImpl, Schema.BYTES)).when(pulsarClientImpl)).newProducer(Schema.STRING);
        Field declaredField = PulsarService.class.getDeclaredField("client");
        declaredField.setAccessible(true);
        declaredField.set(this.pulsar, pulsarClientImpl);
        try {
            this.admin.brokers().healthcheck(TopicVersion.V2);
            throw new Exception("Should not reach here");
        } catch (PulsarAdminException e) {
            log.info("Exception caught", e);
            Assert.assertTrue(e.getMessage().contains("LowOverheadTimeoutException"));
            Awaitility.await().untilAsserted(() -> {
                Assert.assertTrue(CollectionUtils.isEmpty((Collection) this.admin.topics().getSubscriptions(format).stream().filter(str -> {
                    return !str.equals("__compaction");
                }).collect(Collectors.toList())));
            });
        }
    }
}
