package org.apache.pulsar.broker.transaction.buffer;

import com.google.common.collect.Sets;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.HashedWheelTimer;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.transaction.TransactionTestBase;
import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl;
import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferHandlerImpl;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.transaction.TransactionBufferClient;
import org.apache.pulsar.client.api.transaction.TransactionBufferClientException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.awaitility.Awaitility;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test(groups = {"broker"})
/* loaded from: input_file:org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.class */
public class TransactionBufferClientTest extends TransactionTestBase {
    private static final Logger log = LoggerFactory.getLogger(TransactionBufferClientTest.class);
    private TransactionBufferClient tbClient;
    TopicName partitionedTopicName = TopicName.get("persistent", "public", "test", "tb-client");
    int partitions = 10;
    private static final String namespace = "public/test";

    @BeforeClass(alwaysRun = true)
    protected void setup() throws Exception {
        setBrokerCount(3);
        internalSetup();
        String[] split = getPulsarServiceList().get(0).getBrokerServiceUrl().split(":");
        this.admin.clusters().createCluster("test", ClusterData.builder().serviceUrl("http://localhost:" + split[split.length - 1]).build());
        this.admin.tenants().createTenant("public", new TenantInfoImpl(Sets.newHashSet(new String[]{"appid1"}), Sets.newHashSet(new String[]{"test"})));
        this.admin.namespaces().createNamespace(namespace, 10);
        this.admin.topics().createPartitionedTopic(this.partitionedTopicName.getPartitionedTopicName(), this.partitions);
        this.tbClient = TransactionBufferClientImpl.create(this.pulsarServiceList.get(0), new HashedWheelTimer(new DefaultThreadFactory("transaction-buffer")), 1000, 3000L);
    }

    @AfterClass(alwaysRun = true)
    protected void cleanup() throws Exception {
        this.tbClient.close();
        super.internalCleanup();
    }

    @Test
    public void testCommitOnTopic() throws ExecutionException, InterruptedException {
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < this.partitions; i++) {
            arrayList.add(this.tbClient.commitTxnOnTopic(this.partitionedTopicName.getPartition(i).toString(), 1L, i, Long.MIN_VALUE));
        }
        for (int i2 = 0; i2 < arrayList.size(); i2++) {
            Assert.assertEquals(((TxnID) ((CompletableFuture) arrayList.get(i2)).get()).getMostSigBits(), 1L);
            Assert.assertEquals(((TxnID) ((CompletableFuture) arrayList.get(i2)).get()).getLeastSigBits(), i2);
        }
    }

    @Test
    public void testAbortOnTopic() throws ExecutionException, InterruptedException {
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < this.partitions; i++) {
            arrayList.add(this.tbClient.abortTxnOnTopic(this.partitionedTopicName.getPartition(i).toString(), 1L, i, Long.MIN_VALUE));
        }
        for (int i2 = 0; i2 < arrayList.size(); i2++) {
            Assert.assertEquals(((TxnID) ((CompletableFuture) arrayList.get(i2)).get()).getMostSigBits(), 1L);
            Assert.assertEquals(((TxnID) ((CompletableFuture) arrayList.get(i2)).get()).getLeastSigBits(), i2);
        }
    }

    @Test
    public void testCommitOnSubscription() throws ExecutionException, InterruptedException {
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < this.partitions; i++) {
            arrayList.add(this.tbClient.commitTxnOnSubscription(this.partitionedTopicName.getPartition(i).toString(), "test", 1L, i, -1L));
        }
        for (int i2 = 0; i2 < arrayList.size(); i2++) {
            Assert.assertEquals(((TxnID) ((CompletableFuture) arrayList.get(i2)).get()).getMostSigBits(), 1L);
            Assert.assertEquals(((TxnID) ((CompletableFuture) arrayList.get(i2)).get()).getLeastSigBits(), i2);
        }
    }

    @Test
    public void testAbortOnSubscription() throws ExecutionException, InterruptedException {
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < this.partitions; i++) {
            arrayList.add(this.tbClient.abortTxnOnSubscription(this.partitionedTopicName.getPartition(i).toString(), "test", 1L, i, -1L));
        }
        for (int i2 = 0; i2 < arrayList.size(); i2++) {
            Assert.assertEquals(((TxnID) ((CompletableFuture) arrayList.get(i2)).get()).getMostSigBits(), 1L);
            Assert.assertEquals(((TxnID) ((CompletableFuture) arrayList.get(i2)).get()).getLeastSigBits(), i2);
        }
    }

    @Test
    public void testTransactionBufferClientTimeout() throws Exception {
        PulsarService pulsarService = this.pulsarServiceList.get(0);
        final PulsarClientImpl pulsarClientImpl = (PulsarClientImpl) Mockito.mock(PulsarClientImpl.class);
        Mockito.when(pulsarClientImpl.getCnxPool()).thenReturn((ConnectionPool) Mockito.mock(ConnectionPool.class));
        CompletableFuture completableFuture = new CompletableFuture();
        ClientCnx clientCnx = (ClientCnx) Mockito.mock(ClientCnx.class);
        completableFuture.complete(clientCnx);
        Mockito.when(pulsarClientImpl.getConnection(ArgumentMatchers.anyString())).thenReturn(completableFuture);
        Mockito.when(pulsarClientImpl.getConnection(ArgumentMatchers.anyString(), ArgumentMatchers.anyInt())).thenReturn(completableFuture);
        Mockito.when(pulsarClientImpl.getConnection((InetSocketAddress) ArgumentMatchers.any(), (InetSocketAddress) ArgumentMatchers.any(), ArgumentMatchers.anyInt())).thenReturn(completableFuture);
        ChannelHandlerContext channelHandlerContext = (ChannelHandlerContext) Mockito.mock(ChannelHandlerContext.class);
        Mockito.when(clientCnx.ctx()).thenReturn(channelHandlerContext);
        Channel channel = (Channel) Mockito.mock(Channel.class);
        Mockito.when(channelHandlerContext.channel()).thenReturn(channel);
        Mockito.when(pulsarService.getClient()).thenAnswer(new Answer<PulsarClient>() { // from class: org.apache.pulsar.broker.transaction.buffer.TransactionBufferClientTest.1
            /* renamed from: answer, reason: merged with bridge method [inline-methods] */
            public PulsarClient m134answer(InvocationOnMock invocationOnMock) throws Throwable {
                return pulsarClientImpl;
            }
        });
        Mockito.when(Boolean.valueOf(channel.isActive())).thenReturn(true);
        HashedWheelTimer hashedWheelTimer = new HashedWheelTimer();
        try {
            TransactionBufferHandlerImpl transactionBufferHandlerImpl = new TransactionBufferHandlerImpl(pulsarService, hashedWheelTimer, 1000, 3000L);
            CompletableFuture endTxnOnTopic = transactionBufferHandlerImpl.endTxnOnTopic("test", 1L, 1L, TxnAction.ABORT, 1L);
            Field declaredField = TransactionBufferHandlerImpl.class.getDeclaredField("outstandingRequests");
            declaredField.setAccessible(true);
            ConcurrentSkipListMap concurrentSkipListMap = (ConcurrentSkipListMap) declaredField.get(transactionBufferHandlerImpl);
            Awaitility.await().atMost(1L, TimeUnit.SECONDS).untilAsserted(() -> {
                Assert.assertEquals(concurrentSkipListMap.size(), 1);
            });
            Awaitility.await().atLeast(2L, TimeUnit.SECONDS).until(() -> {
                return concurrentSkipListMap.size() == 0;
            });
            try {
                endTxnOnTopic.get();
                Assert.fail();
            } catch (Exception e) {
                Assert.assertTrue(e.getCause() instanceof TransactionBufferClientException.RequestTimeoutException);
            }
        } finally {
            if (Collections.singletonList(hashedWheelTimer).get(0) != null) {
                hashedWheelTimer.stop();
            }
        }
    }

    @Test
    public void testTransactionBufferChannelUnActive() throws PulsarServerException {
        PulsarService pulsarService = this.pulsarServiceList.get(0);
        final PulsarClientImpl pulsarClientImpl = (PulsarClientImpl) Mockito.mock(PulsarClientImpl.class);
        Mockito.when(pulsarClientImpl.getCnxPool()).thenReturn((ConnectionPool) Mockito.mock(ConnectionPool.class));
        CompletableFuture completableFuture = new CompletableFuture();
        ClientCnx clientCnx = (ClientCnx) Mockito.mock(ClientCnx.class);
        completableFuture.complete(clientCnx);
        Mockito.when(pulsarClientImpl.getConnection(ArgumentMatchers.anyString(), ArgumentMatchers.anyInt())).thenReturn(completableFuture);
        ChannelHandlerContext channelHandlerContext = (ChannelHandlerContext) Mockito.mock(ChannelHandlerContext.class);
        Mockito.when(clientCnx.ctx()).thenReturn(channelHandlerContext);
        Channel channel = (Channel) Mockito.mock(Channel.class);
        Mockito.when(channelHandlerContext.channel()).thenReturn(channel);
        Mockito.when(Boolean.valueOf(channel.isActive())).thenReturn(false);
        Mockito.when(pulsarService.getClient()).thenAnswer(new Answer<PulsarClient>() { // from class: org.apache.pulsar.broker.transaction.buffer.TransactionBufferClientTest.2
            /* renamed from: answer, reason: merged with bridge method [inline-methods] */
            public PulsarClient m135answer(InvocationOnMock invocationOnMock) throws Throwable {
                return pulsarClientImpl;
            }
        });
        HashedWheelTimer hashedWheelTimer = new HashedWheelTimer();
        try {
            TransactionBufferHandlerImpl transactionBufferHandlerImpl = new TransactionBufferHandlerImpl(this.pulsarServiceList.get(0), hashedWheelTimer, 1000, 3000L);
            try {
                transactionBufferHandlerImpl.endTxnOnTopic("test", 1L, 1L, TxnAction.ABORT, 1L).get();
                Assert.fail();
            } catch (Exception e) {
                Assert.assertTrue(e.getCause() instanceof PulsarClientException.LookupException);
            }
            Mockito.when(Boolean.valueOf(channel.isActive())).thenReturn(true);
            try {
                transactionBufferHandlerImpl.endTxnOnTopic("test", 1L, 1L, TxnAction.ABORT, 1L).get();
                Assert.fail();
            } catch (Exception e2) {
                Assert.assertTrue(e2.getCause() instanceof TransactionBufferClientException.RequestTimeoutException);
            }
        } finally {
            if (Collections.singletonList(hashedWheelTimer).get(0) != null) {
                hashedWheelTimer.stop();
            }
        }
    }

    @Test
    public void testTransactionBufferLookUp() throws Exception {
        String str = "persistent://public/test/testTransactionBufferLookUp_abort_sub";
        String str2 = "persistent://public/test/testTransactionBufferLookUp_commit_sub";
        this.admin.topics().createNonPartitionedTopic(str);
        this.admin.topics().createSubscription(str, "test", MessageId.earliest);
        this.admin.topics().createNonPartitionedTopic(str2);
        this.admin.topics().createSubscription(str2, "test", MessageId.earliest);
        this.tbClient.abortTxnOnSubscription(str, "test", 1L, 1L, -1L).get();
        this.tbClient.commitTxnOnSubscription(str2, "test", 1L, 1L, -1L).get();
        this.tbClient.abortTxnOnTopic(str, 1L, 1L, -1L).get();
        this.tbClient.commitTxnOnTopic(str2, 1L, 1L, -1L).get();
    }

    @Test
    public void testTransactionBufferRequestCredits() throws Exception {
        String str = "persistent://public/test/testTransactionBufferRequestCredits_abort_sub";
        String str2 = "persistent://public/test/testTransactionBufferRequestCredits_commit_sub";
        this.admin.topics().createNonPartitionedTopic(str);
        this.admin.topics().createSubscription(str, "test", MessageId.earliest);
        this.admin.topics().createNonPartitionedTopic(str2);
        this.admin.topics().createSubscription(str2, "test", MessageId.earliest);
        this.tbClient.abortTxnOnSubscription(str, "test", 1L, 1L, -1L).get();
        this.tbClient.commitTxnOnSubscription(str2, "test", 1L, 1L, -1L).get();
        this.tbClient.abortTxnOnTopic(str, 1L, 1L, -1L).get();
        this.tbClient.commitTxnOnTopic(str2, 1L, 1L, -1L).get();
        Assert.assertEquals(this.tbClient.getAvailableRequestCredits(), 1000);
    }

    @Test
    public void testTransactionBufferPendingRequests() throws Exception {
    }

    @Test
    public void testEndTopicNotExist() throws Exception {
        this.tbClient.abortTxnOnTopic("persistent://public/test/testEndTopicNotExist_abort_topic", 1L, 1L, -1L).get();
        this.tbClient.commitTxnOnTopic("persistent://public/test/testEndTopicNotExist_commit_topic", 1L, 1L, -1L).get();
        this.tbClient.abortTxnOnSubscription("persistent://public/test/testEndTopicNotExist_abort_topic", "test", 1L, 1L, -1L).get();
        this.tbClient.abortTxnOnSubscription("persistent://public/test/testEndTopicNotExist_commit_topic", "test", 1L, 1L, -1L).get();
    }

    @Test
    public void testEndSubNotExist() throws Exception {
        this.admin.topics().createNonPartitionedTopic("persistent://public/test/testEndTopicNotExist_abort_sub");
        this.admin.topics().createNonPartitionedTopic("persistent://public/test/testEndTopicNotExist_commit_sub");
        this.tbClient.abortTxnOnSubscription("persistent://public/test/testEndTopicNotExist_abort_topic", "test", 1L, 1L, -1L).get();
        this.tbClient.abortTxnOnSubscription("persistent://public/test/testEndTopicNotExist_commit_topic", "test", 1L, 1L, -1L).get();
    }
}
