/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.client.api.DigestType;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.net.BookieId;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.util.FutureUtil;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups={"broker-impl"})
public class BatchMessageIndexAckTest
extends ProducerConsumerBase {
    private static final Logger log = LoggerFactory.getLogger(BatchMessageIndexAckTest.class);

    @Override
    @BeforeMethod
    protected void setup() throws Exception {
        this.conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
        super.internalSetup();
        super.producerBaseSetup();
        ((MockedPulsarServiceBaseTest.NonClosableMockBookKeeper)((Object)Mockito.doReturn(CompletableFuture.completedFuture(new LedgerMetadata(){

            public long getLedgerId() {
                return 0L;
            }

            public int getEnsembleSize() {
                return 0;
            }

            public int getWriteQuorumSize() {
                return 0;
            }

            public int getAckQuorumSize() {
                return 0;
            }

            public long getLastEntryId() {
                return 0L;
            }

            public long getLength() {
                return 0L;
            }

            public boolean hasPassword() {
                return false;
            }

            public byte[] getPassword() {
                return new byte[0];
            }

            public DigestType getDigestType() {
                return null;
            }

            public long getCtime() {
                return 0L;
            }

            public boolean isClosed() {
                return false;
            }

            public Map<String, byte[]> getCustomMetadata() {
                return null;
            }

            public List<BookieId> getEnsembleAt(long entryId) {
                return null;
            }

            public NavigableMap<Long, ? extends List<BookieId>> getAllEnsembles() {
                return null;
            }

            public LedgerMetadata.State getState() {
                return null;
            }

            public String toSafeString() {
                return null;
            }

            public int getMetadataFormatVersion() {
                return 0;
            }

            public long getCToken() {
                return 0L;
            }
        })).when((Object)this.mockBookKeeper))).getLedgerMetadata(ArgumentMatchers.anyLong());
    }

    @Override
    @AfterMethod(alwaysRun=true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @DataProvider(name="ackReceiptEnabled")
    public Object[][] ackReceiptEnabled() {
        return new Object[][]{{true}, {false}};
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(dataProvider="ackReceiptEnabled")
    public void testBatchMessageIndexAckForSharedSubscription(boolean ackReceiptEnabled) throws Exception {
        String topic = "testBatchMessageIndexAckForSharedSubscription";
        String subscriptionName = "sub";
        Consumer consumer = this.pulsarClient.newConsumer(Schema.INT32).topic(new String[]{"testBatchMessageIndexAckForSharedSubscription"}).subscriptionName("sub").receiverQueueSize(100).isAckReceiptEnabled(ackReceiptEnabled).subscriptionType(SubscriptionType.Shared).enableBatchIndexAcknowledgment(true).negativeAckRedeliveryDelay(2L, TimeUnit.SECONDS).subscribe();
        try {
            Producer producer = this.pulsarClient.newProducer(Schema.INT32).topic("testBatchMessageIndexAckForSharedSubscription").batchingMaxPublishDelay(50L, TimeUnit.MILLISECONDS).create();
            try {
                int i;
                int messages = 100;
                ArrayList<CompletableFuture> futures = new ArrayList<CompletableFuture>(100);
                for (int i2 = 0; i2 < 100; ++i2) {
                    futures.add(producer.sendAsync((Object)i2));
                }
                FutureUtil.waitForAll(futures).get();
                ArrayList<MessageId> acked = new ArrayList<MessageId>(50);
                for (int i3 = 0; i3 < 100; ++i3) {
                    Message msg = consumer.receive();
                    if (i3 % 2 == 0) {
                        consumer.acknowledge(msg);
                        acked.add(msg.getMessageId());
                        continue;
                    }
                    consumer.negativeAcknowledge(consumer.receive());
                }
                ArrayList<MessageId> received = new ArrayList<MessageId>(50);
                for (int i4 = 0; i4 < 50; ++i4) {
                    received.add(consumer.receive().getMessageId());
                }
                Assert.assertEquals((int)received.size(), (int)50);
                acked.retainAll(received);
                Assert.assertEquals((int)acked.size(), (int)0);
                for (MessageId messageId : received) {
                    consumer.acknowledge(messageId);
                }
                Thread.sleep(1000L);
                consumer.redeliverUnacknowledgedMessages();
                Message moreMessage = consumer.receive(2, TimeUnit.SECONDS);
                Assert.assertNull((Object)moreMessage);
                BatchMessageIdImpl ackedMessageId = (BatchMessageIdImpl)received.get(0);
                PersistentTopicInternalStats stats = this.admin.topics().getInternalStats("testBatchMessageIndexAckForSharedSubscription", false);
                String markDeletePosition = ((PersistentTopicInternalStats.CursorStats)stats.cursors.get((Object)"sub")).markDeletePosition;
                Assert.assertEquals((String)(ackedMessageId.ledgerId + ":" + ackedMessageId.entryId), (String)markDeletePosition);
                futures.clear();
                for (i = 0; i < 50; ++i) {
                    futures.add(producer.sendAsync((Object)i));
                }
                FutureUtil.waitForAll(futures).get();
                for (i = 0; i < 50; ++i) {
                    received.add(consumer.receive().getMessageId());
                }
                Assert.assertEquals((int)received.size(), (int)100);
            }
            finally {
                if (Collections.singletonList(producer).get(0) != null) {
                    producer.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(consumer).get(0) != null) {
                consumer.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(dataProvider="ackReceiptEnabled")
    public void testBatchMessageIndexAckForExclusiveSubscription(boolean ackReceiptEnabled) throws PulsarClientException, ExecutionException, InterruptedException {
        String topic = "testBatchMessageIndexAckForExclusiveSubscription";
        Consumer consumer = this.pulsarClient.newConsumer(Schema.INT32).topic(new String[]{"testBatchMessageIndexAckForExclusiveSubscription"}).subscriptionName("sub").receiverQueueSize(100).isAckReceiptEnabled(ackReceiptEnabled).enableBatchIndexAcknowledgment(true).subscribe();
        try {
            Producer producer = this.pulsarClient.newProducer(Schema.INT32).topic("testBatchMessageIndexAckForExclusiveSubscription").batchingMaxPublishDelay(50L, TimeUnit.MILLISECONDS).create();
            try {
                int i;
                int i2;
                int messages = 100;
                ArrayList<CompletableFuture> futures = new ArrayList<CompletableFuture>(100);
                for (i2 = 0; i2 < 100; ++i2) {
                    futures.add(producer.sendAsync((Object)i2));
                }
                FutureUtil.waitForAll(futures).get();
                for (i2 = 0; i2 < 100; ++i2) {
                    if (i2 == 49) {
                        consumer.acknowledgeCumulative(consumer.receive());
                        continue;
                    }
                    consumer.receive();
                }
                Thread.sleep(1000L);
                consumer.close();
                consumer = this.pulsarClient.newConsumer(Schema.INT32).topic(new String[]{"testBatchMessageIndexAckForExclusiveSubscription"}).subscriptionName("sub").receiverQueueSize(100).subscribe();
                ArrayList<Message> received = new ArrayList<Message>(50);
                for (int i3 = 0; i3 < 50; ++i3) {
                    received.add(consumer.receive());
                }
                Assert.assertEquals((int)received.size(), (int)50);
                Message moreMessage = consumer.receive(1, TimeUnit.SECONDS);
                Assert.assertNull((Object)moreMessage);
                futures.clear();
                for (i = 0; i < 50; ++i) {
                    futures.add(producer.sendAsync((Object)i));
                }
                FutureUtil.waitForAll(futures).get();
                for (i = 0; i < 50; ++i) {
                    received.add(consumer.receive());
                }
                Assert.assertEquals((int)received.size(), (int)100);
            }
            finally {
                if (Collections.singletonList(producer).get(0) != null) {
                    producer.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(consumer).get(0) != null) {
                consumer.close();
            }
        }
    }

    @Test
    public void testDoNotRecycleAckSetMultipleTimes() throws Exception {
        int i;
        String topic = "persistent://my-property/my-ns/testSafeAckSetRecycle";
        Producer producer = this.pulsarClient.newProducer().batchingMaxMessages(10).blockIfQueueFull(true).topic("persistent://my-property/my-ns/testSafeAckSetRecycle").create();
        Consumer consumer = this.pulsarClient.newConsumer().acknowledgmentGroupTime(1L, TimeUnit.MILLISECONDS).topic(new String[]{"persistent://my-property/my-ns/testSafeAckSetRecycle"}).enableBatchIndexAcknowledgment(true).subscriptionName("test").subscribe();
        int messages = 100;
        for (i = 0; i < 100; ++i) {
            producer.sendAsync((Object)"Hello Pulsar".getBytes());
        }
        for (i = 0; i < 100; ++i) {
            consumer.acknowledgeCumulative(consumer.receive());
            Thread.sleep(2L);
        }
        producer.close();
        consumer.close();
    }
}

