/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.util.concurrent;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.neo4j.util.concurrent.AsyncEvent;
import org.neo4j.util.concurrent.AsyncEvents;

@Timeout(value=30L)
class AsyncEventsTest {
    private ExecutorService executor;

    AsyncEventsTest() {
    }

    @BeforeEach
    void setUp() {
        this.executor = Executors.newCachedThreadPool();
    }

    @AfterEach
    void tearDown() {
        this.executor.shutdown();
    }

    @Test
    void eventsMustBeProcessedByBackgroundThread() throws Exception {
        EventConsumer consumer = new EventConsumer();
        AsyncEvents asyncEvents = new AsyncEvents((Consumer)consumer, AsyncEvents.Monitor.NONE);
        this.executor.submit((Runnable)asyncEvents);
        Event firstSentEvent = new Event();
        asyncEvents.send((AsyncEvent)firstSentEvent);
        Event firstProcessedEvent = consumer.poll(10L, TimeUnit.SECONDS);
        Event secondSentEvent = new Event();
        asyncEvents.send((AsyncEvent)secondSentEvent);
        Event secondProcessedEvent = consumer.poll(10L, TimeUnit.SECONDS);
        asyncEvents.shutdown();
        Assertions.assertThat((Object)((Object)firstProcessedEvent)).isEqualTo((Object)firstSentEvent);
        Assertions.assertThat((Object)((Object)secondProcessedEvent)).isEqualTo((Object)secondSentEvent);
    }

    @Test
    void mustNotProcessEventInSameThreadWhenNotShutDown() throws Exception {
        EventConsumer consumer = new EventConsumer();
        AsyncEvents asyncEvents = new AsyncEvents((Consumer)consumer, AsyncEvents.Monitor.NONE);
        this.executor.submit((Runnable)asyncEvents);
        asyncEvents.send((AsyncEvent)new Event());
        Thread processingThread = consumer.poll((long)10L, (TimeUnit)TimeUnit.SECONDS).processedBy;
        asyncEvents.shutdown();
        Assertions.assertThat((Object)processingThread).isNotEqualTo((Object)Thread.currentThread());
    }

    @Test
    void mustProcessEventsDirectlyWhenShutDown() throws InterruptedException {
        Thread threadForSubsequentEvents;
        EventConsumer consumer = new EventConsumer();
        AsyncEvents asyncEvents = new AsyncEvents((Consumer)consumer, AsyncEvents.Monitor.NONE);
        this.executor.submit((Runnable)asyncEvents);
        asyncEvents.send((AsyncEvent)new Event());
        Thread threadForFirstEvent = consumer.poll((long)10L, (TimeUnit)TimeUnit.SECONDS).processedBy;
        asyncEvents.shutdown();
        Assertions.assertThat((Object)threadForFirstEvent).isNotEqualTo((Object)Thread.currentThread());
        do {
            asyncEvents.send((AsyncEvent)new Event());
        } while ((threadForSubsequentEvents = consumer.poll((long)10L, (TimeUnit)TimeUnit.SECONDS).processedBy) != Thread.currentThread());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    void concurrentlyPublishedEventsMustAllBeProcessed() throws InterruptedException {
        EventConsumer consumer = new EventConsumer();
        CountDownLatch startLatch = new CountDownLatch(1);
        int threads = 10;
        int iterations = 2000;
        AsyncEvents asyncEvents = new AsyncEvents((Consumer)consumer, AsyncEvents.Monitor.NONE);
        this.executor.submit((Runnable)asyncEvents);
        ExecutorService threadPool = Executors.newFixedThreadPool(10);
        Runnable runner = () -> {
            try {
                startLatch.await();
            }
            catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            for (int i = 0; i < 2000; ++i) {
                asyncEvents.send((AsyncEvent)new Event());
            }
        };
        for (int i = 0; i < 10; ++i) {
            threadPool.submit(runner);
        }
        startLatch.countDown();
        Thread thisThread = Thread.currentThread();
        int eventCount = 20000;
        try {
            for (int i = 0; i < eventCount; ++i) {
                Event event = consumer.poll(1L, TimeUnit.SECONDS);
                if (event == null) {
                    --i;
                    continue;
                }
                Assertions.assertThat((Object)event.processedBy).isNotEqualTo((Object)thisThread);
            }
        }
        finally {
            asyncEvents.shutdown();
            threadPool.shutdown();
        }
    }

    @Test
    void awaitingShutdownMustBlockUntilAllMessagesHaveBeenProcessed() throws Exception {
        Event specialShutdownObservedEvent = new Event();
        CountDownLatch awaitStartLatch = new CountDownLatch(1);
        EventConsumer consumer = new EventConsumer();
        AsyncEvents asyncEvents = new AsyncEvents((Consumer)consumer, AsyncEvents.Monitor.NONE);
        this.executor.submit((Runnable)asyncEvents);
        do {
            asyncEvents.send((AsyncEvent)new Event());
        } while (consumer.eventsProcessed.take().processedBy == Thread.currentThread());
        Future<?> awaitShutdownFuture = this.executor.submit(() -> {
            awaitStartLatch.countDown();
            asyncEvents.awaitTermination();
            consumer.eventsProcessed.offer(specialShutdownObservedEvent);
        });
        awaitStartLatch.await();
        asyncEvents.send((AsyncEvent)new Event());
        asyncEvents.send((AsyncEvent)new Event());
        asyncEvents.send((AsyncEvent)new Event());
        asyncEvents.send((AsyncEvent)new Event());
        asyncEvents.send((AsyncEvent)new Event());
        Assertions.assertThat((Object)((Object)consumer.eventsProcessed.take())).isNotNull();
        Assertions.assertThat((Object)((Object)consumer.eventsProcessed.take())).isNotNull();
        Assertions.assertThat((Object)((Object)consumer.eventsProcessed.take())).isNotNull();
        Assertions.assertThat((Object)((Object)consumer.eventsProcessed.take())).isNotNull();
        Assertions.assertThat((Object)((Object)consumer.eventsProcessed.take())).isNotNull();
        Assertions.assertThat((Object)((Object)consumer.eventsProcessed.poll(20L, TimeUnit.MILLISECONDS))).isNull();
        asyncEvents.shutdown();
        awaitShutdownFuture.get();
        Assertions.assertThat((Object)((Object)consumer.eventsProcessed.take())).isSameAs((Object)specialShutdownObservedEvent);
    }

    static class EventConsumer
    implements Consumer<Event> {
        final BlockingQueue<Event> eventsProcessed = new LinkedBlockingQueue<Event>();

        EventConsumer() {
        }

        @Override
        public void accept(Event event) {
            event.processedBy = Thread.currentThread();
            this.eventsProcessed.offer(event);
        }

        public Event poll(long timeout, TimeUnit unit) throws InterruptedException {
            return this.eventsProcessed.poll(timeout, unit);
        }
    }

    static class Event
    extends AsyncEvent {
        Thread processedBy;

        Event() {
        }
    }
}

