package org.janusgraph.diskstorage.log;

import com.google.common.base.Preconditions;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.janusgraph.diskstorage.BackendException;
import org.janusgraph.diskstorage.StaticBuffer;
import org.janusgraph.diskstorage.util.BufferUtil;
import org.janusgraph.diskstorage.util.StaticArrayBuffer;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/janusgraph/diskstorage/log/LogTest.class */
public abstract class LogTest {
    private static final Logger log = LoggerFactory.getLogger(LogTest.class);
    public static final String DEFAULT_SENDER_ID = "sender";
    private static final long TIMEOUT_MS = 30000;
    private LogManager manager;

    @Rule
    public TestName testName = new TestName();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/janusgraph/diskstorage/log/LogTest$CountingReader.class */
    public static class CountingReader extends LatchMessageReader {
        private static final Logger log = LoggerFactory.getLogger(CountingReader.class);
        private final AtomicLong totalMsg;
        private final AtomicLong totalValue;
        private final boolean expectIncreasingValues;
        private long lastMessageValue;

        private CountingReader(int i, boolean z) {
            super(i);
            this.totalMsg = new AtomicLong(0L);
            this.totalValue = new AtomicLong(0L);
            this.lastMessageValue = 0L;
            this.expectIncreasingValues = z;
        }

        @Override // org.janusgraph.diskstorage.log.LogTest.LatchMessageReader
        public void processMessage(Message message) {
            StaticBuffer content = message.getContent();
            Assert.assertEquals(8L, content.length());
            long j = content.getLong(0);
            log.debug("Read log value {} by senderid \"{}\"", Long.valueOf(j), message.getSenderId());
            if (this.expectIncreasingValues) {
                Assert.assertTrue("Message out of order or duplicated: " + this.lastMessageValue + " preceded " + j, this.lastMessageValue < j);
                this.lastMessageValue = j;
            }
            this.totalMsg.incrementAndGet();
            this.totalValue.addAndGet(j);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/janusgraph/diskstorage/log/LogTest$LatchMessageReader.class */
    public static class LatchMessageReader implements MessageReader {
        private final CountDownLatch latch;

        LatchMessageReader(int i) {
            this.latch = new CountDownLatch(i);
        }

        public final void read(Message message) {
            Assert.assertNotNull(message);
            Assert.assertNotNull(message.getSenderId());
            Assert.assertNotNull(message.getContent());
            Instant now = Instant.now();
            Assert.assertTrue(now.isAfter(message.getTimestamp()) || now.equals(message.getTimestamp()));
            processMessage(message);
            this.latch.countDown();
        }

        protected void processMessage(Message message) {
        }

        public void await(long j) throws InterruptedException {
            if (this.latch.await(j, TimeUnit.MILLISECONDS)) {
                return;
            }
            long count = this.latch.getCount();
            Preconditions.checkState(0 < count);
            String str = "Did not read expected number of messages before timeout was reached (latch count is " + count + ")";
            LogTest.log.error(str);
            throw new AssertionError(str);
        }
    }

    /* loaded from: input_file:org/janusgraph/diskstorage/log/LogTest$StoringReader.class */
    private static class StoringReader extends LatchMessageReader {
        private List<StaticBuffer> msgs;
        private volatile int msgCount;

        StoringReader(int i) {
            super(i);
            this.msgs = new ArrayList(64);
            this.msgCount = 0;
        }

        @Override // org.janusgraph.diskstorage.log.LogTest.LatchMessageReader
        public void processMessage(Message message) {
            this.msgs.add(message.getContent());
            this.msgCount++;
        }
    }

    public abstract LogManager openLogManager(String str, boolean z) throws BackendException;

    @Before
    public void setup() throws Exception {
        boolean endsWith = this.testName.getMethodName().toLowerCase().endsWith("serial");
        log.debug("Starting {}.{} - Order preserving {}", new Object[]{getClass().getSimpleName(), this.testName.getMethodName(), Boolean.valueOf(endsWith)});
        this.manager = openLogManager(DEFAULT_SENDER_ID, endsWith);
    }

    @After
    public void shutdown() throws Exception {
        close();
        log.debug("Finished {}.{}", getClass().getSimpleName(), this.testName.getMethodName());
    }

    public void close() throws Exception {
        this.manager.close();
    }

    @Test
    public void smallSendReceiveSerial() throws Exception {
        simpleSendReceive(100, 50);
    }

    @Test
    public void mediumSendReceiveSerial() throws Exception {
        simpleSendReceive(2000, 1);
    }

    @Test
    public void testMultipleReadersOnSingleLogSerial() throws Exception {
        sendReceive(4, 2000, 5, true);
    }

    @Test
    public void testMultipleReadersOnSingleLog() throws Exception {
        sendReceive(4, 2000, 5, false);
    }

    @Test
    public void testReadMarkerResumesInMiddleOfLog() throws Exception {
        Log openLog = this.manager.openLog("test1");
        openLog.add(BufferUtil.getLongBuffer(1L));
        openLog.close();
        Log openLog2 = this.manager.openLog("test1");
        CountingReader countingReader = new CountingReader(1, true);
        openLog2.registerReader(ReadMarker.fromNow(), new MessageReader[]{countingReader});
        openLog2.add(BufferUtil.getLongBuffer(2L));
        countingReader.await(TIMEOUT_MS);
        Assert.assertEquals(1L, countingReader.totalMsg.get());
        Assert.assertEquals(2L, countingReader.totalValue.get());
    }

    @Test
    public void testLogIsDurableAcrossReopenSerial() throws Exception {
        long currentTimeMillis = System.currentTimeMillis() - 10;
        this.manager.openLog("durable").add(BufferUtil.getLongBuffer(1L));
        this.manager.close();
        Log openLog = this.manager.openLog("durable");
        openLog.add(BufferUtil.getLongBuffer(2L));
        openLog.close();
        Log openLog2 = this.manager.openLog("durable");
        CountingReader countingReader = new CountingReader(2, true);
        openLog2.registerReader(ReadMarker.fromTime(Instant.ofEpochMilli(currentTimeMillis)), new MessageReader[]{countingReader});
        countingReader.await(TIMEOUT_MS);
        Assert.assertEquals(2L, countingReader.totalMsg.get());
        Assert.assertEquals(3L, countingReader.totalValue.get());
    }

    @Test
    public void testMultipleLogsWithSingleReaderSerial() throws Exception {
        Log[] logArr = new Log[3];
        CountingReader countingReader = new CountingReader(3, false);
        for (int i = 0; i < 3; i++) {
            logArr[i] = this.manager.openLog("ml" + i);
        }
        for (int i2 = 0; i2 < 3; i2++) {
            logArr[i2].registerReader(ReadMarker.fromNow(), new MessageReader[]{countingReader});
        }
        long j = 1;
        for (int i3 = 0; i3 < 3; i3++) {
            logArr[i3].add(BufferUtil.getLongBuffer(j));
            j <<= 1;
        }
        countingReader.await(TIMEOUT_MS);
        Assert.assertEquals(3L, countingReader.totalMsg.get());
        Assert.assertEquals(j - 1, countingReader.totalValue.get());
    }

    @Test
    public void testSeparateReadersAndLogsInSharedManager() throws Exception {
        Log[] logArr = new Log[5];
        CountingReader[] countingReaderArr = new CountingReader[5];
        for (int i = 0; i < 5; i++) {
            countingReaderArr[i] = new CountingReader(1, true);
            logArr[i] = this.manager.openLog("loner" + i);
        }
        for (int i2 = 0; i2 < 5; i2++) {
            logArr[i2].registerReader(ReadMarker.fromNow(), new MessageReader[]{countingReaderArr[i2]});
            logArr[i2].add(BufferUtil.getLongBuffer(1 << (i2 + 1)));
        }
        for (int i3 = 0; i3 < 5; i3++) {
            log.debug("Awaiting CountingReader[{}]", Integer.valueOf(i3));
            countingReaderArr[i3].await(TIMEOUT_MS);
            Assert.assertEquals(1 << (i3 + 1), countingReaderArr[i3].totalValue.get());
            Assert.assertEquals(1L, countingReaderArr[i3].totalMsg.get());
        }
    }

    @Test
    public void testFuzzMessagesSerial() throws Exception {
        StoringReader storingReader = new StoringReader(32);
        ArrayList arrayList = new ArrayList(32);
        Log openLog = this.manager.openLog("fuzz");
        openLog.registerReader(ReadMarker.fromNow(), new MessageReader[]{storingReader});
        Random random = new Random();
        for (int i = 0; i < 32; i++) {
            int i2 = 4096;
            if (0 == 4096) {
                i2 = 1;
            }
            byte[] bArr = new byte[i2];
            random.nextBytes(bArr);
            StaticArrayBuffer of = StaticArrayBuffer.of(bArr);
            openLog.add(of);
            arrayList.add(of);
            Thread.sleep(50L);
        }
        storingReader.await(TIMEOUT_MS);
        Assert.assertEquals(32L, storingReader.msgCount);
        Assert.assertEquals(arrayList, storingReader.msgs);
    }

    @Test
    public void testReadMarkerCompatibility() throws Exception {
        Log openLog = this.manager.openLog("testx");
        openLog.registerReader(ReadMarker.fromIdentifierOrNow("mark"), new MessageReader[]{new StoringReader(0)});
        openLog.registerReader(ReadMarker.fromIdentifierOrTime("mark", Instant.now().minusMillis(100L)), new MessageReader[]{new StoringReader(1)});
        try {
            openLog.registerReader(ReadMarker.fromIdentifierOrNow("other"), new MessageReader[0]);
            Assert.fail();
        } catch (IllegalArgumentException e) {
        }
        try {
            openLog.registerReader(ReadMarker.fromTime(Instant.now().minusMillis(100L)), new MessageReader[0]);
            Assert.fail();
        } catch (IllegalArgumentException e2) {
        }
        openLog.registerReader(ReadMarker.fromNow(), new MessageReader[]{new StoringReader(2)});
    }

    @Test
    public void testUnregisterReaderSerial() throws Exception {
        Log openLog = this.manager.openLog("test1");
        CountingReader countingReader = new CountingReader(1, true);
        CountingReader countingReader2 = new CountingReader(2, true);
        openLog.registerReader(ReadMarker.fromNow(), new MessageReader[]{countingReader, countingReader2});
        openLog.add(BufferUtil.getLongBuffer(1L));
        countingReader.await(TIMEOUT_MS);
        openLog.unregisterReader(countingReader);
        openLog.add(BufferUtil.getLongBuffer(2L));
        countingReader2.await(TIMEOUT_MS);
        Assert.assertEquals(1L, countingReader.totalMsg.get());
        Assert.assertEquals(1L, countingReader.totalValue.get());
        Assert.assertEquals(2L, countingReader2.totalMsg.get());
        Assert.assertEquals(3L, countingReader2.totalValue.get());
    }

    private void simpleSendReceive(int i, int i2) throws Exception {
        sendReceive(1, i, i2, true);
    }

    public void sendReceive(int i, int i2, int i3, boolean z) throws Exception {
        Preconditions.checkState(0 < i);
        Log openLog = this.manager.openLog("test1");
        Assert.assertEquals("test1", openLog.getName());
        CountingReader[] countingReaderArr = new CountingReader[i];
        for (int i4 = 0; i4 < countingReaderArr.length; i4++) {
            countingReaderArr[i4] = new CountingReader(i2, z);
            openLog.registerReader(ReadMarker.fromNow(), new MessageReader[]{countingReaderArr[i4]});
        }
        long j = 1;
        while (true) {
            long j2 = j;
            if (j2 > i2) {
                break;
            }
            openLog.add(BufferUtil.getLongBuffer(j2));
            Thread.sleep(i3);
            j = j2 + 1;
        }
        for (int i5 = 0; i5 < countingReaderArr.length; i5++) {
            CountingReader countingReader = countingReaderArr[i5];
            countingReader.await(TIMEOUT_MS);
            Assert.assertEquals("counter index " + i5 + " message count mismatch", i2, countingReader.totalMsg.get());
            Assert.assertEquals("counter index " + i5 + " value mismatch", (i2 * (i2 + 1)) / 2, countingReader.totalValue.get());
            Assert.assertTrue(openLog.unregisterReader(countingReader));
        }
        openLog.close();
    }
}
