package org.apache.pulsar.broker.transaction.buffer;

import com.google.common.collect.Sets;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.HashedWheelTimer;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.pulsar.broker.intercept.MockBrokerInterceptor;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl;
import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferHandlerImpl;
import org.apache.pulsar.broker.transaction.coordinator.TransactionMetaStoreTestBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.transaction.TransactionBufferClient;
import org.apache.pulsar.client.api.transaction.TransactionBufferClientException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.awaitility.Awaitility;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.Test;

@Test(groups = {"broker"})
/* loaded from: input_file:org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.class */
public class TransactionBufferClientTest extends TransactionMetaStoreTestBase {
    private static final Logger log = LoggerFactory.getLogger(TransactionBufferClientTest.class);
    private TransactionBufferClient tbClient;
    TopicName partitionedTopicName = TopicName.get("persistent", "public", "test", "tb-client");
    int partitions = 10;
    BrokerService[] brokerServices;
    private static final String namespace = "public/test";
    private EventLoopGroup eventLoopGroup;

    @Override // org.apache.pulsar.broker.transaction.coordinator.TransactionMetaStoreTestBase
    protected void afterSetup() throws Exception {
        this.pulsarAdmins[0].clusters().createCluster("my-cluster", ClusterData.builder().serviceUrl(this.pulsarServices[0].getWebServiceAddress()).build());
        this.pulsarAdmins[0].tenants().createTenant("public", new TenantInfoImpl(Sets.newHashSet(), Sets.newHashSet(new String[]{"my-cluster"})));
        this.pulsarAdmins[0].namespaces().createNamespace(namespace, 10);
        this.pulsarAdmins[0].topics().createPartitionedTopic(this.partitionedTopicName.getPartitionedTopicName(), this.partitions);
        this.pulsarClient.newConsumer().topic(new String[]{this.partitionedTopicName.getPartitionedTopicName()}).subscriptionName("test").subscribe();
        this.tbClient = TransactionBufferClientImpl.create(this.pulsarClient, new HashedWheelTimer(new DefaultThreadFactory("transaction-buffer")));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.pulsar.broker.transaction.coordinator.TransactionMetaStoreTestBase
    public void cleanup() throws Exception {
        if (this.tbClient != null) {
            this.tbClient.close();
        }
        if (this.brokerServices != null) {
            for (BrokerService brokerService : this.brokerServices) {
                brokerService.close();
            }
            this.brokerServices = null;
        }
        super.cleanup();
        this.eventLoopGroup.shutdownGracefully().get();
    }

    @Override // org.apache.pulsar.broker.transaction.coordinator.TransactionMetaStoreTestBase
    protected void afterPulsarStart() throws Exception {
        this.eventLoopGroup = new NioEventLoopGroup();
        this.brokerServices = new BrokerService[this.pulsarServices.length];
        AtomicLong atomicLong = new AtomicLong(0L);
        for (int i = 0; i < this.pulsarServices.length; i++) {
            Subscription subscription = (Subscription) Mockito.mock(Subscription.class);
            Mockito.when(subscription.endTxn(Mockito.anyLong(), Mockito.anyLong(), Mockito.anyInt(), Mockito.anyLong())).thenReturn(CompletableFuture.completedFuture(null));
            Topic topic = (Topic) Mockito.mock(Topic.class);
            Mockito.when(topic.endTxn((TxnID) ArgumentMatchers.any(), Mockito.anyInt(), ArgumentMatchers.anyLong())).thenReturn(CompletableFuture.completedFuture(null));
            Mockito.when(topic.getSubscription((String) ArgumentMatchers.any())).thenReturn(subscription);
            ConcurrentOpenHashMap concurrentOpenHashMap = (ConcurrentOpenHashMap) Mockito.mock(ConcurrentOpenHashMap.class);
            Mockito.when(concurrentOpenHashMap.get(Mockito.anyString())).thenReturn(CompletableFuture.completedFuture(Optional.of(topic)));
            BrokerService brokerService = (BrokerService) Mockito.spy(new BrokerService(this.pulsarServices[i], this.eventLoopGroup));
            ((BrokerService) Mockito.doReturn(new MockBrokerInterceptor()).when(brokerService)).getInterceptor();
            ((BrokerService) Mockito.doReturn(atomicLong.getAndIncrement() + "").when(brokerService)).generateUniqueProducerName();
            this.brokerServices[i] = brokerService;
            Mockito.when(brokerService.getTopics()).thenReturn(concurrentOpenHashMap);
            Mockito.when(this.pulsarServices[i].getBrokerService()).thenReturn(brokerService);
        }
    }

    @Test
    public void testCommitOnTopic() throws ExecutionException, InterruptedException {
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < this.partitions; i++) {
            arrayList.add(this.tbClient.commitTxnOnTopic(this.partitionedTopicName.getPartition(i).toString(), 1L, i, Long.MIN_VALUE));
        }
        for (int i2 = 0; i2 < arrayList.size(); i2++) {
            Assert.assertEquals(((TxnID) ((CompletableFuture) arrayList.get(i2)).get()).getMostSigBits(), 1L);
            Assert.assertEquals(((TxnID) ((CompletableFuture) arrayList.get(i2)).get()).getLeastSigBits(), i2);
        }
    }

    @Test
    public void testAbortOnTopic() throws ExecutionException, InterruptedException {
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < this.partitions; i++) {
            arrayList.add(this.tbClient.abortTxnOnTopic(this.partitionedTopicName.getPartition(i).toString(), 1L, i, Long.MIN_VALUE));
        }
        for (int i2 = 0; i2 < arrayList.size(); i2++) {
            Assert.assertEquals(((TxnID) ((CompletableFuture) arrayList.get(i2)).get()).getMostSigBits(), 1L);
            Assert.assertEquals(((TxnID) ((CompletableFuture) arrayList.get(i2)).get()).getLeastSigBits(), i2);
        }
    }

    @Test
    public void testCommitOnSubscription() throws ExecutionException, InterruptedException {
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < this.partitions; i++) {
            arrayList.add(this.tbClient.commitTxnOnSubscription(this.partitionedTopicName.getPartition(i).toString(), "test", 1L, i, -1L));
        }
        for (int i2 = 0; i2 < arrayList.size(); i2++) {
            Assert.assertEquals(((TxnID) ((CompletableFuture) arrayList.get(i2)).get()).getMostSigBits(), 1L);
            Assert.assertEquals(((TxnID) ((CompletableFuture) arrayList.get(i2)).get()).getLeastSigBits(), i2);
        }
    }

    @Test
    public void testAbortOnSubscription() throws ExecutionException, InterruptedException {
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < this.partitions; i++) {
            arrayList.add(this.tbClient.abortTxnOnSubscription(this.partitionedTopicName.getPartition(i).toString(), "test", 1L, i, -1L));
        }
        for (int i2 = 0; i2 < arrayList.size(); i2++) {
            Assert.assertEquals(((TxnID) ((CompletableFuture) arrayList.get(i2)).get()).getMostSigBits(), 1L);
            Assert.assertEquals(((TxnID) ((CompletableFuture) arrayList.get(i2)).get()).getLeastSigBits(), i2);
        }
    }

    @Test
    public void testTransactionBufferOpFail() throws InterruptedException, ExecutionException {
        ConcurrentOpenHashMap[] concurrentOpenHashMapArr = new ConcurrentOpenHashMap[this.brokerServices.length];
        ConcurrentOpenHashMap concurrentOpenHashMap = new ConcurrentOpenHashMap();
        for (int i = 0; i < this.brokerServices.length; i++) {
            concurrentOpenHashMapArr[i] = this.brokerServices[i].getTopics();
            Mockito.when(this.brokerServices[i].getTopics()).thenReturn(concurrentOpenHashMap);
        }
        try {
            this.tbClient.abortTxnOnSubscription(this.partitionedTopicName.getPartition(0).toString(), "test", 1L, 1L, -1L).get();
            Assert.fail();
        } catch (ExecutionException e) {
            Assert.assertTrue(e.getCause() instanceof PulsarClientException.LookupException);
        }
        try {
            this.tbClient.abortTxnOnTopic(this.partitionedTopicName.getPartition(0).toString(), 1L, 1L, -1L).get();
            Assert.fail();
        } catch (ExecutionException e2) {
            Assert.assertTrue(e2.getCause() instanceof PulsarClientException.LookupException);
        }
        for (int i2 = 0; i2 < this.brokerServices.length; i2++) {
            Mockito.when(this.brokerServices[i2].getTopics()).thenReturn(concurrentOpenHashMapArr[i2]);
        }
        this.tbClient.abortTxnOnSubscription(this.partitionedTopicName.getPartition(0).toString(), "test", 1L, 1L, -1L).get();
        this.tbClient.abortTxnOnTopic(this.partitionedTopicName.getPartition(0).toString(), 1L, 1L, -1L).get();
    }

    @Test
    public void testTransactionBufferClientTimeout() throws Exception {
        PulsarClientImpl pulsarClientImpl = (PulsarClientImpl) Mockito.mock(PulsarClientImpl.class);
        CompletableFuture completableFuture = new CompletableFuture();
        ClientCnx clientCnx = (ClientCnx) Mockito.mock(ClientCnx.class);
        completableFuture.complete(clientCnx);
        Mockito.when(pulsarClientImpl.getConnection(ArgumentMatchers.anyString())).thenReturn(completableFuture);
        ChannelHandlerContext channelHandlerContext = (ChannelHandlerContext) Mockito.mock(ChannelHandlerContext.class);
        Mockito.when(clientCnx.ctx()).thenReturn(channelHandlerContext);
        Channel channel = (Channel) Mockito.mock(Channel.class);
        Mockito.when(channelHandlerContext.channel()).thenReturn(channel);
        Mockito.when(Boolean.valueOf(channel.isActive())).thenReturn(true);
        HashedWheelTimer hashedWheelTimer = new HashedWheelTimer();
        try {
            TransactionBufferHandlerImpl transactionBufferHandlerImpl = new TransactionBufferHandlerImpl(pulsarClientImpl, hashedWheelTimer);
            CompletableFuture endTxnOnTopic = transactionBufferHandlerImpl.endTxnOnTopic("test", 1L, 1L, TxnAction.ABORT, 1L);
            Field declaredField = TransactionBufferHandlerImpl.class.getDeclaredField("pendingRequests");
            declaredField.setAccessible(true);
            ConcurrentSkipListMap concurrentSkipListMap = (ConcurrentSkipListMap) declaredField.get(transactionBufferHandlerImpl);
            Assert.assertEquals(concurrentSkipListMap.size(), 1);
            Awaitility.await().atLeast(2L, TimeUnit.SECONDS).until(() -> {
                return concurrentSkipListMap.size() == 0;
            });
            try {
                endTxnOnTopic.get();
                Assert.fail();
            } catch (Exception e) {
                Assert.assertTrue(e.getCause() instanceof TransactionBufferClientException.RequestTimeoutException);
            }
        } finally {
            if (Collections.singletonList(hashedWheelTimer).get(0) != null) {
                hashedWheelTimer.stop();
            }
        }
    }

    @Test
    public void testTransactionBufferChannelUnActive() {
        PulsarClientImpl pulsarClientImpl = (PulsarClientImpl) Mockito.mock(PulsarClientImpl.class);
        CompletableFuture completableFuture = new CompletableFuture();
        ClientCnx clientCnx = (ClientCnx) Mockito.mock(ClientCnx.class);
        completableFuture.complete(clientCnx);
        Mockito.when(pulsarClientImpl.getConnection(ArgumentMatchers.anyString())).thenReturn(completableFuture);
        ChannelHandlerContext channelHandlerContext = (ChannelHandlerContext) Mockito.mock(ChannelHandlerContext.class);
        Mockito.when(clientCnx.ctx()).thenReturn(channelHandlerContext);
        Channel channel = (Channel) Mockito.mock(Channel.class);
        Mockito.when(channelHandlerContext.channel()).thenReturn(channel);
        Mockito.when(Boolean.valueOf(channel.isActive())).thenReturn(false);
        HashedWheelTimer hashedWheelTimer = new HashedWheelTimer();
        try {
            TransactionBufferHandlerImpl transactionBufferHandlerImpl = new TransactionBufferHandlerImpl(pulsarClientImpl, hashedWheelTimer);
            try {
                transactionBufferHandlerImpl.endTxnOnTopic("test", 1L, 1L, TxnAction.ABORT, 1L).get();
                Assert.fail();
            } catch (Exception e) {
                Assert.assertTrue(e.getCause() instanceof PulsarClientException.LookupException);
            }
            Mockito.when(Boolean.valueOf(channel.isActive())).thenReturn(true);
            try {
                transactionBufferHandlerImpl.endTxnOnTopic("test", 1L, 1L, TxnAction.ABORT, 1L).get();
                Assert.fail();
            } catch (Exception e2) {
                Assert.assertTrue(e2.getCause() instanceof TransactionBufferClientException.RequestTimeoutException);
            }
        } finally {
            if (Collections.singletonList(hashedWheelTimer).get(0) != null) {
                hashedWheelTimer.stop();
            }
        }
    }

    @Test
    public void testTransactionBufferLookUp() throws ExecutionException, InterruptedException {
        this.tbClient.abortTxnOnSubscription("persistent://public/test/testTransactionBufferLookUp_abort_sub", "test", 1L, 1L, -1L).get();
        this.tbClient.commitTxnOnSubscription("persistent://public/test/testTransactionBufferLookUp_commit_sub", "test", 1L, 1L, -1L).get();
        this.tbClient.abortTxnOnTopic("persistent://public/test/testTransactionBufferLookUp_abort_topic", 1L, 1L, -1L).get();
        this.tbClient.commitTxnOnTopic("persistent://public/test/testTransactionBufferLookUp_commit_topic", 1L, 1L, -1L).get();
    }

    @Test
    public void testTransactionBufferHandlerSemaphore() throws Exception {
        Field declaredField = TransactionBufferClientImpl.class.getDeclaredField("tbHandler");
        declaredField.setAccessible(true);
        TransactionBufferHandlerImpl transactionBufferHandlerImpl = (TransactionBufferHandlerImpl) declaredField.get(this.tbClient);
        Field declaredField2 = TransactionBufferHandlerImpl.class.getDeclaredField("semaphore");
        declaredField2.setAccessible(true);
        declaredField2.set(transactionBufferHandlerImpl, new Semaphore(2));
        this.tbClient.abortTxnOnSubscription("persistent://public/test/testTransactionBufferLookUp_abort_sub", "test", 1L, 1L, -1L).get();
        this.tbClient.abortTxnOnTopic("persistent://public/test/testTransactionBufferLookUp_abort_topic", 1L, 1L, -1L).get();
        this.tbClient.commitTxnOnSubscription("persistent://public/test/testTransactionBufferLookUp_commit_sub", "test", 1L, 1L, -1L).get();
        this.tbClient.commitTxnOnTopic("persistent://public/test/testTransactionBufferLookUp_commit_topic", 1L, 1L, -1L).get();
    }
}
