package org.neo4j.util.concurrent;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.neo4j.util.concurrent.AsyncEvents;

@Timeout(30)
/* loaded from: input_file:org/neo4j/util/concurrent/AsyncEventsTest.class */
class AsyncEventsTest {
    private ExecutorService executor;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/neo4j/util/concurrent/AsyncEventsTest$Event.class */
    public static class Event extends AsyncEvent {
        Thread processedBy;

        Event() {
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/neo4j/util/concurrent/AsyncEventsTest$EventConsumer.class */
    public static class EventConsumer implements Consumer<Event> {
        final BlockingQueue<Event> eventsProcessed = new LinkedBlockingQueue();

        EventConsumer() {
        }

        @Override // java.util.function.Consumer
        public void accept(Event event) {
            event.processedBy = Thread.currentThread();
            this.eventsProcessed.offer(event);
        }

        public Event poll(long j, TimeUnit timeUnit) throws InterruptedException {
            return this.eventsProcessed.poll(j, timeUnit);
        }
    }

    AsyncEventsTest() {
    }

    @BeforeEach
    void setUp() {
        this.executor = Executors.newCachedThreadPool();
    }

    @AfterEach
    void tearDown() {
        this.executor.shutdown();
    }

    @Test
    void eventsMustBeProcessedByBackgroundThread() throws Exception {
        EventConsumer eventConsumer = new EventConsumer();
        Runnable asyncEvents = new AsyncEvents(eventConsumer, AsyncEvents.Monitor.NONE);
        this.executor.submit(asyncEvents);
        Event event = new Event();
        asyncEvents.send(event);
        Event poll = eventConsumer.poll(10L, TimeUnit.SECONDS);
        Event event2 = new Event();
        asyncEvents.send(event2);
        Event poll2 = eventConsumer.poll(10L, TimeUnit.SECONDS);
        asyncEvents.shutdown();
        Assertions.assertThat(poll).isEqualTo(event);
        Assertions.assertThat(poll2).isEqualTo(event2);
    }

    @Test
    void mustNotProcessEventInSameThreadWhenNotShutDown() throws Exception {
        EventConsumer eventConsumer = new EventConsumer();
        Runnable asyncEvents = new AsyncEvents(eventConsumer, AsyncEvents.Monitor.NONE);
        this.executor.submit(asyncEvents);
        asyncEvents.send(new Event());
        Thread thread = eventConsumer.poll(10L, TimeUnit.SECONDS).processedBy;
        asyncEvents.shutdown();
        Assertions.assertThat(thread).isNotEqualTo(Thread.currentThread());
    }

    @Test
    void mustProcessEventsDirectlyWhenShutDown() throws InterruptedException {
        EventConsumer eventConsumer = new EventConsumer();
        Runnable asyncEvents = new AsyncEvents(eventConsumer, AsyncEvents.Monitor.NONE);
        this.executor.submit(asyncEvents);
        asyncEvents.send(new Event());
        Thread thread = eventConsumer.poll(10L, TimeUnit.SECONDS).processedBy;
        asyncEvents.shutdown();
        Assertions.assertThat(thread).isNotEqualTo(Thread.currentThread());
        do {
            asyncEvents.send(new Event());
        } while (eventConsumer.poll(10L, TimeUnit.SECONDS).processedBy != Thread.currentThread());
    }

    @Test
    void concurrentlyPublishedEventsMustAllBeProcessed() throws InterruptedException {
        EventConsumer eventConsumer = new EventConsumer();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        Runnable asyncEvents = new AsyncEvents(eventConsumer, AsyncEvents.Monitor.NONE);
        this.executor.submit(asyncEvents);
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(10);
        Runnable runnable = () -> {
            try {
                countDownLatch.await();
                for (int i = 0; i < 2000; i++) {
                    asyncEvents.send(new Event());
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        };
        for (int i = 0; i < 10; i++) {
            newFixedThreadPool.submit(runnable);
        }
        countDownLatch.countDown();
        Thread currentThread = Thread.currentThread();
        int i2 = 0;
        while (i2 < 20000) {
            try {
                Event poll = eventConsumer.poll(1L, TimeUnit.SECONDS);
                if (poll == null) {
                    i2--;
                } else {
                    Assertions.assertThat(poll.processedBy).isNotEqualTo(currentThread);
                }
                i2++;
            } finally {
                asyncEvents.shutdown();
                newFixedThreadPool.shutdown();
            }
        }
    }

    @Test
    void awaitingShutdownMustBlockUntilAllMessagesHaveBeenProcessed() throws Exception {
        Event event = new Event();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        EventConsumer eventConsumer = new EventConsumer();
        Runnable asyncEvents = new AsyncEvents(eventConsumer, AsyncEvents.Monitor.NONE);
        this.executor.submit(asyncEvents);
        do {
            asyncEvents.send(new Event());
        } while (eventConsumer.eventsProcessed.take().processedBy == Thread.currentThread());
        Future<?> submit = this.executor.submit(() -> {
            countDownLatch.countDown();
            asyncEvents.awaitTermination();
            eventConsumer.eventsProcessed.offer(event);
        });
        countDownLatch.await();
        asyncEvents.send(new Event());
        asyncEvents.send(new Event());
        asyncEvents.send(new Event());
        asyncEvents.send(new Event());
        asyncEvents.send(new Event());
        Assertions.assertThat(eventConsumer.eventsProcessed.take()).isNotNull();
        Assertions.assertThat(eventConsumer.eventsProcessed.take()).isNotNull();
        Assertions.assertThat(eventConsumer.eventsProcessed.take()).isNotNull();
        Assertions.assertThat(eventConsumer.eventsProcessed.take()).isNotNull();
        Assertions.assertThat(eventConsumer.eventsProcessed.take()).isNotNull();
        Assertions.assertThat(eventConsumer.eventsProcessed.poll(20L, TimeUnit.MILLISECONDS)).isNull();
        asyncEvents.shutdown();
        submit.get();
        Assertions.assertThat(eventConsumer.eventsProcessed.take()).isSameAs(event);
    }
}
