/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

/**
 * Exercises consumer acknowledgment of batched messages when the test
 * configuration has acknowledgment at batch-index level switched off
 * (see {@link #setup()}).
 *
 * <p>Every test runs twice via the {@code ackReceiptEnabled} data provider:
 * once with ack receipts enabled on the consumer and once without.
 */
@Test(groups = "broker-impl")
public class BatchMessageIndexAckDisableTest extends ProducerConsumerBase {
    private static final Logger log = LoggerFactory.getLogger(BatchMessageIndexAckDisableTest.class);

    /**
     * Turns off batch-index-level acknowledgment on the inherited configuration
     * and then brings up the environment through the base-class setup hooks.
     * The flag is set first so it is already in place when
     * {@code internalSetup()} runs.
     */
    @Override
    @BeforeMethod
    protected void setup() throws Exception {
        conf.setAcknowledgmentAtBatchIndexLevelEnabled(false);
        super.internalSetup();
        super.producerBaseSetup();
    }

    /** Tears the environment down after every test method, even a failed one. */
    @Override
    @AfterMethod(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Supplies both values of the consumer's "ack receipt enabled" switch.
     *
     * @return one row with {@code true} and one row with {@code false}
     */
    @DataProvider(name = "ackReceiptEnabled")
    public Object[][] ackReceiptEnabled() {
        return new Object[][]{{true}, {false}};
    }

    /**
     * Shared subscription with a one-second ack timeout: publishes 100 messages,
     * individually acknowledges the first 50 messages received, and then asserts
     * that 100 further messages can still be received.
     *
     * <p>NOTE(review): the second round can only reach 100 if already-delivered
     * messages are handed out again — presumably because individual acks inside
     * a batch do not take effect while batch-index ack is disabled and the ack
     * timeout triggers redelivery. Confirm against broker semantics.
     *
     * @param ackReceiptEnabled whether the consumer requests ack receipts
     * @throws PulsarClientException if a client operation (subscribe, create,
     *         receive, acknowledge, close) fails
     * @throws ExecutionException if any asynchronous send fails
     * @throws InterruptedException if interrupted while waiting for the sends
     */
    @Test(dataProvider = "ackReceiptEnabled")
    public void testBatchMessageIndexAckForSharedSubscription(boolean ackReceiptEnabled)
            throws PulsarClientException, ExecutionException, InterruptedException {
        final String topic = "testBatchMessageIndexAckForSharedSubscription";
        final int messages = 100;

        // Resources are closed in reverse declaration order: producer first, then consumer.
        try (Consumer<Integer> consumer = pulsarClient.newConsumer(Schema.INT32)
                     .topic(topic)
                     .subscriptionName("sub")
                     .receiverQueueSize(100)
                     .subscriptionType(SubscriptionType.Shared)
                     .isAckReceiptEnabled(ackReceiptEnabled)
                     .ackTimeout(1L, TimeUnit.SECONDS)
                     .subscribe();
             Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
                     .topic(topic)
                     .batchingMaxPublishDelay(50L, TimeUnit.MILLISECONDS)
                     .create()) {
            publishAndAwait(producer, messages);

            // Only every second iteration receives (and acks) a message, so exactly
            // messages / 2 messages are consumed and acknowledged here.
            for (int i = 0; i < messages; i++) {
                if (i % 2 == 0) {
                    consumer.acknowledge(consumer.receive());
                }
            }

            List<Message<Integer>> received = receiveAll(consumer, messages);
            Assert.assertEquals(received.size(), messages);
        }
    }

    /**
     * Default (non-shared) subscription: publishes 100 messages, receives all of
     * them while cumulatively acknowledging only the 50th one received, closes
     * the consumer, re-subscribes under the same subscription name and asserts
     * that 100 messages are received again.
     *
     * <p>NOTE(review): presumably the cumulative ack lands in the middle of a
     * batch and is therefore not applied while batch-index ack is disabled, so
     * the new consumer sees everything again — confirm against broker semantics.
     *
     * @param ackReceiptEnabled whether the first consumer requests ack receipts
     *        (the re-subscribed consumer uses the builder default)
     * @throws PulsarClientException if a client operation (subscribe, create,
     *         receive, acknowledge, close) fails
     * @throws ExecutionException if any asynchronous send fails
     * @throws InterruptedException if interrupted while waiting for the sends
     *         or during the pause before re-subscribing
     */
    @Test(dataProvider = "ackReceiptEnabled")
    public void testBatchMessageIndexAckForExclusiveSubscription(boolean ackReceiptEnabled)
            throws PulsarClientException, ExecutionException, InterruptedException {
        final String topic = "testBatchMessageIndexAckForExclusiveSubscription";
        final int messages = 100;

        // Not a try-with-resources variable because it is reassigned below; the
        // finally clause closes whichever consumer is current at that point.
        Consumer<Integer> consumer = pulsarClient.newConsumer(Schema.INT32)
                .topic(topic)
                .subscriptionName("sub")
                .receiverQueueSize(100)
                .isAckReceiptEnabled(ackReceiptEnabled)
                .subscribe();
        try (Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
                .topic(topic)
                .batchingMaxPublishDelay(50L, TimeUnit.MILLISECONDS)
                .create()) {
            publishAndAwait(producer, messages);

            // Drain every message; only the one received at index 49 is acked (cumulatively).
            for (int i = 0; i < messages; i++) {
                if (i == 49) {
                    consumer.acknowledgeCumulative(consumer.receive());
                } else {
                    consumer.receive();
                }
            }

            // Pause before closing — presumably to let the pending ack be sent.
            Thread.sleep(1000L);
            consumer.close();

            consumer = pulsarClient.newConsumer(Schema.INT32)
                    .topic(topic)
                    .subscriptionName("sub")
                    .receiverQueueSize(100)
                    .subscribe();

            List<Message<Integer>> received = receiveAll(consumer, messages);
            Assert.assertEquals(received.size(), messages);
        } finally {
            // Runs after the producer has been closed by the try-with-resources.
            consumer.close();
        }
    }

    /** Sends the integers {@code 0..count-1} asynchronously and blocks until every send has completed. */
    private static void publishAndAwait(Producer<Integer> producer, int count)
            throws ExecutionException, InterruptedException {
        List<CompletableFuture<MessageId>> pending = new ArrayList<>(count);
        for (int value = 0; value < count; value++) {
            pending.add(producer.sendAsync(value));
        }
        FutureUtil.waitForAll(pending).get();
    }

    /** Performs {@code count} blocking receives on the consumer and returns the messages in arrival order. */
    private static List<Message<Integer>> receiveAll(Consumer<Integer> consumer, int count)
            throws PulsarClientException {
        List<Message<Integer>> received = new ArrayList<>(count);
        for (int n = 0; n < count; n++) {
            received.add(consumer.receive());
        }
        return received;
    }
}

