package cz.o2.proxima.utils.zookeeper.org.apache.zookeeper.server.quorum;

import cz.o2.proxima.utils.zookeeper.org.apache.zookeeper.CreateMode;
import cz.o2.proxima.utils.zookeeper.org.apache.zookeeper.ZKTestCase;
import cz.o2.proxima.utils.zookeeper.org.apache.zookeeper.ZooDefs;
import cz.o2.proxima.utils.zookeeper.org.apache.zookeeper.proto.CreateRequest;
import cz.o2.proxima.utils.zookeeper.org.apache.zookeeper.proto.GetDataRequest;
import cz.o2.proxima.utils.zookeeper.org.apache.zookeeper.proto.SetDataRequest;
import cz.o2.proxima.utils.zookeeper.org.apache.zookeeper.server.Request;
import cz.o2.proxima.utils.zookeeper.org.apache.zookeeper.server.RequestProcessor;
import cz.o2.proxima.utils.zookeeper.org.apache.zookeeper.server.ServerCnxn;
import cz.o2.proxima.utils.zookeeper.org.apache.zookeeper.server.WorkerService;
import cz.o2.proxima.utils.zookeeper.org.apache.zookeeper.server.ZooKeeperServerListener;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.jute.BinaryOutputArchive;
import org.apache.jute.Record;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cz/o2/proxima/utils/zookeeper/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.class */
public class CommitProcessorConcurrencyTest extends ZKTestCase {
    // Shared SLF4J logger; the tests use it to report requests that were not handled as expected.
    protected static final Logger LOG = LoggerFactory.getLogger(CommitProcessorConcurrencyTest.class);
    // Receives every request that MockCommitProcessor forwards to its downstream stub; recreated in setUp().
    BlockingQueue<Request> processedRequests;
    // Processor under test; a fresh instance is created in setUp() and shut down in tearDown().
    MockCommitProcessor processor;
    // Worker count most tests pass to MockCommitProcessor.initThreads().
    int defaultSizeOfThreadPool = 16;

    /* loaded from: input_file:cz/o2/proxima/utils/zookeeper/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest$MockCommitProcessor.class */
    class MockCommitProcessor extends CommitProcessor {
        MockCommitProcessor() {
            super(new RequestProcessor() { // from class: cz.o2.proxima.utils.zookeeper.org.apache.zookeeper.server.quorum.CommitProcessorConcurrencyTest.MockCommitProcessor.1
                public void processRequest(Request request) throws RequestProcessor.RequestProcessorException {
                    CommitProcessorConcurrencyTest.this.processedRequests.offer(request);
                }

                public void shutdown() {
                }
            }, "0", false, new ZooKeeperServerListener() { // from class: cz.o2.proxima.utils.zookeeper.org.apache.zookeeper.server.quorum.CommitProcessorConcurrencyTest.MockCommitProcessor.2
                public void notifyStopping(String str, int i) {
                    Assert.fail("Commit processor crashed " + i);
                }
            });
        }

        public void initThreads(int i) {
            this.stopped = false;
            this.workerPool = new WorkerService("CommitProcWork", i, true);
        }
    }

    /* loaded from: input_file:cz/o2/proxima/utils/zookeeper/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest$MockRequestsQueue.class */
    class MockRequestsQueue extends LinkedBlockingQueue<Request> {
        private static final long serialVersionUID = 1;
        int readReqId = 0;

        MockRequestsQueue() {
        }

        @Override // java.util.concurrent.LinkedBlockingQueue, java.util.Queue
        public Request poll() {
            this.readReqId++;
            try {
                return CommitProcessorConcurrencyTest.this.newRequest(new GetDataRequest("/", false), 4, this.readReqId % 50, this.readReqId);
            } catch (IOException e) {
                e.printStackTrace();
                return null;
            }
        }

        @Override // java.util.concurrent.LinkedBlockingQueue, java.util.AbstractCollection, java.util.Collection
        public int size() {
            return 42;
        }
    }

    /**
     * Creates a fresh result queue and processor for each test and sets the static batch limits
     * of {@link CommitProcessor} to -1 (read batch) and 1 (commit batch).
     */
    @Before
    public void setUp() throws Exception {
        processedRequests = new LinkedBlockingQueue<>();
        processor = new MockCommitProcessor();
        CommitProcessor.setMaxReadBatchSize(-1);
        CommitProcessor.setMaxCommitBatchSize(1);
    }

    /** Shuts down the processor created by {@code setUp()} after each test. */
    @After
    public void tearDown() throws Exception {
        processor.shutdown();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Serializes {@code record} under the tag "request" and wraps the bytes in a new
     * {@link Request} that has no server connection and an empty list as its last argument.
     *
     * @param record payload to serialize into the request body
     * @param i passed as the fourth {@code Request} constructor argument; callers in this class
     *     pass 4 with GetDataRequest, 1 with CreateRequest and 5 with SetDataRequest, so this is
     *     presumably the operation type code (confirm against {@code Request})
     * @param i2 passed as the second constructor argument; the tests treat it as the session id
     * @param i3 passed as the third constructor argument; presumably the request id within the
     *     session (confirm against {@code Request})
     * @return the newly built request
     * @throws IOException if {@code record} cannot be serialized
     */
    public Request newRequest(Record record, int i, int i2, int i3) throws IOException {
        final int type = i;
        final int sessionId = i2;
        final int reqId = i3;

        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        BinaryOutputArchive archive = BinaryOutputArchive.getArchive(buffer);
        record.serialize(archive, "request");

        ByteBuffer payload = ByteBuffer.wrap(buffer.toByteArray());
        return new Request((ServerCnxn) null, sessionId, reqId, type, payload, new ArrayList<>());
    }

    /**
     * Queues a read followed by a write for the same session (0) while the write is already in
     * the committed queue, runs the processor once with a single worker, and expects the read to
     * come out first and the write second.
     */
    @Test
    public void committedAndUncommittedOfTheSameSessionRaceTest() throws Exception {
        final String path = "/testCvsUCRace";
        Request readReq = newRequest(new GetDataRequest(path, false), 4, 0, 0);
        Request writeReq = newRequest(new SetDataRequest(path, new byte[16], -1), 5, 0, 1);

        processor.committedRequests.add(writeReq);
        processor.queuedRequests.add(readReq);
        processor.queuedRequests.add(writeReq);
        processor.queuedWriteRequests.add(writeReq);
        processor.initThreads(1);

        // stoppedMainLoop is set before run(); presumably this limits run() to one pass.
        processor.stoppedMainLoop = true;
        processor.run();

        String readNotFirst = "Request was not processed " + readReq + " instead " + processedRequests.peek();
        Assert.assertTrue(
                readNotFirst,
                processedRequests.peek() != null && processedRequests.peek().equals(readReq));

        // Drop the read so the write becomes the head of the result queue.
        processedRequests.poll();

        String writeNotSecond = "Request was not processed " + writeReq + " instead " + processedRequests.peek();
        Assert.assertTrue(
                writeNotSecond,
                processedRequests.peek() != null && processedRequests.peek().equals(writeReq));
    }

    /**
     * For sessions 1..5, queues {@code session} leading reads, then a create, then one trailing
     * read. After one processor run (and a one-second wait) every leading read must have been
     * processed, while none of the creates or trailing reads may have been.
     */
    @Test
    public void processAsMuchUncommittedRequestsAsPossibleTest() throws Exception {
        final String path = "/testAsMuchAsPossible";
        LinkedList<Request> mustBeProcessed = new LinkedList<>();
        HashSet<Request> mustStayBlocked = new HashSet<>();

        for (int session = 1; session <= 5; session++) {
            for (int reqId = 1; reqId <= session; reqId++) {
                Request leadingRead = newRequest(new GetDataRequest(path, false), 4, session, reqId);
                mustBeProcessed.add(leadingRead);
                processor.queuedRequests.add(leadingRead);
            }

            Request write = newRequest(
                    new CreateRequest(path, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
                    1,
                    session,
                    session + 1);
            Request trailingRead = newRequest(new GetDataRequest(path, false), 4, session, session + 2);

            processor.queuedRequests.add(write);
            processor.queuedWriteRequests.add(write);
            processor.queuedRequests.add(trailingRead);
            mustStayBlocked.add(write);
            mustStayBlocked.add(trailingRead);
        }

        processor.initThreads(defaultSizeOfThreadPool);
        processor.stoppedMainLoop = true;
        processor.run();
        // Give the worker threads time to hand the reads to the recording stub.
        Thread.sleep(1000L);

        // Whatever remains after removing the processed requests was missed.
        mustBeProcessed.removeAll(processedRequests);
        for (Request missed : mustBeProcessed) {
            LOG.error("Did not process {}", missed);
        }
        Assert.assertTrue("Not all requests were processed", mustBeProcessed.isEmpty());

        // removeAll returns true only if a blocked request shows up among the processed ones.
        Assert.assertFalse("Processed a wrong request", mustStayBlocked.removeAll(processedRequests));
    }

    /**
     * Queues one create followed by four reads for session 1. The first run must process nothing
     * and park all five requests in the session's pending queue; once the create is added to the
     * committed queue, a second run must process the create first and then all reads, leaving the
     * committed, pending and queued-write collections empty.
     */
    @Test
    public void processAllFollowingUncommittedAfterFirstCommitTest() throws Exception {
        final String path = "/testUncommittedFollowingCommited";
        HashSet<Request> expectedPending = new HashSet<>();
        HashSet<Request> followingReads = new HashSet<>();

        Request writeReq = newRequest(
                new CreateRequest(path, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
                1,
                1,
                1);
        processor.queuedRequests.add(writeReq);
        processor.queuedWriteRequests.add(writeReq);
        expectedPending.add(writeReq);

        for (int reqId = 2; reqId <= 5; reqId++) {
            Request readReq = newRequest(new GetDataRequest(path, false), 4, 1, reqId);
            processor.queuedRequests.add(readReq);
            expectedPending.add(readReq);
            followingReads.add(readReq);
        }

        // First pass: the write has not been committed yet, so nothing may reach the next stage.
        processor.initThreads(defaultSizeOfThreadPool);
        processor.stoppedMainLoop = true;
        processor.run();
        Assert.assertTrue("Processed without waiting for commit", processedRequests.isEmpty());
        Assert.assertTrue("Did not handled all of queuedRequests' requests", processor.queuedRequests.isEmpty());
        Assert.assertFalse("Removed from blockedQueuedRequests before commit", processor.queuedWriteRequests.isEmpty());

        expectedPending.removeAll((Collection<?>) processor.pendingRequests.get(Long.valueOf(writeReq.sessionId)));
        for (Request notPending : expectedPending) {
            LOG.error("Should be in pending {}", notPending);
        }
        Assert.assertTrue("Not all requests moved to pending from queuedRequests", expectedPending.isEmpty());

        // Second pass: commit the write and let the processor drain the session.
        processor.committedRequests.add(writeReq);
        processor.stoppedMainLoop = true;
        processor.run();
        processor.initThreads(defaultSizeOfThreadPool);
        Thread.sleep(500L);

        Assert.assertTrue("Did not process committed request", processedRequests.peek() == writeReq);
        Assert.assertTrue("Did not process following read request", processedRequests.containsAll(followingReads));
        Assert.assertTrue("Did not process committed request", processor.committedRequests.isEmpty());
        Assert.assertTrue("Did not process committed request", processor.pendingRequests.isEmpty());
        Assert.assertTrue("Did not remove from blockedQueuedRequests", processor.queuedWriteRequests.isEmpty());
    }

    /**
     * Exercises the commit batch limit with two sessions. Writes for sessions 1 and 2 are queued
     * with reads behind them, plus a third write for session 2. The test then runs the processor
     * four times, changing the commit batch size and feeding commits between runs, and checks
     * after each run which requests were processed and which remain in the committed, pending and
     * queued-write collections.
     */
    @Test
    public void processAllWritesMaxBatchSize() throws Exception {
        final String path = "/processAllWritesMaxBatchSize";
        HashSet<Request> expectedReads = new HashSet<>();

        Request firstWrite = newRequest(
                new CreateRequest(path + "_1", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
                1,
                1,
                1);
        processor.queuedRequests.add(firstWrite);
        processor.queuedWriteRequests.add(firstWrite);

        Request secondWrite = newRequest(
                new CreateRequest(path + "_2", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
                1,
                2,
                1);
        processor.queuedRequests.add(secondWrite);
        processor.queuedWriteRequests.add(secondWrite);

        // Interleave reads of both sessions behind their respective writes.
        for (int reqId = 2; reqId <= 5; reqId++) {
            Request readSession1 = newRequest(new GetDataRequest(path, false), 4, 1, reqId);
            Request readSession2 = newRequest(new GetDataRequest(path, false), 4, 2, reqId);
            processor.queuedRequests.add(readSession1);
            expectedReads.add(readSession1);
            processor.queuedRequests.add(readSession2);
            expectedReads.add(readSession2);
        }

        Request thirdWrite = newRequest(
                new CreateRequest(path + "_3", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
                1,
                2,
                6);
        processor.queuedRequests.add(thirdWrite);
        processor.queuedWriteRequests.add(thirdWrite);

        // Run 1: nothing is committed yet, so everything must end up pending.
        processor.initThreads(defaultSizeOfThreadPool);
        processor.stoppedMainLoop = true;
        CommitProcessor.setMaxCommitBatchSize(2);
        processor.run();
        Assert.assertTrue("Processed without waiting for commit", processedRequests.isEmpty());
        Assert.assertTrue("Did not handled all of queuedRequests' requests", processor.queuedRequests.isEmpty());
        Assert.assertFalse("Removed from blockedQueuedRequests before commit", processor.queuedWriteRequests.isEmpty());
        Assert.assertTrue("Missing session 1 in pending queue", processor.pendingRequests.containsKey(Long.valueOf(firstWrite.sessionId)));
        Assert.assertTrue("Missing session 2 in pending queue", processor.pendingRequests.containsKey(Long.valueOf(secondWrite.sessionId)));

        // Run 2: all three writes are committed; the assertions expect the third to stay behind.
        processor.committedRequests.add(firstWrite);
        processor.committedRequests.add(secondWrite);
        processor.committedRequests.add(thirdWrite);
        processor.stoppedMainLoop = true;
        CommitProcessor.setMaxCommitBatchSize(3);
        processor.run();
        processor.initThreads(defaultSizeOfThreadPool);
        Thread.sleep(500L);
        Assert.assertTrue("Did not process committed request", processedRequests.peek() == firstWrite);
        Assert.assertTrue("Did not process following read request", processedRequests.containsAll(expectedReads));
        Assert.assertFalse("Processed committed request", processor.committedRequests.isEmpty());
        Assert.assertTrue("Removed commit for write req 3", processor.committedRequests.peek() == thirdWrite);
        Assert.assertFalse("Processed committed request", processor.pendingRequests.isEmpty());
        Assert.assertTrue("Missing session 2 in pending queue", processor.pendingRequests.containsKey(Long.valueOf(thirdWrite.sessionId)));
        Assert.assertTrue(
                "Missing write 3 in pending queue",
                ((Deque<?>) processor.pendingRequests.get(Long.valueOf(thirdWrite.sessionId))).peek() == thirdWrite);
        Assert.assertFalse("Removed from blockedQueuedRequests", processor.queuedWriteRequests.isEmpty());
        Assert.assertTrue("Removed write req 3 from blockedQueuedRequests", processor.queuedWriteRequests.peek() == thirdWrite);

        // Run 3: add one more read (session 1) and a fourth, already committed write (session 2).
        Request lateRead = newRequest(new GetDataRequest(path, false), 4, 1, 7);
        processor.queuedRequests.add(lateRead);
        expectedReads.add(lateRead);
        Request fourthWrite = newRequest(
                new CreateRequest(path + "_4", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
                1,
                2,
                7);
        processor.queuedRequests.add(fourthWrite);
        processor.queuedWriteRequests.add(fourthWrite);
        processor.committedRequests.add(fourthWrite);
        processor.stoppedMainLoop = true;
        CommitProcessor.setMaxCommitBatchSize(3);
        processor.run();
        processor.initThreads(defaultSizeOfThreadPool);
        Thread.sleep(500L);
        Assert.assertTrue("Did not process committed request", processedRequests.peek() == firstWrite);
        Assert.assertTrue("Did not process following read request", processedRequests.containsAll(expectedReads));
        Assert.assertFalse("Processed unexpected committed request", processor.committedRequests.isEmpty());
        Assert.assertTrue("Unexpected pending request", processor.pendingRequests.isEmpty());
        Assert.assertFalse("Removed from blockedQueuedRequests", processor.queuedWriteRequests.isEmpty());
        Assert.assertTrue("Removed write req 4 from blockedQueuedRequests", processor.queuedWriteRequests.peek() == fourthWrite);

        // Run 4: no new input; the remaining commit must be drained completely.
        processor.stoppedMainLoop = true;
        CommitProcessor.setMaxCommitBatchSize(3);
        processor.run();
        processor.initThreads(defaultSizeOfThreadPool);
        Thread.sleep(500L);
        Assert.assertTrue("Did not process committed request", processedRequests.peek() == firstWrite);
        Assert.assertTrue("Did not process following read request", processedRequests.containsAll(expectedReads));
        Assert.assertTrue("Did not process committed request", processor.committedRequests.isEmpty());
        Assert.assertTrue("Did not process committed request", processor.pendingRequests.isEmpty());
        Assert.assertTrue("Did not remove from blockedQueuedRequests", processor.queuedWriteRequests.isEmpty());
    }

    /**
     * Swaps the request queue for a {@link MockRequestsQueue}, which fabricates a read on every
     * poll, and adds ten committed creates for session 51 (a value the mock queue never produces,
     * since it uses {@code readReqId % 50}). After ten processor runs all ten commits must have
     * been processed, within the one-second test timeout.
     */
    @Test(timeout = 1000)
    public void noStarvationOfNonLocalCommittedRequestsTest() throws Exception {
        processor.queuedRequests = new MockRequestsQueue();

        HashSet<Request> nonLocalCommits = new HashSet<>();
        for (int n = 0; n < 10; n++) {
            Request committed = newRequest(
                    new CreateRequest(
                            "/noStarvationOfCommittedRequests",
                            new byte[0],
                            ZooDefs.Ids.OPEN_ACL_UNSAFE,
                            CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
                    1,
                    51,
                    n + 1);
            processor.committedRequests.add(committed);
            nonLocalCommits.add(committed);
        }

        for (int round = 0; round < 10; round++) {
            processor.initThreads(defaultSizeOfThreadPool);
            processor.stoppedMainLoop = true;
            processor.run();
        }

        Assert.assertTrue("commit request was not processed", processedRequests.containsAll(nonLocalCommits));
    }

    /**
     * Mixes a queued-and-committed write (session 3), reads from sessions 1 and 5, and a run of
     * committed writes that were never queued locally (sessions 153 and 8). The first run must
     * process the first write but none of the reads; the second run must process all reads and
     * the session-153 write, but none of the 99 session-8 commits.
     */
    @Test
    public void noStarvationOfReadRequestsTest() throws Exception {
        final String path = "/noStarvationOfReadRequests";

        Request firstWrite = newRequest(
                new CreateRequest(path, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
                1,
                3,
                1);
        processor.queuedRequests.add(firstWrite);
        processor.queuedWriteRequests.add(firstWrite);
        processor.committedRequests.add(firstWrite);

        HashSet<Request> reads = new HashSet<>();
        Request firstRead = newRequest(new GetDataRequest(path, false), 4, 1, 0);
        reads.add(firstRead);
        processor.queuedRequests.add(firstRead);

        Request secondWrite = newRequest(
                new CreateRequest(path, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
                1,
                153,
                2);
        processor.committedRequests.add(secondWrite);

        HashSet<Request> laterCommits = new HashSet<>();
        for (int reqId = 3; reqId < 102; reqId++) {
            Request commit = newRequest(
                    new CreateRequest(path, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
                    1,
                    8,
                    reqId);
            processor.committedRequests.add(commit);
            laterCommits.add(commit);
        }

        for (int reqId = 1; reqId <= 50; reqId++) {
            Request read = newRequest(new GetDataRequest(path, false), 4, 5, reqId);
            reads.add(read);
            processor.queuedRequests.add(read);
        }

        // First run: only the first write may get through.
        processor.initThreads(defaultSizeOfThreadPool);
        processor.stoppedMainLoop = true;
        processor.run();
        Assert.assertTrue("Did not process the first write request", processedRequests.contains(firstWrite));
        for (Request read : reads) {
            Assert.assertFalse("Processed read request", processedRequests.contains(read));
        }

        // Second run: reads and the second write get through, later commits must still wait.
        processor.run();
        Assert.assertTrue("did not processed all reads", processedRequests.containsAll(reads));
        Assert.assertTrue("Did not process the second write request", processedRequests.contains(secondWrite));
        for (Request commit : laterCommits) {
            Assert.assertFalse("Processed additional committed request", processedRequests.contains(commit));
        }
    }

    /**
     * Queues a write (request id 0x100) and ten reads for one session, then commits a write with
     * a lower request id (0xfe) that was never queued locally, followed by the queued write. The
     * unseen commit must be processed first, and a further run must process every locally queued
     * request. A failure of the processor itself would surface through the listener installed by
     * {@link MockCommitProcessor}.
     */
    @Test(timeout = 5000)
    public void noCrashOnCommittedRequestsOfUnseenRequestTest() throws Exception {
        final String path = "/noCrash/OnCommittedRequests/OfUnseenRequestTest";
        final int sessionId = 0x123456;
        final int lastReqId = 0x100;
        final int readCount = 10;

        processor.stoppedMainLoop = true;
        HashSet<Request> localRequests = new HashSet<>();

        Request firstCommit = newRequest(
                new CreateRequest(path, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
                1,
                sessionId,
                lastReqId);
        processor.queuedRequests.add(firstCommit);
        processor.queuedWriteRequests.add(firstCommit);
        localRequests.add(firstCommit);

        for (int reqId = lastReqId + 1; reqId <= lastReqId + readCount; reqId++) {
            Request read = newRequest(new GetDataRequest(path, false), 4, sessionId, reqId);
            processor.queuedRequests.add(read);
            localRequests.add(read);
        }
        Assert.assertTrue(processor.queuedRequests.containsAll(localRequests));

        // Nothing is committed yet, so the first run must not emit anything.
        processor.initThreads(defaultSizeOfThreadPool);
        processor.run();
        Thread.sleep(1000L);
        Assert.assertTrue(processedRequests.isEmpty());

        // Commit a request the processor has never seen, ahead of the queued write.
        Request unseenCommit = newRequest(
                new CreateRequest(path, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
                1,
                sessionId,
                lastReqId - 2);
        processor.committedRequests.add(unseenCommit);
        processor.committedRequests.add(firstCommit);
        processor.run();
        Thread.sleep(1000L);
        Assert.assertTrue(processedRequests.peek() == unseenCommit);

        processor.run();
        Thread.sleep(1000L);
        Assert.assertTrue(processedRequests.containsAll(localRequests));
    }

    /**
     * Queues a write (request id 0x100) and eleven reads (ids 0x100..0x10a) for one session, then
     * commits a never-queued write carrying the highest id (0x10a) ahead of the queued write.
     * After the next run exactly one request, the out-of-order commit, must have been processed;
     * a further run must process every locally queued request.
     */
    @Test(timeout = 5000)
    public void noCrashOnOutofOrderCommittedRequestTest() throws Exception {
        final String path = "/noCrash/OnCommittedRequests/OfUnSeenRequestTest";
        final int sessionId = 0x123456;
        final int lastReqId = 0x100;
        final int highestReqId = lastReqId + 10;

        processor.stoppedMainLoop = true;
        HashSet<Request> localRequests = new HashSet<>();

        Request firstCommit = newRequest(
                new CreateRequest(path, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
                1,
                sessionId,
                lastReqId);
        processor.queuedRequests.add(firstCommit);
        processor.queuedWriteRequests.add(firstCommit);
        localRequests.add(firstCommit);

        // Note: the first read reuses the write's request id (the loop starts at lastReqId).
        for (int reqId = lastReqId; reqId <= highestReqId; reqId++) {
            Request read = newRequest(new GetDataRequest(path, false), 4, sessionId, reqId);
            processor.queuedRequests.add(read);
            localRequests.add(read);
        }

        // Nothing is committed yet, so the first run must not emit anything.
        processor.initThreads(defaultSizeOfThreadPool);
        processor.run();
        Thread.sleep(1000L);
        Assert.assertTrue(processedRequests.isEmpty());

        // Commit a never-queued write with the highest id before the queued write.
        Request outOfOrderCommit = newRequest(
                new CreateRequest(path, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
                1,
                sessionId,
                highestReqId);
        processor.committedRequests.add(outOfOrderCommit);
        processor.committedRequests.add(firstCommit);
        processor.run();
        Thread.sleep(1000L);
        Assert.assertTrue(processedRequests.size() == 1);
        Assert.assertTrue(processedRequests.contains(outOfOrderCommit));

        processor.run();
        Thread.sleep(1000L);
        Assert.assertTrue(processedRequests.containsAll(localRequests));
    }
}
