/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Markers;
import org.awaitility.Awaitility;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import org.testng.collections.Sets;

@Test(groups={"broker"})
public class TransactionMarkerDeleteTest
extends BrokerTestBase {
    @Override
    @BeforeMethod
    protected void setup() throws Exception {
        this.conf.setTransactionCoordinatorEnabled(true);
        super.baseSetup();
        this.admin.tenants().createTenant("public", (TenantInfo)new TenantInfoImpl(Sets.newHashSet((Object[])new String[]{"appid1"}), Sets.newHashSet((Object[])new String[]{"test"})));
        this.admin.namespaces().createNamespace("public/default");
    }

    @Override
    @AfterMethod(alwaysRun=true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @Test
    public void testMarkerDeleteTimes() throws Exception {
        ManagedLedgerImpl managedLedger = (ManagedLedgerImpl)Mockito.spy((Object)((ManagedLedgerImpl)this.pulsar.getManagedLedgerFactory().open("test")));
        PersistentTopic topic = (PersistentTopic)Mockito.mock(PersistentTopic.class);
        BrokerService brokerService = (BrokerService)Mockito.mock(BrokerService.class);
        PulsarService pulsarService = (PulsarService)Mockito.mock(PulsarService.class);
        ServiceConfiguration configuration = (ServiceConfiguration)Mockito.mock(ServiceConfiguration.class);
        ((PersistentTopic)Mockito.doReturn((Object)brokerService).when((Object)topic)).getBrokerService();
        ((BrokerService)Mockito.doReturn((Object)pulsarService).when((Object)brokerService)).getPulsar();
        ((PulsarService)Mockito.doReturn((Object)configuration).when((Object)pulsarService)).getConfig();
        ((ServiceConfiguration)Mockito.doReturn((Object)false).when((Object)configuration)).isTransactionCoordinatorEnabled();
        ((PersistentTopic)Mockito.doReturn((Object)managedLedger).when((Object)topic)).getManagedLedger();
        ManagedCursor cursor = managedLedger.openCursor("test");
        PersistentSubscription persistentSubscription = (PersistentSubscription)Mockito.spy((Object)new PersistentSubscription(topic, "test", cursor, false));
        Position position = managedLedger.addEntry("test".getBytes());
        persistentSubscription.acknowledgeMessage(Collections.singletonList(position), CommandAck.AckType.Individual, Collections.emptyMap());
        ((ManagedLedgerImpl)Mockito.verify((Object)managedLedger, (VerificationMode)Mockito.times((int)0))).asyncReadEntry((PositionImpl)ArgumentMatchers.any(), (AsyncCallbacks.ReadEntryCallback)ArgumentMatchers.any(), ArgumentMatchers.any());
    }

    @Test
    public void testMarkerDelete() throws Exception {
        MessageMetadata msgMetadata = new MessageMetadata().clear().setPublishTime(1L).setProducerName("test").setSequenceId(0L);
        ByteBuf payload = PooledByteBufAllocator.DEFAULT.buffer(0);
        payload = Commands.serializeMetadataAndPayload((Commands.ChecksumType)Commands.ChecksumType.Crc32c, (MessageMetadata)msgMetadata, (ByteBuf)payload);
        ManagedLedger managedLedger = this.pulsar.getManagedLedgerFactory().open("test");
        PersistentTopic topic = (PersistentTopic)Mockito.mock(PersistentTopic.class);
        ((PersistentTopic)Mockito.doReturn((Object)this.pulsar.getBrokerService()).when((Object)topic)).getBrokerService();
        ((PersistentTopic)Mockito.doReturn((Object)managedLedger).when((Object)topic)).getManagedLedger();
        ((PersistentTopic)Mockito.doReturn((Object)"test").when((Object)topic)).getName();
        ManagedCursor cursor = managedLedger.openCursor("test");
        PersistentSubscription persistentSubscription = new PersistentSubscription(topic, "test", managedLedger.openCursor("test"), false);
        Position position1 = managedLedger.addEntry(payload.array());
        Position markerPosition1 = managedLedger.addEntry(Markers.newTxnCommitMarker((long)1L, (long)1L, (long)1L).array());
        Position position2 = managedLedger.addEntry(payload.array());
        Position markerPosition2 = managedLedger.addEntry(Markers.newTxnAbortMarker((long)1L, (long)1L, (long)1L).array());
        Position position3 = managedLedger.addEntry(payload.array());
        Assert.assertEquals((long)cursor.getNumberOfEntriesInBacklog(true), (long)5L);
        Assert.assertTrue((((PositionImpl)cursor.getMarkDeletedPosition()).compareTo((PositionImpl)position1) < 0 ? 1 : 0) != 0);
        persistentSubscription.acknowledgeMessage(Collections.singletonList(position1), CommandAck.AckType.Individual, Collections.emptyMap());
        Awaitility.await().during(1L, TimeUnit.SECONDS).until(() -> ((PositionImpl)persistentSubscription.getCursor().getMarkDeletedPosition()).compareTo((PositionImpl)markerPosition1) == 0);
        persistentSubscription.acknowledgeMessage(Collections.singletonList(position2), CommandAck.AckType.Individual, Collections.emptyMap());
        Awaitility.await().until(() -> ((PositionImpl)persistentSubscription.getCursor().getMarkDeletedPosition()).compareTo((PositionImpl)markerPosition2) == 0);
        managedLedger.addEntry(Markers.newTxnCommitMarker((long)1L, (long)1L, (long)1L).array());
        managedLedger.addEntry(Markers.newTxnAbortMarker((long)1L, (long)1L, (long)1L).array());
        Position markerPosition3 = managedLedger.addEntry(Markers.newTxnAbortMarker((long)1L, (long)1L, (long)1L).array());
        persistentSubscription.transactionIndividualAcknowledge(new TxnID(0L, 0L), Collections.singletonList(MutablePair.of((Object)((PositionImpl)position3), (Object)0))).get();
        persistentSubscription.endTxn(0L, 0L, 0, 0L).get();
        Awaitility.await().until(() -> ((PositionImpl)persistentSubscription.getCursor().getMarkDeletedPosition()).compareTo((PositionImpl)markerPosition3) == 0);
    }
}

