/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.transaction.buffer;

import com.google.common.collect.Sets;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.HashedWheelTimer;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.pulsar.broker.intercept.MockBrokerInterceptor;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl;
import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferHandlerImpl;
import org.apache.pulsar.broker.transaction.coordinator.TransactionMetaStoreTestBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.transaction.TransactionBufferClient;
import org.apache.pulsar.client.api.transaction.TransactionBufferClientException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.awaitility.Awaitility;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.Test;

@Test(groups={"broker"})
public class TransactionBufferClientTest
extends TransactionMetaStoreTestBase {
    private static final Logger log = LoggerFactory.getLogger(TransactionBufferClientTest.class);
    private TransactionBufferClient tbClient;
    TopicName partitionedTopicName = TopicName.get((String)"persistent", (String)"public", (String)"test", (String)"tb-client");
    int partitions = 10;
    BrokerService[] brokerServices;
    private static final String namespace = "public/test";
    private EventLoopGroup eventLoopGroup;

    @Override
    protected void afterSetup() throws Exception {
        this.pulsarAdmins[0].clusters().createCluster("my-cluster", new ClusterData(this.pulsarServices[0].getWebServiceAddress()));
        this.pulsarAdmins[0].tenants().createTenant("public", new TenantInfo((Set)Sets.newHashSet(), (Set)Sets.newHashSet((Object[])new String[]{"my-cluster"})));
        this.pulsarAdmins[0].namespaces().createNamespace(namespace, 10);
        this.pulsarAdmins[0].topics().createPartitionedTopic(this.partitionedTopicName.getPartitionedTopicName(), this.partitions);
        this.pulsarClient.newConsumer().topic(new String[]{this.partitionedTopicName.getPartitionedTopicName()}).subscriptionName("test").subscribe();
        this.tbClient = TransactionBufferClientImpl.create((PulsarClient)((PulsarClientImpl)this.pulsarClient), (HashedWheelTimer)new HashedWheelTimer((ThreadFactory)new DefaultThreadFactory("transaction-buffer")));
    }

    @Override
    protected void cleanup() throws Exception {
        if (this.tbClient != null) {
            this.tbClient.close();
        }
        if (this.brokerServices != null) {
            for (BrokerService bs : this.brokerServices) {
                bs.close();
            }
            this.brokerServices = null;
        }
        super.cleanup();
        this.eventLoopGroup.shutdownGracefully().get();
    }

    @Override
    protected void afterPulsarStart() throws Exception {
        this.eventLoopGroup = new NioEventLoopGroup();
        this.brokerServices = new BrokerService[this.pulsarServices.length];
        AtomicLong atomicLong = new AtomicLong(0L);
        for (int i = 0; i < this.pulsarServices.length; ++i) {
            Subscription mockSubscription = (Subscription)Mockito.mock(Subscription.class);
            Mockito.when((Object)mockSubscription.endTxn(Mockito.anyLong(), Mockito.anyLong(), Mockito.anyInt(), Mockito.anyLong())).thenReturn(CompletableFuture.completedFuture(null));
            Topic mockTopic = (Topic)Mockito.mock(Topic.class);
            Mockito.when((Object)mockTopic.endTxn((TxnID)ArgumentMatchers.any(), Mockito.anyInt(), ArgumentMatchers.anyLong())).thenReturn(CompletableFuture.completedFuture(null));
            Mockito.when((Object)mockTopic.getSubscription((String)ArgumentMatchers.any())).thenReturn((Object)mockSubscription);
            ConcurrentOpenHashMap topicMap = (ConcurrentOpenHashMap)Mockito.mock(ConcurrentOpenHashMap.class);
            Mockito.when((Object)topicMap.get((Object)Mockito.anyString())).thenReturn(CompletableFuture.completedFuture(Optional.of(mockTopic)));
            BrokerService brokerService = (BrokerService)Mockito.spy((Object)new BrokerService(this.pulsarServices[i], this.eventLoopGroup));
            ((BrokerService)Mockito.doReturn((Object)new MockBrokerInterceptor()).when((Object)brokerService)).getInterceptor();
            ((BrokerService)Mockito.doReturn((Object)(atomicLong.getAndIncrement() + "")).when((Object)brokerService)).generateUniqueProducerName();
            this.brokerServices[i] = brokerService;
            Mockito.when((Object)brokerService.getTopics()).thenReturn((Object)topicMap);
            Mockito.when((Object)this.pulsarServices[i].getBrokerService()).thenReturn((Object)brokerService);
        }
    }

    @Test
    public void testCommitOnTopic() throws ExecutionException, InterruptedException {
        int i;
        ArrayList<CompletableFuture> futures = new ArrayList<CompletableFuture>();
        for (i = 0; i < this.partitions; ++i) {
            String topic = this.partitionedTopicName.getPartition(i).toString();
            futures.add(this.tbClient.commitTxnOnTopic(topic, 1L, (long)i, Long.MIN_VALUE));
        }
        for (i = 0; i < futures.size(); ++i) {
            Assert.assertEquals((long)((TxnID)((CompletableFuture)futures.get(i)).get()).getMostSigBits(), (long)1L);
            Assert.assertEquals((long)((TxnID)((CompletableFuture)futures.get(i)).get()).getLeastSigBits(), (long)i);
        }
    }

    @Test
    public void testAbortOnTopic() throws ExecutionException, InterruptedException {
        int i;
        ArrayList<CompletableFuture> futures = new ArrayList<CompletableFuture>();
        for (i = 0; i < this.partitions; ++i) {
            String topic = this.partitionedTopicName.getPartition(i).toString();
            futures.add(this.tbClient.abortTxnOnTopic(topic, 1L, (long)i, Long.MIN_VALUE));
        }
        for (i = 0; i < futures.size(); ++i) {
            Assert.assertEquals((long)((TxnID)((CompletableFuture)futures.get(i)).get()).getMostSigBits(), (long)1L);
            Assert.assertEquals((long)((TxnID)((CompletableFuture)futures.get(i)).get()).getLeastSigBits(), (long)i);
        }
    }

    @Test
    public void testCommitOnSubscription() throws ExecutionException, InterruptedException {
        int i;
        ArrayList<CompletableFuture> futures = new ArrayList<CompletableFuture>();
        for (i = 0; i < this.partitions; ++i) {
            String topic = this.partitionedTopicName.getPartition(i).toString();
            futures.add(this.tbClient.commitTxnOnSubscription(topic, "test", 1L, (long)i, -1L));
        }
        for (i = 0; i < futures.size(); ++i) {
            Assert.assertEquals((long)((TxnID)((CompletableFuture)futures.get(i)).get()).getMostSigBits(), (long)1L);
            Assert.assertEquals((long)((TxnID)((CompletableFuture)futures.get(i)).get()).getLeastSigBits(), (long)i);
        }
    }

    @Test
    public void testAbortOnSubscription() throws ExecutionException, InterruptedException {
        int i;
        ArrayList<CompletableFuture> futures = new ArrayList<CompletableFuture>();
        for (i = 0; i < this.partitions; ++i) {
            String topic = this.partitionedTopicName.getPartition(i).toString();
            futures.add(this.tbClient.abortTxnOnSubscription(topic, "test", 1L, (long)i, -1L));
        }
        for (i = 0; i < futures.size(); ++i) {
            Assert.assertEquals((long)((TxnID)((CompletableFuture)futures.get(i)).get()).getMostSigBits(), (long)1L);
            Assert.assertEquals((long)((TxnID)((CompletableFuture)futures.get(i)).get()).getLeastSigBits(), (long)i);
        }
    }

    @Test
    public void testTransactionBufferOpFail() throws InterruptedException, ExecutionException {
        ConcurrentOpenHashMap[] originalMaps = new ConcurrentOpenHashMap[this.brokerServices.length];
        ConcurrentOpenHashMap topicMap = new ConcurrentOpenHashMap();
        for (int i = 0; i < this.brokerServices.length; ++i) {
            originalMaps[i] = this.brokerServices[i].getTopics();
            Mockito.when((Object)this.brokerServices[i].getTopics()).thenReturn((Object)topicMap);
        }
        try {
            this.tbClient.abortTxnOnSubscription(this.partitionedTopicName.getPartition(0).toString(), "test", 1L, 1L, -1L).get();
            Assert.fail();
        }
        catch (ExecutionException e) {
            Assert.assertTrue((boolean)(e.getCause() instanceof PulsarClientException.LookupException));
        }
        try {
            this.tbClient.abortTxnOnTopic(this.partitionedTopicName.getPartition(0).toString(), 1L, 1L, -1L).get();
            Assert.fail();
        }
        catch (ExecutionException e) {
            Assert.assertTrue((boolean)(e.getCause() instanceof PulsarClientException.LookupException));
        }
        for (int i = 0; i < this.brokerServices.length; ++i) {
            Mockito.when((Object)this.brokerServices[i].getTopics()).thenReturn((Object)originalMaps[i]);
        }
        this.tbClient.abortTxnOnSubscription(this.partitionedTopicName.getPartition(0).toString(), "test", 1L, 1L, -1L).get();
        this.tbClient.abortTxnOnTopic(this.partitionedTopicName.getPartition(0).toString(), 1L, 1L, -1L).get();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testTransactionBufferClientTimeout() throws Exception {
        PulsarClientImpl mockClient = (PulsarClientImpl)Mockito.mock(PulsarClientImpl.class);
        CompletableFuture<ClientCnx> completableFuture = new CompletableFuture<ClientCnx>();
        ClientCnx clientCnx = (ClientCnx)Mockito.mock(ClientCnx.class);
        completableFuture.complete(clientCnx);
        Mockito.when((Object)mockClient.getConnection(ArgumentMatchers.anyString())).thenReturn(completableFuture);
        ChannelHandlerContext cnx = (ChannelHandlerContext)Mockito.mock(ChannelHandlerContext.class);
        Mockito.when((Object)clientCnx.ctx()).thenReturn((Object)cnx);
        Channel channel = (Channel)Mockito.mock(Channel.class);
        Mockito.when((Object)cnx.channel()).thenReturn((Object)channel);
        Mockito.when((Object)channel.isActive()).thenReturn((Object)true);
        HashedWheelTimer hashedWheelTimer = new HashedWheelTimer();
        try {
            TransactionBufferHandlerImpl transactionBufferHandler = new TransactionBufferHandlerImpl((PulsarClient)mockClient, hashedWheelTimer);
            CompletableFuture endFuture = transactionBufferHandler.endTxnOnTopic("test", 1L, 1L, TxnAction.ABORT, 1L);
            Field field = TransactionBufferHandlerImpl.class.getDeclaredField("pendingRequests");
            field.setAccessible(true);
            ConcurrentSkipListMap pendingRequests = (ConcurrentSkipListMap)field.get(transactionBufferHandler);
            Assert.assertEquals((int)pendingRequests.size(), (int)1);
            Awaitility.await().atLeast(2L, TimeUnit.SECONDS).until(() -> {
                if (pendingRequests.size() == 0) {
                    return true;
                }
                return false;
            });
            try {
                endFuture.get();
                Assert.fail();
            }
            catch (Exception e) {
                Assert.assertTrue((boolean)(e.getCause() instanceof TransactionBufferClientException.RequestTimeoutException));
            }
        }
        finally {
            if (Collections.singletonList(hashedWheelTimer).get(0) != null) {
                hashedWheelTimer.stop();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testTransactionBufferChannelUnActive() {
        PulsarClientImpl mockClient = (PulsarClientImpl)Mockito.mock(PulsarClientImpl.class);
        CompletableFuture<ClientCnx> completableFuture = new CompletableFuture<ClientCnx>();
        ClientCnx clientCnx = (ClientCnx)Mockito.mock(ClientCnx.class);
        completableFuture.complete(clientCnx);
        Mockito.when((Object)mockClient.getConnection(ArgumentMatchers.anyString())).thenReturn(completableFuture);
        ChannelHandlerContext cnx = (ChannelHandlerContext)Mockito.mock(ChannelHandlerContext.class);
        Mockito.when((Object)clientCnx.ctx()).thenReturn((Object)cnx);
        Channel channel = (Channel)Mockito.mock(Channel.class);
        Mockito.when((Object)cnx.channel()).thenReturn((Object)channel);
        Mockito.when((Object)channel.isActive()).thenReturn((Object)false);
        HashedWheelTimer hashedWheelTimer = new HashedWheelTimer();
        try {
            TransactionBufferHandlerImpl transactionBufferHandler = new TransactionBufferHandlerImpl((PulsarClient)mockClient, hashedWheelTimer);
            try {
                transactionBufferHandler.endTxnOnTopic("test", 1L, 1L, TxnAction.ABORT, 1L).get();
                Assert.fail();
            }
            catch (Exception e) {
                Assert.assertTrue((boolean)(e.getCause() instanceof PulsarClientException.LookupException));
            }
            Mockito.when((Object)channel.isActive()).thenReturn((Object)true);
            try {
                transactionBufferHandler.endTxnOnTopic("test", 1L, 1L, TxnAction.ABORT, 1L).get();
                Assert.fail();
            }
            catch (Exception e) {
                Assert.assertTrue((boolean)(e.getCause() instanceof TransactionBufferClientException.RequestTimeoutException));
            }
        }
        finally {
            if (Collections.singletonList(hashedWheelTimer).get(0) != null) {
                hashedWheelTimer.stop();
            }
        }
    }

    @Test
    public void testTransactionBufferLookUp() throws ExecutionException, InterruptedException {
        String topic = "persistent://public/test/testTransactionBufferLookUp";
        this.tbClient.abortTxnOnSubscription(topic + "_abort_sub", "test", 1L, 1L, -1L).get();
        this.tbClient.commitTxnOnSubscription(topic + "_commit_sub", "test", 1L, 1L, -1L).get();
        this.tbClient.abortTxnOnTopic(topic + "_abort_topic", 1L, 1L, -1L).get();
        this.tbClient.commitTxnOnTopic(topic + "_commit_topic", 1L, 1L, -1L).get();
    }

    @Test
    public void testTransactionBufferHandlerSemaphore() throws Exception {
        Field field = TransactionBufferClientImpl.class.getDeclaredField("tbHandler");
        field.setAccessible(true);
        TransactionBufferHandlerImpl transactionBufferHandler = (TransactionBufferHandlerImpl)field.get(this.tbClient);
        field = TransactionBufferHandlerImpl.class.getDeclaredField("semaphore");
        field.setAccessible(true);
        field.set(transactionBufferHandler, new Semaphore(2));
        String topic = "persistent://public/test/testTransactionBufferLookUp";
        this.tbClient.abortTxnOnSubscription(topic + "_abort_sub", "test", 1L, 1L, -1L).get();
        this.tbClient.abortTxnOnTopic(topic + "_abort_topic", 1L, 1L, -1L).get();
        this.tbClient.commitTxnOnSubscription(topic + "_commit_sub", "test", 1L, 1L, -1L).get();
        this.tbClient.commitTxnOnTopic(topic + "_commit_topic", 1L, 1L, -1L).get();
    }
}

