package com.gemstone.gemfire.internal.cache.ha;

import com.gemstone.gemfire.LogWriter;
import com.gemstone.gemfire.cache.AttributesFactory;
import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.CacheException;
import com.gemstone.gemfire.cache.CacheFactory;
import com.gemstone.gemfire.cache.CacheListener;
import com.gemstone.gemfire.cache.DataPolicy;
import com.gemstone.gemfire.cache.EntryEvent;
import com.gemstone.gemfire.cache.Scope;
import com.gemstone.gemfire.cache.util.CacheListenerAdapter;
import com.gemstone.gemfire.distributed.DistributedSystem;
import com.gemstone.gemfire.i18n.StringIdImpl;
import com.gemstone.gemfire.internal.cache.Conflatable;
import com.gemstone.gemfire.internal.cache.EventID;
import com.gemstone.gemfire.internal.cache.ha.HARegionQueue;
import io.snappydata.test.dunit.DistributedTestBase;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import junit.framework.Assert;
import junit.framework.TestCase;
import org.apache.log4j.Logger;

/* loaded from: input_file:com/gemstone/gemfire/internal/cache/ha/HARQAddOperationJUnitTest.class */
public class HARQAddOperationJUnitTest extends TestCase {
    protected Cache cache;
    protected LogWriter logger;
    private HARegionQueue rq;
    protected static final String KEY1 = "Key-1";
    protected static final String KEY2 = "Key-2";
    protected static final String VALUE1 = "Value-1";
    protected static final String VALUE2 = "Value-2";
    protected boolean testFailed;
    protected StringBuffer message;
    protected int barrierCount;
    static volatile int expiryCount = 0;

    public HARQAddOperationJUnitTest(String str) {
        super(str);
        this.cache = null;
        this.logger = null;
        this.rq = null;
        this.testFailed = false;
        this.message = null;
        this.barrierCount = 0;
    }

    protected void setUp() throws Exception {
        super.setUp();
        this.cache = createCache();
        this.logger = this.cache.getLogger();
    }

    protected void tearDown() throws Exception {
        super.tearDown();
        this.cache.close();
    }

    private Cache createCache() throws CacheException {
        return CacheFactory.create(DistributedSystem.connect(new Properties()));
    }

    protected HARegionQueue createHARegionQueue(String str) throws IOException, ClassNotFoundException, CacheException, InterruptedException {
        AttributesFactory attributesFactory = new AttributesFactory();
        attributesFactory.setDataPolicy(DataPolicy.REPLICATE);
        attributesFactory.setScope(Scope.DISTRIBUTED_ACK);
        return HARegionQueue.getHARegionQueueInstance(str, this.cache, 2, false);
    }

    protected HARegionQueue createHARegionQueue(String str, HARegionQueueAttributes hARegionQueueAttributes) throws IOException, ClassNotFoundException, CacheException, InterruptedException {
        return HARegionQueue.getHARegionQueueInstance(str, this.cache, hARegionQueueAttributes, 2, false);
    }

    public void testQueueAddOperationWithConflation() throws Exception {
        this.logger.info("HARegionQueueJUnitTest : testQueueAddOperationWithConflation BEGIN");
        this.rq = createHARegionQueue("testQueueAddOperationWithConflation");
        EventID eventID = new EventID(new byte[]{1}, 1L, 1L);
        EventID eventID2 = new EventID(new byte[]{1}, 1L, 2L);
        ConflatableObject conflatableObject = new ConflatableObject(KEY1, VALUE1, eventID, true, "region1");
        ConflatableObject conflatableObject2 = new ConflatableObject(KEY1, VALUE2, eventID2, true, "region1");
        this.rq.put(conflatableObject);
        this.rq.put(conflatableObject2);
        Map map = (Map) this.rq.getConflationMapForTesting().get("region1");
        assertEquals(1, map.size());
        assertEquals(VALUE2, ((ConflatableObject) this.rq.getRegion().get((Long) map.get(KEY1))).getValueToConflate());
        assertEquals(1, this.rq.getAvalaibleIds().size());
        assertEquals(1, this.rq.getCurrentCounterSet(eventID).size());
        this.logger.info("HARegionQueueJUnitTest : testQueueAddOperationWithConflation END");
    }

    public void testQueueAddOperationWithoutConflation() throws Exception {
        this.logger.info("HARegionQueueJUnitTest : testQueueAddOperationWithoutConflation BEGIN");
        this.rq = createHARegionQueue("testQueueAddOperationWithConflation");
        EventID eventID = new EventID(new byte[]{1}, 1L, 1L);
        EventID eventID2 = new EventID(new byte[]{1}, 1L, 2L);
        ConflatableObject conflatableObject = new ConflatableObject(KEY1, VALUE1, eventID, false, "region1");
        ConflatableObject conflatableObject2 = new ConflatableObject(KEY2, VALUE2, eventID2, false, "region1");
        this.rq.put(conflatableObject);
        assertNull(this.rq.getConflationMapForTesting().get("region1"));
        assertEquals(1, this.rq.getAvalaibleIds().size());
        assertEquals(1, this.rq.getCurrentCounterSet(eventID).size());
        this.rq.put(conflatableObject2);
        assertNull(this.rq.getConflationMapForTesting().get("region1"));
        assertEquals(2, this.rq.getAvalaibleIds().size());
        assertEquals(2, this.rq.getCurrentCounterSet(eventID).size());
        Iterator it = this.rq.getCurrentCounterSet(eventID).iterator();
        if (it.hasNext()) {
            ConflatableObject conflatableObject3 = (ConflatableObject) this.rq.getRegion().get((Long) it.next());
            assertEquals(KEY1, conflatableObject3.getKeyToConflate());
            assertEquals(VALUE1, conflatableObject3.getValueToConflate());
        }
        if (it.hasNext()) {
            ConflatableObject conflatableObject4 = (ConflatableObject) this.rq.getRegion().get((Long) it.next());
            assertEquals(KEY2, conflatableObject4.getKeyToConflate());
            assertEquals(VALUE2, conflatableObject4.getValueToConflate());
        }
        this.logger.info("HARegionQueueJUnitTest : testQueueAddOperationWithoutConflation END");
    }

    public void testQueueAddTakeOperationWithoutConflation() throws Exception {
        this.logger.info("HARegionQueueJUnitTest : testQueueAddTakeOperationWithoutConflation BEGIN");
        this.rq = createHARegionQueue("testQueueAddOperationWithConflation");
        EventID eventID = new EventID(new byte[]{1}, 1L, 1L);
        this.rq.put(new ConflatableObject(KEY1, VALUE1, eventID, true, "region1"));
        this.rq.take();
        assertNull(this.rq.getRegion().get(KEY1));
        assertEquals(0, this.rq.getAvalaibleIds().size());
        assertEquals(1, this.rq.getEventsMapForTesting().size());
        assertEquals(0, this.rq.getCurrentCounterSet(eventID).size());
        this.logger.info("HARegionQueueJUnitTest : testQueueAddTakeOperationWithoutConflation END");
    }

    public void testExpiryOnThreadIdentifier() {
        try {
            HARegionQueueAttributes hARegionQueueAttributes = new HARegionQueueAttributes();
            hARegionQueueAttributes.setExpiryTime(2);
            HARegionQueue createHARegionQueue = createHARegionQueue("testing", hARegionQueueAttributes);
            ConflatableObject conflatableObject = new ConflatableObject(KEY1, VALUE1, new EventID(new byte[]{1}, 1L, 1L), true, "region1");
            ThreadIdentifier threadIdentifier = new ThreadIdentifier(conflatableObject.getEventId().getMembershipID(), conflatableObject.getEventId().getThreadID());
            createHARegionQueue.put(conflatableObject);
            createHARegionQueue.take();
            Thread.sleep(25000L);
            assertFalse("ThreadIdentifier did not remove itself through expiry.The reqgion queue is of type=" + createHARegionQueue.getClass(), createHARegionQueue.getRegion().containsKey(threadIdentifier));
            assertNull("expiry action on ThreadIdentifier did not remove itself from eventsMap", createHARegionQueue.getEventsMapForTesting().get(threadIdentifier));
        } catch (Exception e) {
            fail(" test failed due to " + e);
        }
    }

    public void testNoExpiryOnThreadIdentifier() {
        try {
            HARegionQueueAttributes hARegionQueueAttributes = new HARegionQueueAttributes();
            hARegionQueueAttributes.setExpiryTime(8);
            HARegionQueue createHARegionQueue = createHARegionQueue("testing", hARegionQueueAttributes);
            EventID eventID = new EventID(new byte[]{1}, 1L, 1L);
            EventID eventID2 = new EventID(new byte[]{1}, 1L, 2L);
            ConflatableObject conflatableObject = new ConflatableObject(KEY1, VALUE1, eventID, true, "region1");
            ConflatableObject conflatableObject2 = new ConflatableObject(KEY1, VALUE2, eventID2, true, "region1");
            ThreadIdentifier threadIdentifier = new ThreadIdentifier(conflatableObject.getEventId().getMembershipID(), conflatableObject.getEventId().getThreadID());
            createHARegionQueue.put(conflatableObject);
            assertNotNull(createHARegionQueue.take());
            Thread.sleep(3000L);
            createHARegionQueue.put(conflatableObject2);
            Thread.sleep(4000L);
            assertNotNull("ThreadIdentifier removed itself through expiry even though data was lying in the queue", createHARegionQueue.getEventsMapForTesting().get(threadIdentifier));
            Thread.sleep(16000L);
            assertEquals(0, createHARegionQueue.getRegion().entries(false).size());
            assertEquals(0, createHARegionQueue.getAvalaibleIds().size());
            assertNull(createHARegionQueue.getCurrentCounterSet(eventID));
        } catch (Exception e) {
            e.printStackTrace();
            fail(" test failed due to " + e);
        }
    }

    public void testMultipleQRMArrival() throws Exception {
        HARegionQueue createHARegionQueue = createHARegionQueue("testNoExpiryOnThreadIdentifier");
        EventID[] eventIDArr = new EventID[10];
        for (int i = 0; i < 10; i++) {
            eventIDArr[i] = new EventID(new byte[]{1}, 1L, i + 1);
        }
        for (int i2 = 0; i2 < 10; i2++) {
            createHARegionQueue.put(new ConflatableObject("KEY " + i2, "VALUE" + i2, eventIDArr[i2], true, "region1"));
        }
        assertEquals(10, createHARegionQueue.getAvalaibleIds().size());
        createHARegionQueue.removeDispatchedEvents(eventIDArr[4]);
        assertEquals(5, createHARegionQueue.getAvalaibleIds().size());
        assertEquals(5, createHARegionQueue.getCurrentCounterSet(eventIDArr[0]).size());
        Iterator it = createHARegionQueue.getCurrentCounterSet(eventIDArr[0]).iterator();
        while (it.hasNext()) {
            assertTrue(((ConflatableObject) createHARegionQueue.getRegion().get((Long) it.next())).getEventId().getSequenceID() > 5);
        }
        createHARegionQueue.removeDispatchedEvents(eventIDArr[9]);
        assertEquals(0, createHARegionQueue.getAvalaibleIds().size());
    }

    public void testConcurrentPutAndQRM() throws Exception {
        this.testFailed = false;
        this.message = new StringBuffer();
        final HARegionQueue createHARegionQueue = createHARegionQueue("testConcurrentPutAndQRM");
        final EventID eventID = new EventID(new byte[]{1}, 1L, 1L);
        final EventID eventID2 = new EventID(new byte[]{1}, 1L, 2L);
        Thread thread = new Thread() { // from class: com.gemstone.gemfire.internal.cache.ha.HARQAddOperationJUnitTest.1
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                try {
                    createHARegionQueue.put(new ConflatableObject(HARQAddOperationJUnitTest.KEY1, HARQAddOperationJUnitTest.VALUE1, eventID, true, "region1"));
                    createHARegionQueue.put(new ConflatableObject(HARQAddOperationJUnitTest.KEY2, HARQAddOperationJUnitTest.VALUE2, eventID2, true, "region1"));
                } catch (Exception e) {
                    HARQAddOperationJUnitTest.this.message.append("Put in region queue failed");
                    HARQAddOperationJUnitTest.this.testFailed = true;
                }
            }
        };
        thread.setPriority(10);
        Thread thread2 = new Thread() { // from class: com.gemstone.gemfire.internal.cache.ha.HARQAddOperationJUnitTest.2
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                try {
                    createHARegionQueue.removeDispatchedEvents(eventID2);
                } catch (Exception e) {
                    HARQAddOperationJUnitTest.this.message.append("Removal by QRM in region queue failed");
                    HARQAddOperationJUnitTest.this.testFailed = true;
                }
            }
        };
        thread.start();
        thread2.start();
        DistributedTestBase.join(thread, 180000L, (Logger) null);
        DistributedTestBase.join(thread2, 180000L, (Logger) null);
        if (this.testFailed) {
            fail("Test failed due to " + ((Object) this.message));
        }
        assertEquals(0, createHARegionQueue.getAvalaibleIds().size());
        assertEquals(2L, createHARegionQueue.getLastDispatchedSequenceId(eventID2));
    }

    public void testConcurrentQRMAndPut() throws Exception {
        this.testFailed = false;
        final HARegionQueue createHARegionQueue = createHARegionQueue("testConcurrentQRMAndPut");
        final EventID eventID = new EventID(new byte[]{1}, 1L, 1L);
        final EventID eventID2 = new EventID(new byte[]{1}, 1L, 2L);
        Thread thread = new Thread() { // from class: com.gemstone.gemfire.internal.cache.ha.HARQAddOperationJUnitTest.3
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                try {
                    createHARegionQueue.put(new ConflatableObject(HARQAddOperationJUnitTest.KEY1, HARQAddOperationJUnitTest.VALUE1, eventID, true, "region1"));
                    createHARegionQueue.put(new ConflatableObject(HARQAddOperationJUnitTest.KEY2, HARQAddOperationJUnitTest.VALUE2, eventID2, true, "region1"));
                } catch (Exception e) {
                    HARQAddOperationJUnitTest.this.message.append("Put in region queue failed");
                    HARQAddOperationJUnitTest.this.testFailed = true;
                }
            }
        };
        Thread thread2 = new Thread() { // from class: com.gemstone.gemfire.internal.cache.ha.HARQAddOperationJUnitTest.4
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                try {
                    createHARegionQueue.removeDispatchedEvents(eventID2);
                } catch (Exception e) {
                    HARQAddOperationJUnitTest.this.message.append("Removal of Events by QRM in Region queue failed");
                    HARQAddOperationJUnitTest.this.testFailed = true;
                }
            }
        };
        thread2.setPriority(10);
        thread2.start();
        thread.start();
        DistributedTestBase.join(thread, 180000L, (Logger) null);
        DistributedTestBase.join(thread2, 180000L, (Logger) null);
        if (this.testFailed) {
            fail("Test failed due to " + ((Object) this.message));
        }
        assertEquals(0, createHARegionQueue.getAvalaibleIds().size());
        assertEquals(2L, createHARegionQueue.getLastDispatchedSequenceId(eventID2));
    }

    public void testEventMapPopulationForQRM() throws Exception {
        HARegionQueue createHARegionQueue = createHARegionQueue("testEventMapPopulationForQRM");
        EventID eventID = new EventID(new byte[]{1}, 1L, 1L);
        EventID eventID2 = new EventID(new byte[]{1}, 1L, 2L);
        this.logger.info("RemoveDispatched event for sequence id : " + eventID2.getSequenceID());
        createHARegionQueue.removeDispatchedEvents(eventID2);
        this.logger.info("RemoveDispatched event for sequence id :" + eventID.getSequenceID());
        createHARegionQueue.removeDispatchedEvents(eventID);
        assertEquals("Size of eventMap should be 1 but actual size " + createHARegionQueue.getEventsMapForTesting(), createHARegionQueue.getEventsMapForTesting().size(), 1);
        this.logger.info("sequence id : " + createHARegionQueue.getLastDispatchedSequenceId(eventID2));
        assertEquals("Last dispatched sequence id should be 2 but actual sequence id is ", createHARegionQueue.getLastDispatchedSequenceId(eventID2), eventID2.getSequenceID());
        this.logger.info("testEventMapPopulationForQRM() completed successfully");
    }

    public void testCleanUpForConflation() throws Exception {
        this.logger.info("HARQAddOperationJUnitTest : testCleanUpForConflation BEGIN");
        this.testFailed = false;
        this.message = null;
        final HARegionQueue createHARegionQueue = createHARegionQueue("testCleanUpForConflation");
        this.logger.info("HARQAddOperationJUnitTest : testCleanUpForConflation after regionqueue create");
        Thread[] threadArr = new Thread[10];
        for (int i = 0; i < 10; i++) {
            final long j = i;
            threadArr[i] = new Thread() { // from class: com.gemstone.gemfire.internal.cache.ha.HARQAddOperationJUnitTest.5
                @Override // java.lang.Thread, java.lang.Runnable
                public void run() {
                    for (int i2 = 0; i2 < 567; i2++) {
                        EventID eventID = new EventID(new byte[]{(byte) j}, j, i2);
                        try {
                            createHARegionQueue.put(new ConflatableObject(HARQAddOperationJUnitTest.KEY1, eventID.getThreadID() + "VALUE" + i2, eventID, true, "region1"));
                        } catch (Exception e) {
                            HARQAddOperationJUnitTest.this.testFailed = true;
                            HARQAddOperationJUnitTest.this.message.append("put failed for the threadId " + eventID.getThreadID());
                        }
                    }
                }
            };
        }
        for (int i2 = 0; i2 < 10; i2++) {
            threadArr[i2].start();
        }
        for (int i3 = 0; i3 < 10; i3++) {
            DistributedTestBase.join(threadArr[i3], 180000L, (Logger) null);
        }
        this.logger.info("HARQAddOperationJUnitTest : testCleanUpForConflation after join");
        if (this.testFailed) {
            fail("Test failed due to " + ((Object) this.message));
        }
        assertEquals("size of the conflation map should be 1 but actual size is " + createHARegionQueue.getConflationMapForTesting().size(), 1, createHARegionQueue.getConflationMapForTesting().size());
        assertEquals("size of the event map should be 10 but actual size " + createHARegionQueue.getEventsMapForTesting().size(), 10, createHARegionQueue.getEventsMapForTesting().size());
        assertEquals("size of availableids should 1 but actual size " + createHARegionQueue.getAvalaibleIds().size(), 1, createHARegionQueue.getAvalaibleIds().size());
        int i4 = 0;
        for (int i5 = 0; i5 < 10; i5++) {
            if (createHARegionQueue.getCurrentCounterSet(new EventID(new byte[]{(byte) i5}, i5, i5)).size() > 0) {
                i4++;
            }
        }
        assertEquals("size of the counter set is  1 but the actual size is " + i4, 1, i4);
        assertEquals(createHARegionQueue.getCurrentCounterSet(((ConflatableObject) createHARegionQueue.getRegion().get(createHARegionQueue.getAvalaibleIds().size() == 1 ? (Long) createHARegionQueue.getAvalaibleIds().iterator().next() : null)).getEventId()).size(), 1);
        this.logger.info("HARQAddOperationJUnitTest : testCleanUpForConflation END");
    }

    public void testPeekAndRemoveWithoutConflation() throws Exception {
        this.testFailed = false;
        this.message = null;
        final HARegionQueue createHARegionQueue = createHARegionQueue("testPeekAndRemoveWithoutConflation");
        Thread[] threadArr = new Thread[5];
        for (int i = 0; i < 5; i++) {
            final long j = i;
            threadArr[i] = new Thread() { // from class: com.gemstone.gemfire.internal.cache.ha.HARQAddOperationJUnitTest.6
                @Override // java.lang.Thread, java.lang.Runnable
                public void run() {
                    for (int i2 = 0; i2 < 4; i2++) {
                        EventID eventID = new EventID(new byte[]{(byte) j}, j, i2);
                        try {
                            createHARegionQueue.put(new ConflatableObject(HARQAddOperationJUnitTest.KEY1 + eventID.getThreadID() + i2, eventID.getThreadID() + "VALUE" + i2, eventID, false, "region1"));
                        } catch (Exception e) {
                            HARQAddOperationJUnitTest.this.testFailed = true;
                            HARQAddOperationJUnitTest.this.message.append("put failed for the threadId " + eventID.getThreadID());
                        }
                    }
                }
            };
        }
        for (int i2 = 0; i2 < 5; i2++) {
            threadArr[i2].start();
        }
        for (int i3 = 0; i3 < 5; i3++) {
            DistributedTestBase.join(threadArr[i3], 180000L, (Logger) null);
        }
        if (this.testFailed) {
            fail("Test failed due to " + ((Object) this.message));
        }
        assertEquals(20, createHARegionQueue.peek(20).size());
        createHARegionQueue.remove();
        for (int i4 = 0; i4 < 5; i4++) {
            assertEquals(3L, createHARegionQueue.getLastDispatchedSequenceId(new EventID(new byte[]{(byte) i4}, i4, 1L)));
            assertEquals(0, createHARegionQueue.getCurrentCounterSet(new EventID(new byte[]{(byte) i4}, i4, 1L)).size());
        }
        assertEquals(0, createHARegionQueue.getAvalaibleIds().size());
        this.logger.info("testPeekAndRemoveWithoutConflation() completed successfully");
    }

    public void testPeekAndRemoveWithConflation() throws Exception {
        this.testFailed = false;
        this.message = null;
        final HARegionQueue createHARegionQueue = createHARegionQueue("testPeekAndRemoveWithConflation");
        Thread[] threadArr = new Thread[5];
        for (int i = 0; i < 5; i++) {
            final long j = i;
            threadArr[i] = new Thread() { // from class: com.gemstone.gemfire.internal.cache.ha.HARQAddOperationJUnitTest.7
                @Override // java.lang.Thread, java.lang.Runnable
                public void run() {
                    for (int i2 = 0; i2 < 4; i2++) {
                        EventID eventID = new EventID(new byte[]{(byte) j}, j, i2);
                        try {
                            createHARegionQueue.put(new ConflatableObject(HARQAddOperationJUnitTest.KEY1 + j, eventID.getThreadID() + "VALUE" + i2, eventID, true, "region1"));
                        } catch (Exception e) {
                            HARQAddOperationJUnitTest.this.testFailed = true;
                            HARQAddOperationJUnitTest.this.message.append("put failed for the threadId " + eventID.getThreadID());
                        }
                    }
                }
            };
        }
        for (int i2 = 0; i2 < 5; i2++) {
            threadArr[i2].start();
        }
        for (int i3 = 0; i3 < 5; i3++) {
            DistributedTestBase.join(threadArr[i3], 180000L, (Logger) null);
        }
        if (this.testFailed) {
            fail("Test failed due to " + ((Object) this.message));
        }
        assertEquals(5, createHARegionQueue.peek(20).size());
        createHARegionQueue.remove();
        for (int i4 = 0; i4 < 5; i4++) {
            assertEquals(0, createHARegionQueue.getCurrentCounterSet(new EventID(new byte[]{(byte) i4}, i4, 1L)).size());
        }
        assertEquals("size of availableIds map should be 0 ", 0, createHARegionQueue.getAvalaibleIds().size());
        assertEquals("size of conflation map should be 0 ", 0, ((Map) createHARegionQueue.getConflationMapForTesting().get("region1")).size());
        this.logger.info("testPeekAndRemoveWithConflation() completed successfully");
    }

    public void testPeekForDiffBatchSizeAndRemoveAll() throws Exception {
        this.testFailed = false;
        this.message = null;
        this.barrierCount = 0;
        final HARegionQueue createHARegionQueue = createHARegionQueue("testPeekForDiffBatchSizeAndRemoveAll");
        Thread[] threadArr = new Thread[5];
        for (int i = 0; i < 5; i++) {
            final long j = i;
            threadArr[i] = new Thread() { // from class: com.gemstone.gemfire.internal.cache.ha.HARQAddOperationJUnitTest.8
                @Override // java.lang.Thread, java.lang.Runnable
                public void run() {
                    for (int i2 = 0; i2 < 4; i2++) {
                        EventID eventID = new EventID(new byte[]{(byte) j}, j, i2);
                        try {
                            createHARegionQueue.put(new ConflatableObject(HARQAddOperationJUnitTest.KEY1 + eventID.getThreadID() + i2, eventID.getThreadID() + "VALUE" + i2, eventID, false, "region1"));
                        } catch (Exception e) {
                            HARQAddOperationJUnitTest.this.testFailed = true;
                            HARQAddOperationJUnitTest.this.message.append("put failed for the threadId " + eventID.getThreadID());
                        }
                    }
                }
            };
        }
        for (int i2 = 0; i2 < 5; i2++) {
            threadArr[i2].start();
        }
        for (int i3 = 0; i3 < 5; i3++) {
            DistributedTestBase.join(threadArr[i3], 180000L, (Logger) null);
        }
        if (this.testFailed) {
            fail("Test failed due to " + ((Object) this.message));
        }
        this.testFailed = false;
        this.message = null;
        Thread[] threadArr2 = new Thread[4];
        for (int i4 = 1; i4 < 5; i4++) {
            final int i5 = i4 * 5;
            threadArr2[i4 - 1] = new Thread() { // from class: com.gemstone.gemfire.internal.cache.ha.HARQAddOperationJUnitTest.9
                @Override // java.lang.Thread, java.lang.Runnable
                public void run() {
                    try {
                        TestCase.assertEquals(i5, createHARegionQueue.peek(i5).size());
                        synchronized (HARQAddOperationJUnitTest.this) {
                            HARQAddOperationJUnitTest.this.barrierCount++;
                            if (HARQAddOperationJUnitTest.this.barrierCount == 4) {
                                HARQAddOperationJUnitTest.this.notifyAll();
                            } else {
                                HARQAddOperationJUnitTest.this.wait();
                            }
                        }
                        createHARegionQueue.remove();
                    } catch (Exception e) {
                        HARQAddOperationJUnitTest.this.testFailed = true;
                        e.printStackTrace();
                        HARQAddOperationJUnitTest.this.message.append("Exception while performing peak operation " + e.getStackTrace());
                    }
                }
            };
        }
        for (int i6 = 0; i6 < 4; i6++) {
            threadArr2[i6].start();
        }
        for (int i7 = 0; i7 < 4; i7++) {
            DistributedTestBase.join(threadArr2[i7], 180000L, (Logger) null);
        }
        if (this.testFailed) {
            fail("Test failed due to " + ((Object) this.message));
        }
        for (int i8 = 0; i8 < 5; i8++) {
            assertEquals(3L, createHARegionQueue.getLastDispatchedSequenceId(new EventID(new byte[]{(byte) i8}, i8, 1L)));
            assertEquals(0, createHARegionQueue.getCurrentCounterSet(new EventID(new byte[]{(byte) i8}, i8, 1L)).size());
        }
        assertEquals(0, createHARegionQueue.getAvalaibleIds().size());
        this.logger.info("testPeekForDiffBatchSizeAndRemoveAll() completed successfully");
    }

    public void testPeekForDiffBatchSizeAndRemoveSome() throws Exception {
        this.testFailed = false;
        this.barrierCount = 0;
        this.message = null;
        final HARegionQueue createHARegionQueue = createHARegionQueue("testPeekForDiffBatchSizeAndRemoveSome");
        Thread[] threadArr = new Thread[5];
        for (int i = 0; i < 5; i++) {
            final long j = i;
            threadArr[i] = new Thread() { // from class: com.gemstone.gemfire.internal.cache.ha.HARQAddOperationJUnitTest.10
                @Override // java.lang.Thread, java.lang.Runnable
                public void run() {
                    for (int i2 = 0; i2 < 4; i2++) {
                        EventID eventID = new EventID(new byte[]{(byte) j}, j, i2);
                        try {
                            createHARegionQueue.put(new ConflatableObject(HARQAddOperationJUnitTest.KEY1 + eventID.getThreadID() + i2, eventID.getThreadID() + "VALUE" + i2, eventID, false, "region1"));
                        } catch (Exception e) {
                            HARQAddOperationJUnitTest.this.testFailed = true;
                            HARQAddOperationJUnitTest.this.message.append("put failed for the threadId " + eventID.getThreadID());
                        }
                    }
                }
            };
        }
        for (int i2 = 0; i2 < 5; i2++) {
            threadArr[i2].start();
        }
        for (int i3 = 0; i3 < 5; i3++) {
            DistributedTestBase.join(threadArr[i3], 180000L, (Logger) null);
        }
        if (this.testFailed) {
            fail("Test failed due to " + ((Object) this.message));
        }
        this.testFailed = false;
        this.message = null;
        Thread[] threadArr2 = new Thread[3];
        for (int i4 = 1; i4 < 4; i4++) {
            final int i5 = i4 * 5;
            threadArr2[i4 - 1] = new Thread() { // from class: com.gemstone.gemfire.internal.cache.ha.HARQAddOperationJUnitTest.11
                @Override // java.lang.Thread, java.lang.Runnable
                public void run() {
                    try {
                        TestCase.assertEquals(i5, createHARegionQueue.peek(i5).size());
                        synchronized (HARQAddOperationJUnitTest.this) {
                            HARQAddOperationJUnitTest.this.barrierCount++;
                            if (HARQAddOperationJUnitTest.this.barrierCount == 3) {
                                HARQAddOperationJUnitTest.this.notifyAll();
                            } else {
                                HARQAddOperationJUnitTest.this.wait();
                            }
                        }
                        createHARegionQueue.remove();
                    } catch (Exception e) {
                        HARQAddOperationJUnitTest.this.testFailed = true;
                        e.printStackTrace();
                        HARQAddOperationJUnitTest.this.message.append("Exception while performing peak operation " + e.getStackTrace());
                    }
                }
            };
        }
        for (int i6 = 0; i6 < 3; i6++) {
            threadArr2[i6].start();
        }
        for (int i7 = 0; i7 < 3; i7++) {
            DistributedTestBase.join(threadArr2[i7], 180000L, (Logger) null);
        }
        if (this.testFailed) {
            fail("Test failed due to " + ((Object) this.message));
        }
        assertEquals(5, createHARegionQueue.getAvalaibleIds().size());
        this.logger.info("testPeekForDiffBatchSizeAndRemoveSome() completed successfully");
    }

    public void testAddWithQRMAndExpiry() throws Exception {
        try {
            HARegionQueueAttributes hARegionQueueAttributes = new HARegionQueueAttributes();
            hARegionQueueAttributes.setExpiryTime(10);
            HARegionQueue.TestOnlyHARegionQueue testOnlyHARegionQueue = new HARegionQueue.TestOnlyHARegionQueue("testing", this.cache, hARegionQueueAttributes) { // from class: com.gemstone.gemfire.internal.cache.ha.HARQAddOperationJUnitTest.12
                CacheListener createCacheListenerForHARegion() {
                    return new CacheListenerAdapter() { // from class: com.gemstone.gemfire.internal.cache.ha.HARQAddOperationJUnitTest.12.1
                        public void afterInvalidate(EntryEvent entryEvent) {
                            try {
                                expireTheEventOrThreadIdentifier(entryEvent);
                            } catch (CacheException e) {
                                if (AnonymousClass12.this.logger.errorEnabled()) {
                                    AnonymousClass12.this.logger.error(StringIdImpl.LITERAL, "HAREgionQueue::createCacheListener::Exception in the expiry thread", e);
                                }
                            }
                            if (entryEvent.getKey() instanceof ThreadIdentifier) {
                                synchronized (HARQAddOperationJUnitTest.this) {
                                    HARQAddOperationJUnitTest.expiryCount++;
                                    HARQAddOperationJUnitTest.this.notify();
                                }
                            }
                        }
                    };
                }
            };
            Conflatable[] conflatableArr = new Conflatable[10];
            for (int i = 0; i < 10; i++) {
                conflatableArr[i] = new ConflatableObject("key" + i, "value", new EventID(new byte[]{1}, 1L, i), true, "testing");
                testOnlyHARegionQueue.put(conflatableArr[i]);
            }
            ThreadIdentifier threadIdentifier = new ThreadIdentifier(new byte[]{1}, 1L);
            assertEquals(new Long(-1L), testOnlyHARegionQueue.getRegion().get(threadIdentifier));
            testOnlyHARegionQueue.removeDispatchedEvents(new EventID(new byte[]{1}, 1L, 4L));
            assertEquals(4L, testOnlyHARegionQueue.getLastDispatchedSequenceId(new EventID(new byte[]{1}, 1L, 1L)));
            for (long j = 1; j < 6; j++) {
                Assert.assertTrue(!testOnlyHARegionQueue.getRegion().containsKey(new Long(j)));
            }
            for (long j2 = 6; j2 < 11; j2++) {
                Assert.assertTrue(testOnlyHARegionQueue.getRegion().containsKey(new Long(j2)));
            }
            for (long j3 = 6; j3 < 11; j3++) {
                testOnlyHARegionQueue.take();
            }
            assertEquals(9L, testOnlyHARegionQueue.getLastDispatchedSequenceId(new EventID(new byte[]{1}, 1L, 1L)));
            for (long j4 = 1; j4 < 11; j4++) {
                Assert.assertTrue(!testOnlyHARegionQueue.getRegion().containsKey(new Long(j4)));
            }
            synchronized (this) {
                if (0 == expiryCount) {
                    wait();
                }
                if (1 == expiryCount) {
                    assertEquals(1, testOnlyHARegionQueue.getEventsMapForTesting().size());
                    assertEquals(new Long(9L), testOnlyHARegionQueue.getRegion().get(threadIdentifier));
                    wait();
                }
            }
            assertEquals(0, testOnlyHARegionQueue.getEventsMapForTesting().size());
            assertNull(testOnlyHARegionQueue.getRegion().get(threadIdentifier));
        } catch (Exception e) {
            fail("Exception occured in test due to " + e);
        }
    }

    public void _testDispatchedMsgsMapUpdateOnTakes() throws Exception {
        this.logger.info("HARQAddOperationJUnitTest : testDispatchedEventsMapUpdateOnTakes BEGIN");
        HARegionQueue createHARegionQueue = createHARegionQueue("testDispatchedEventsMapUpdateOnTakes");
        EventID eventID = null;
        for (int i = 0; i < 10; i++) {
            eventID = new EventID(new byte[]{1}, 1L, i);
            createHARegionQueue.put(new ConflatableObject("key" + i, "value" + i, eventID, false, "testing"));
        }
        for (int i2 = 0; i2 < 10; i2++) {
            createHARegionQueue.take();
        }
        Map dispatchedMessagesMapForTesting = HARegionQueue.getDispatchedMessagesMapForTesting();
        assertNotNull("dispatchedMessagesMap found null", dispatchedMessagesMapForTesting);
        assertEquals("size of dispatched msgs should be 1", 1, dispatchedMessagesMapForTesting.size());
        MapWrapper mapWrapper = (MapWrapper) dispatchedMessagesMapForTesting.get("testDispatchedEventsMapUpdateOnTakes");
        assertNotNull("dispatchedMsgMap should contain an entry with queueregion name as key", mapWrapper);
        Map map = mapWrapper.map;
        assertEquals("size of wrapper-map should be 1 as all events had same ThreadId", 1, map.size());
        assertEquals("sequenceId against the ThreadId in the wrapper-map should be that of the last event taken.", eventID.getSequenceID(), ((Long) map.get(new ThreadIdentifier(new byte[]{1}, 1L))).longValue());
        this.logger.info("HARQAddOperationJUnitTest : testDispatchedEventsMapUpdateOnTakes END");
    }
}
