/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service.persistent;

import com.google.common.collect.Sets;
import java.lang.reflect.Field;
import java.util.Set;
import java.util.TreeSet;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap;
import org.apache.pulsar.broker.service.persistent.MessageRedeliveryController;
import org.apache.pulsar.common.util.collections.LongPairSet;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

/**
 * TestNG tests for {@link MessageRedeliveryController}.
 *
 * <p>Each test runs twice through the {@code allowOutOfOrderDelivery} data provider: once with the
 * constructor flag set to {@code true} and once with {@code false}. The tests expect the controller's
 * private {@code hashesToBeBlocked} map to exist only when the flag is {@code false}.
 */
@Test(groups = "broker")
public class MessageRedeliveryControllerTest {

    /**
     * Supplies both values of the {@code allowOutOfOrderDelivery} constructor flag.
     *
     * @return one single-element row per flag value
     */
    @DataProvider(name = "allowOutOfOrderDelivery")
    public Object[][] dataProvider() {
        return new Object[][] {{Boolean.TRUE}, {Boolean.FALSE}};
    }

    /**
     * Reads a private field of the controller through reflection.
     *
     * @param controller instance whose field is read
     * @param fieldName  name of a field declared on {@link MessageRedeliveryController}
     * @return the current field value (may be {@code null})
     * @throws Exception if the field does not exist or cannot be accessed
     */
    private static Object readPrivateField(MessageRedeliveryController controller, String fieldName)
            throws Exception {
        Field field = MessageRedeliveryController.class.getDeclaredField(fieldName);
        field.setAccessible(true);
        return field.get(controller);
    }

    /**
     * Asserts that the entry stored for the given position carries the expected {@code first} value,
     * i.e. the third argument that was handed to {@code controller.add(ledgerId, entryId, hash)}.
     */
    private static void assertBlockedHash(ConcurrentLongLongPairHashMap hashesToBeBlocked,
                                          long ledgerId, long entryId, long expectedFirst) {
        Assert.assertEquals(hashesToBeBlocked.get(ledgerId, entryId).first, expectedFirst);
    }

    /**
     * Walks the controller through add / remove / clear / removeAllUpTo and checks, after every step,
     * the contents of the internal {@code messagesToRedeliver} set and (when present) the
     * {@code hashesToBeBlocked} map.
     */
    @Test(dataProvider = "allowOutOfOrderDelivery", timeOut = 10000L)
    public void testAddAndRemove(boolean allowOutOfOrderDelivery) throws Exception {
        MessageRedeliveryController controller = new MessageRedeliveryController(allowOutOfOrderDelivery);
        LongPairSet messagesToRedeliver = (LongPairSet) readPrivateField(controller, "messagesToRedeliver");
        ConcurrentLongLongPairHashMap hashesToBeBlocked =
                (ConcurrentLongLongPairHashMap) readPrivateField(controller, "hashesToBeBlocked");
        // The hash map is only expected to be allocated when ordering must be preserved.
        final boolean hashesTracked = !allowOutOfOrderDelivery;

        if (hashesTracked) {
            Assert.assertNotNull(hashesToBeBlocked);
        } else {
            Assert.assertNull(hashesToBeBlocked);
        }

        // Freshly created controller holds nothing.
        Assert.assertTrue(controller.isEmpty());
        Assert.assertEquals(messagesToRedeliver.size(), 0L);
        if (hashesTracked) {
            Assert.assertEquals(hashesToBeBlocked.size(), 0L);
        }

        // Two-argument add: duplicates are rejected and no hash is recorded.
        Assert.assertTrue(controller.add(1L, 1L));
        Assert.assertTrue(controller.add(1L, 2L));
        Assert.assertFalse(controller.add(1L, 1L));

        Assert.assertFalse(controller.isEmpty());
        Assert.assertEquals(messagesToRedeliver.size(), 2L);
        Assert.assertTrue(messagesToRedeliver.contains(1L, 1L));
        Assert.assertTrue(messagesToRedeliver.contains(1L, 2L));
        if (hashesTracked) {
            Assert.assertEquals(hashesToBeBlocked.size(), 0L);
            Assert.assertFalse(hashesToBeBlocked.containsKey(1L, 1L));
            Assert.assertFalse(hashesToBeBlocked.containsKey(1L, 2L));
        }

        // Removing: a second removal of the same position reports false.
        Assert.assertTrue(controller.remove(1L, 1L));
        Assert.assertTrue(controller.remove(1L, 2L));
        Assert.assertFalse(controller.remove(1L, 1L));

        Assert.assertTrue(controller.isEmpty());
        Assert.assertEquals(messagesToRedeliver.size(), 0L);
        Assert.assertFalse(messagesToRedeliver.contains(1L, 1L));
        Assert.assertFalse(messagesToRedeliver.contains(1L, 2L));
        if (hashesTracked) {
            Assert.assertEquals(hashesToBeBlocked.size(), 0L);
        }

        // Three-argument add: the third argument is expected to show up in the hash map.
        Assert.assertTrue(controller.add(2L, 1L, 100L));
        Assert.assertTrue(controller.add(2L, 2L, 101L));
        Assert.assertTrue(controller.add(2L, 3L, 101L));
        Assert.assertFalse(controller.add(2L, 1L, 100L));

        Assert.assertFalse(controller.isEmpty());
        Assert.assertEquals(messagesToRedeliver.size(), 3L);
        for (long entryId = 1L; entryId <= 3L; entryId++) {
            Assert.assertTrue(messagesToRedeliver.contains(2L, entryId));
        }
        if (hashesTracked) {
            Assert.assertEquals(hashesToBeBlocked.size(), 3L);
            assertBlockedHash(hashesToBeBlocked, 2L, 1L, 100L);
            assertBlockedHash(hashesToBeBlocked, 2L, 2L, 101L);
            assertBlockedHash(hashesToBeBlocked, 2L, 3L, 101L);
        }

        // clear() must empty both structures.
        controller.clear();
        Assert.assertTrue(controller.isEmpty());
        Assert.assertEquals(messagesToRedeliver.size(), 0L);
        Assert.assertTrue(messagesToRedeliver.isEmpty());
        if (hashesTracked) {
            Assert.assertEquals(hashesToBeBlocked.size(), 0L);
            Assert.assertTrue(hashesToBeBlocked.isEmpty());
        }

        // Seed seven positions in a deliberately shuffled order: {ledgerId, entryId, hash}.
        long[][] seeded = {
            {2L, 2L, 201L},
            {1L, 3L, 100L},
            {3L, 1L, 300L},
            {2L, 1L, 200L},
            {3L, 2L, 301L},
            {1L, 2L, 101L},
            {1L, 1L, 100L},
        };
        for (long[] row : seeded) {
            controller.add(row[0], row[1], row[2]);
        }

        // After removeAllUpTo(1, 3) only the positions in ledgers 2 and 3 are expected to remain.
        controller.removeAllUpTo(1L, 3L);
        Assert.assertEquals(messagesToRedeliver.size(), 4L);
        Assert.assertTrue(messagesToRedeliver.contains(2L, 1L));
        Assert.assertTrue(messagesToRedeliver.contains(2L, 2L));
        Assert.assertTrue(messagesToRedeliver.contains(3L, 1L));
        Assert.assertTrue(messagesToRedeliver.contains(3L, 2L));
        if (hashesTracked) {
            Assert.assertEquals(hashesToBeBlocked.size(), 4L);
            assertBlockedHash(hashesToBeBlocked, 2L, 1L, 200L);
            assertBlockedHash(hashesToBeBlocked, 2L, 2L, 201L);
            assertBlockedHash(hashesToBeBlocked, 3L, 1L, 300L);
            assertBlockedHash(hashesToBeBlocked, 3L, 2L, 301L);
        }

        // After removeAllUpTo(3, 1) only position (3, 2) is expected to remain.
        controller.removeAllUpTo(3L, 1L);
        Assert.assertEquals(messagesToRedeliver.size(), 1L);
        Assert.assertTrue(messagesToRedeliver.contains(3L, 2L));
        if (hashesTracked) {
            Assert.assertEquals(hashesToBeBlocked.size(), 1L);
            assertBlockedHash(hashesToBeBlocked, 3L, 2L, 301L);
        }

        // A bound beyond every stored position is expected to drain the controller.
        controller.removeAllUpTo(5L, 10L);
        Assert.assertTrue(controller.isEmpty());
        Assert.assertEquals(messagesToRedeliver.size(), 0L);
        if (hashesTracked) {
            Assert.assertEquals(hashesToBeBlocked.size(), 0L);
        }
    }

    /**
     * Checks {@code containsStickyKeyHashes}: hashes that were added are expected to be reported only
     * when out-of-order delivery is disallowed; unknown hashes and the empty set are never reported.
     */
    @Test(dataProvider = "allowOutOfOrderDelivery", timeOut = 10000L)
    public void testContainsStickyKeyHashes(boolean allowOutOfOrderDelivery) throws Exception {
        MessageRedeliveryController controller = new MessageRedeliveryController(allowOutOfOrderDelivery);
        // {ledgerId, entryId, hash}
        long[][] seeded = {
            {1L, 1L, 100L},
            {1L, 2L, 101L},
            {1L, 3L, 102L},
            {2L, 2L, 103L},
            {2L, 1L, 104L},
        };
        for (long[] row : seeded) {
            controller.add(row[0], row[1], row[2]);
        }

        // Sets that include at least one added hash: a hit is expected only in the ordered mode.
        final boolean expectHit = !allowOutOfOrderDelivery;
        Set<Integer> singleKnown = Sets.newHashSet(100);
        Set<Integer> severalKnown = Sets.newHashSet(101, 102, 103);
        Set<Integer> knownAndUnknown = Sets.newHashSet(104, 105);
        Assert.assertEquals(controller.containsStickyKeyHashes(singleKnown), expectHit);
        Assert.assertEquals(controller.containsStickyKeyHashes(severalKnown), expectHit);
        Assert.assertEquals(controller.containsStickyKeyHashes(knownAndUnknown), expectHit);

        // Sets without any added hash: never a hit, whatever the mode.
        Set<Integer> none = Sets.newHashSet();
        Set<Integer> singleUnknown = Sets.newHashSet(99);
        Set<Integer> severalUnknown = Sets.newHashSet(105, 106);
        Assert.assertFalse(controller.containsStickyKeyHashes(none));
        Assert.assertFalse(controller.containsStickyKeyHashes(singleUnknown));
        Assert.assertFalse(controller.containsStickyKeyHashes(severalUnknown));
    }

    /**
     * Checks {@code getMessagesToReplayNow} after adding seven positions in shuffled order.
     *
     * <p>With out-of-order delivery allowed, asking for three positions is expected to yield the three
     * positions of ledger 1 (compared without regard to order). Otherwise, asking for six positions is
     * expected to yield a set equal to the six smallest positions.
     */
    @Test(dataProvider = "allowOutOfOrderDelivery", timeOut = 10000L)
    public void testGetMessagesToReplayNow(boolean allowOutOfOrderDelivery) throws Exception {
        MessageRedeliveryController controller = new MessageRedeliveryController(allowOutOfOrderDelivery);
        // {ledgerId, entryId}, intentionally not sorted.
        long[][] insertionOrder = {
            {2L, 2L},
            {1L, 3L},
            {3L, 1L},
            {2L, 1L},
            {3L, 2L},
            {1L, 2L},
            {1L, 1L},
        };
        for (long[] position : insertionOrder) {
            controller.add(position[0], position[1]);
        }

        if (!allowOutOfOrderDelivery) {
            Set<PositionImpl> replayed = controller.getMessagesToReplayNow(6);
            Set<PositionImpl> expected = new TreeSet<>();
            long[][] firstSix = {{1L, 1L}, {1L, 2L}, {1L, 3L}, {2L, 1L}, {2L, 2L}, {3L, 1L}};
            for (long[] position : firstSix) {
                expected.add(PositionImpl.get(position[0], position[1]));
            }
            Assert.assertEquals(replayed, expected);
            return;
        }

        PositionImpl[] replayed = controller.getMessagesToReplayNow(3).toArray(new PositionImpl[3]);
        PositionImpl[] expected = {
            PositionImpl.get(1L, 1L),
            PositionImpl.get(1L, 2L),
            PositionImpl.get(1L, 3L),
        };
        Assert.assertEqualsNoOrder(replayed, expected);
    }
}

