package org.apache.pulsar.broker.transaction.buffer;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.transaction.TransactionTestBase;
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

/**
 * Broker tests covering message visibility around an open transaction and the max read
 * position maintained by {@link TopicTransactionBuffer}.
 *
 * <p>Each test method runs against a fresh environment: {@link #setup()} calls
 * {@code setUpBase} and waits for the transaction coordinator client to be ready, and
 * {@link #cleanup()} delegates to {@code internalCleanup()}.
 */
@Test(groups = {"broker"})
public class TransactionStablePositionTest extends TransactionTestBase {
    private static final Logger log = LoggerFactory.getLogger(TransactionStablePositionTest.class);
    private static final String TOPIC = "tnx/ns1/test-topic";

    /**
     * Starts the test environment and blocks until the transaction coordinator client
     * reports {@code READY}.
     *
     * @throws Exception if the base setup fails
     */
    @BeforeMethod
    protected void setup() throws Exception {
        setUpBase(1, 16, TOPIC, 0);
        Awaitility.await().until(
                () -> this.pulsarClient.getTcClient().getState() == TransactionCoordinatorClient.State.READY);
    }

    /**
     * Tears down the test environment; runs even when the test method failed.
     *
     * @throws Exception if the base cleanup fails
     */
    @AfterMethod(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Sends a plain message, a transactional message and another plain message, then checks
     * that the consumer only sees the first one until the transaction commits, after which
     * the remaining two are delivered in publish order.
     *
     * @throws Exception on any client failure
     */
    @Test
    public void commitTxnTest() throws Exception {
        Transaction transaction = this.pulsarClient.newTransaction()
                .withTransactionTimeout(5L, TimeUnit.SECONDS)
                .build()
                .get();
        Producer<byte[]> producer = newNonBatchingProducer();
        Consumer<byte[]> consumer = newEarliestFailoverConsumer();

        producer.newMessage().value("test1".getBytes()).send();
        producer.newMessage(transaction).value("test2".getBytes()).send();
        producer.newMessage().value("test3".getBytes()).send();

        Assert.assertEquals(new String(consumer.receive(2, TimeUnit.SECONDS).getData()), "test1");
        // Nothing past the open transaction's message may be delivered yet.
        Assert.assertNull(consumer.receive(2, TimeUnit.SECONDS));

        transaction.commit().get();

        Assert.assertEquals(new String(consumer.receive(2, TimeUnit.SECONDS).getData()), "test2");
        Assert.assertEquals(new String(consumer.receive(2, TimeUnit.SECONDS).getData()), "test3");
        Assert.assertNull(consumer.receive(2, TimeUnit.SECONDS));
    }

    /**
     * Same publish sequence as {@link #commitTxnTest()}, but the transaction is aborted:
     * the transactional message is never delivered and only the trailing plain message
     * becomes visible.
     *
     * @throws Exception on any client failure
     */
    @Test
    public void abortTxnTest() throws Exception {
        Transaction transaction = this.pulsarClient.newTransaction()
                .withTransactionTimeout(5L, TimeUnit.SECONDS)
                .build()
                .get();
        Producer<byte[]> producer = newNonBatchingProducer();
        Consumer<byte[]> consumer = newEarliestFailoverConsumer();

        producer.newMessage().value("test1".getBytes()).send();
        producer.newMessage(transaction).value("test2".getBytes()).send();
        producer.newMessage().value("test3".getBytes()).send();

        Assert.assertEquals(new String(consumer.receive(2, TimeUnit.SECONDS).getData()), "test1");
        // Nothing past the open transaction's message may be delivered yet.
        Assert.assertNull(consumer.receive(2, TimeUnit.SECONDS));

        transaction.abort().get();

        Assert.assertEquals(new String(consumer.receive(2, TimeUnit.SECONDS).getData()), "test3");
        Assert.assertNull(consumer.receive(2, TimeUnit.SECONDS));
    }

    /**
     * Supplies every combination of "client transactions enabled" and the buffer state that
     * the recover test forces before publishing.
     *
     * @return rows of {@code {Boolean, TopicTransactionBufferState.State}}
     */
    @DataProvider(name = "enableTransactionAndState")
    public static Object[][] enableTransactionAndState() {
        return new Object[][]{
                {true, TopicTransactionBufferState.State.None},
                {false, TopicTransactionBufferState.State.None},
                {true, TopicTransactionBufferState.State.Initializing},
                {false, TopicTransactionBufferState.State.Initializing},
        };
    }

    /**
     * Forces the topic's transaction buffer into {@code state}, publishes one message and
     * checks the max read position stays at {@link PositionImpl#EARLIEST}; then invokes the
     * buffer's private {@code recover} method and checks the max read position equals the
     * position of the published message.
     *
     * @param z     whether the client is built with transactions enabled
     * @param state buffer state written reflectively before the message is published
     * @throws Exception on any client or reflection failure
     */
    @Test(dataProvider = "enableTransactionAndState")
    public void testSyncNormalPositionWhenTBRecover(boolean z, TopicTransactionBufferState.State state)
            throws Exception {
        String topic = "tnx/ns1/testSyncNormalPositionWhenTBRecover-" + z + state.name();

        // Replace the shared client with one configured for this parameter row.
        if (this.pulsarClient != null) {
            this.pulsarClient.shutdown();
        }
        this.pulsarClient = PulsarClient.builder()
                .serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl())
                .statsInterval(0L, TimeUnit.SECONDS)
                .enableTransaction(z)
                .build();

        try (Producer<byte[]> producer = this.pulsarClient.newProducer(Schema.BYTES)
                .sendTimeout(0, TimeUnit.SECONDS)
                .topic(topic)
                .create()) {
            PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0)
                    .getBrokerService()
                    .getTopic(TopicName.get(topic).toString(), false)
                    .get()
                    .get();
            TopicTransactionBuffer topicTransactionBuffer =
                    (TopicTransactionBuffer) persistentTopic.getTransactionBuffer();
            checkTopicTransactionBufferState(z, topicTransactionBuffer);

            // The state field is not accessible from the test, so it is written reflectively.
            Field stateField = TopicTransactionBufferState.class.getDeclaredField("state");
            stateField.setAccessible(true);
            stateField.set(topicTransactionBuffer, state);

            Assert.assertEquals(topicTransactionBuffer.getMaxReadPosition(), PositionImpl.EARLIEST);
            MessageIdImpl messageId = (MessageIdImpl) producer.send("test".getBytes());
            Assert.assertEquals(topicTransactionBuffer.getMaxReadPosition(), PositionImpl.EARLIEST);

            Method recoverMethod = TopicTransactionBuffer.class.getDeclaredMethod("recover");
            recoverMethod.setAccessible(true);
            recoverMethod.invoke(topicTransactionBuffer);

            stateField.set(topicTransactionBuffer, TopicTransactionBufferState.State.None);
            checkTopicTransactionBufferState(z, topicTransactionBuffer);
            Assert.assertEquals(PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId()),
                    topicTransactionBuffer.getMaxReadPosition());
        }
    }

    /**
     * Waits until the buffer's reported state is {@code Ready} when transactions are enabled
     * on the client, or {@code NoSnapshot} otherwise.
     *
     * @param clientEnableTransaction whether the client was built with transactions enabled
     * @param topicTransactionBuffer  buffer whose stats are polled
     */
    private void checkTopicTransactionBufferState(boolean clientEnableTransaction,
                                                  TopicTransactionBuffer topicTransactionBuffer) {
        String expectedState = clientEnableTransaction
                ? TopicTransactionBufferState.State.Ready.name()
                : TopicTransactionBufferState.State.NoSnapshot.name();
        Awaitility.await().until(() -> topicTransactionBuffer.getStats(false).state.equals(expectedState));
    }

    /** Creates a producer on {@link #TOPIC} with batching disabled and no send timeout. */
    private Producer<byte[]> newNonBatchingProducer() throws Exception {
        return this.pulsarClient.newProducer()
                .topic(TOPIC)
                .sendTimeout(0, TimeUnit.SECONDS)
                .enableBatching(false)
                .create();
    }

    /** Subscribes to {@link #TOPIC} as "test" (Failover), reading from the earliest position. */
    private Consumer<byte[]> newEarliestFailoverConsumer() throws Exception {
        return this.pulsarClient.newConsumer()
                .topic(TOPIC)
                .subscriptionName("test")
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .enableBatchIndexAcknowledgment(true)
                .subscriptionType(SubscriptionType.Failover)
                .subscribe();
    }
}
