package org.neo4j.kernel.impl.transaction.log;

import java.io.IOException;
import java.lang.Thread;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import org.hamcrest.Matchers;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.Mockito;
import org.neo4j.kernel.KernelHealth;
import org.neo4j.kernel.impl.transaction.DeadSimpleTransactionIdStore;
import org.neo4j.kernel.impl.transaction.log.rotation.LogRotation;
import org.neo4j.kernel.impl.transaction.tracing.LogAppendEvent;
import org.neo4j.kernel.impl.util.IdOrderingQueue;
import org.neo4j.kernel.lifecycle.LifeRule;
import org.neo4j.test.ThreadTestUtils;

/* loaded from: input_file:org/neo4j/kernel/impl/transaction/log/BatchingTransactionAppenderConcurrencyTest.class */
public class BatchingTransactionAppenderConcurrencyTest {
    private static ExecutorService executor;

    @Rule
    public final LifeRule life = new LifeRule();
    private final LogAppendEvent logAppendEvent = LogAppendEvent.NULL;
    private LogFile logFile;
    private LogRotation logRotation;
    private TransactionMetadataCache transactionMetadataCache;
    private TransactionIdStore transactionIdStore;
    private IdOrderingQueue legacyindexTransactionOrdering;
    private KernelHealth kernelHealth;
    private WritableLogChannel channel;
    private BlockingQueue<ChannelCommand> channelCommandQueue;
    private Semaphore forceSemaphore;

    /* loaded from: input_file:org/neo4j/kernel/impl/transaction/log/BatchingTransactionAppenderConcurrencyTest$ChannelCommand.class */
    private enum ChannelCommand {
        emptyBufferIntoChannelAndClearIt,
        force,
        dummy
    }

    @BeforeClass
    public static void setUpExecutor() {
        executor = Executors.newCachedThreadPool();
    }

    @AfterClass
    public static void tearDownExecutor() {
        executor.shutdown();
        executor = null;
    }

    @Before
    public void setUp() {
        this.logFile = (LogFile) Mockito.mock(LogFile.class);
        this.logRotation = LogRotation.NO_ROTATION;
        this.transactionMetadataCache = new TransactionMetadataCache(10, 10);
        this.transactionIdStore = new DeadSimpleTransactionIdStore();
        this.legacyindexTransactionOrdering = IdOrderingQueue.BYPASS;
        this.kernelHealth = (KernelHealth) Mockito.mock(KernelHealth.class);
        this.channelCommandQueue = new LinkedBlockingQueue();
        this.forceSemaphore = new Semaphore(0);
        this.channel = new InMemoryLogChannel() { // from class: org.neo4j.kernel.impl.transaction.log.BatchingTransactionAppenderConcurrencyTest.1
            @Override // org.neo4j.kernel.impl.transaction.log.InMemoryLogChannel
            public void force() throws IOException {
                try {
                    BatchingTransactionAppenderConcurrencyTest.this.forceSemaphore.release();
                    BatchingTransactionAppenderConcurrencyTest.this.channelCommandQueue.put(ChannelCommand.force);
                } catch (InterruptedException e) {
                    throw new IOException(e);
                }
            }

            @Override // org.neo4j.kernel.impl.transaction.log.InMemoryLogChannel
            public void emptyBufferIntoChannelAndClearIt() {
                try {
                    BatchingTransactionAppenderConcurrencyTest.this.channelCommandQueue.put(ChannelCommand.emptyBufferIntoChannelAndClearIt);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        };
        Mockito.when(this.logFile.getWriter()).thenReturn(this.channel);
    }

    private Runnable createForceAfterAppendRunnable(final BatchingTransactionAppender batchingTransactionAppender) {
        return new Runnable() { // from class: org.neo4j.kernel.impl.transaction.log.BatchingTransactionAppenderConcurrencyTest.2
            @Override // java.lang.Runnable
            public void run() {
                try {
                    batchingTransactionAppender.forceAfterAppend(BatchingTransactionAppenderConcurrencyTest.this.logAppendEvent);
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        };
    }

    @Test
    public void shouldForceLogChannel() throws Throwable {
        BatchingTransactionAppender batchingTransactionAppender = (BatchingTransactionAppender) this.life.add(new BatchingTransactionAppender(this.logFile, this.logRotation, this.transactionMetadataCache, this.transactionIdStore, this.legacyindexTransactionOrdering, this.kernelHealth));
        this.life.start();
        batchingTransactionAppender.forceAfterAppend(this.logAppendEvent);
        Assert.assertThat(this.channelCommandQueue.take(), Matchers.is(ChannelCommand.emptyBufferIntoChannelAndClearIt));
        Assert.assertThat(this.channelCommandQueue.take(), Matchers.is(ChannelCommand.force));
        Assert.assertTrue(this.channelCommandQueue.isEmpty());
    }

    @Test
    public void shouldWaitForOngoingForceToCompleteBeforeForcingAgain() throws Throwable {
        this.channelCommandQueue = new LinkedBlockingQueue(2);
        this.channelCommandQueue.put(ChannelCommand.dummy);
        BatchingTransactionAppender batchingTransactionAppender = (BatchingTransactionAppender) this.life.add(new BatchingTransactionAppender(this.logFile, this.logRotation, this.transactionMetadataCache, this.transactionIdStore, this.legacyindexTransactionOrdering, this.kernelHealth));
        this.life.start();
        Runnable createForceAfterAppendRunnable = createForceAfterAppendRunnable(batchingTransactionAppender);
        Future<?> submit = executor.submit(createForceAfterAppendRunnable);
        this.forceSemaphore.acquire();
        Thread fork = ThreadTestUtils.fork(createForceAfterAppendRunnable);
        ThreadTestUtils.awaitThreadState(fork, 5000L, Thread.State.TIMED_WAITING, new Thread.State[0]);
        Assert.assertThat(this.channelCommandQueue.take(), Matchers.is(ChannelCommand.dummy));
        Assert.assertThat(this.channelCommandQueue.take(), Matchers.is(ChannelCommand.emptyBufferIntoChannelAndClearIt));
        Assert.assertThat(this.channelCommandQueue.take(), Matchers.is(ChannelCommand.force));
        Assert.assertThat(this.channelCommandQueue.take(), Matchers.is(ChannelCommand.emptyBufferIntoChannelAndClearIt));
        Assert.assertThat(this.channelCommandQueue.take(), Matchers.is(ChannelCommand.force));
        submit.get();
        fork.join();
        Assert.assertTrue(this.channelCommandQueue.isEmpty());
    }

    @Test
    public void shouldBatchUpMultipleWaitingForceRequests() throws Throwable {
        this.channelCommandQueue = new LinkedBlockingQueue(2);
        this.channelCommandQueue.put(ChannelCommand.dummy);
        BatchingTransactionAppender batchingTransactionAppender = (BatchingTransactionAppender) this.life.add(new BatchingTransactionAppender(this.logFile, this.logRotation, this.transactionMetadataCache, this.transactionIdStore, this.legacyindexTransactionOrdering, this.kernelHealth));
        this.life.start();
        Runnable createForceAfterAppendRunnable = createForceAfterAppendRunnable(batchingTransactionAppender);
        Future<?> submit = executor.submit(createForceAfterAppendRunnable);
        this.forceSemaphore.acquire();
        Thread[] threadArr = new Thread[10];
        for (int i = 0; i < threadArr.length; i++) {
            threadArr[i] = ThreadTestUtils.fork(createForceAfterAppendRunnable);
        }
        for (Thread thread : threadArr) {
            ThreadTestUtils.awaitThreadState(thread, 5000L, Thread.State.TIMED_WAITING, new Thread.State[0]);
        }
        Assert.assertThat(this.channelCommandQueue.take(), Matchers.is(ChannelCommand.dummy));
        Assert.assertThat(this.channelCommandQueue.take(), Matchers.is(ChannelCommand.emptyBufferIntoChannelAndClearIt));
        Assert.assertThat(this.channelCommandQueue.take(), Matchers.is(ChannelCommand.force));
        Assert.assertThat(this.channelCommandQueue.take(), Matchers.is(ChannelCommand.emptyBufferIntoChannelAndClearIt));
        Assert.assertThat(this.channelCommandQueue.take(), Matchers.is(ChannelCommand.force));
        submit.get();
        for (Thread thread2 : threadArr) {
            thread2.join();
        }
        Assert.assertTrue(this.channelCommandQueue.isEmpty());
    }
}
