package org.axonframework.eventsourcing.eventstore;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import junit.framework.TestCase;
import org.axonframework.common.MockException;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.TrackedEventMessage;
import org.axonframework.eventsourcing.DomainEventMessage;
import org.axonframework.eventsourcing.eventstore.inmemory.InMemoryEventStorageEngine;
import org.axonframework.metrics.NoOpMessageMonitor;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

/**
 * Tests for {@link EmbeddedEventStore}.
 * <p>
 * Every test runs against a Mockito spy wrapping an {@link InMemoryEventStorageEngine}, which lets individual tests
 * verify whether (and how) the event store reached out to the storage engine.
 */
public class EmbeddedEventStoreTest {
    private static final int CACHED_EVENTS = 10;
    private static final long FETCH_DELAY = 1000;
    private static final long CLEANUP_DELAY = 10000;

    private EmbeddedEventStore testSubject;
    private EventStorageEngine storageEngine;

    /**
     * Creates the spied storage engine and a test subject configured with the default constants of this class.
     */
    @Before
    public void setUp() {
        storageEngine = Mockito.spy(new InMemoryEventStorageEngine());
        newTestSubject(CACHED_EVENTS, FETCH_DELAY, CLEANUP_DELAY);
    }

    /**
     * Replaces the current test subject (shutting down the previous one, if any) with a freshly initialized store.
     * Both delays are passed to the store in milliseconds.
     *
     * @param cachedEvents value passed as the store's third constructor argument
     * @param fetchDelay   value passed as the store's fourth constructor argument, in milliseconds
     * @param cleanupDelay value passed as the store's fifth constructor argument, in milliseconds
     */
    private void newTestSubject(int cachedEvents, long fetchDelay, long cleanupDelay) {
        if (testSubject != null) {
            testSubject.shutDown();
        }
        testSubject = new EmbeddedEventStore(storageEngine, NoOpMessageMonitor.INSTANCE, cachedEvents, fetchDelay,
                                             cleanupDelay, TimeUnit.MILLISECONDS);
        testSubject.initialize();
    }

    /**
     * Shuts down the test subject. Guarded against {@code null} so that a failure in {@link #setUp()} is not masked
     * by a {@link NullPointerException} thrown from here.
     */
    @After
    public void tearDown() {
        if (testSubject != null) {
            testSubject.shutDown();
        }
    }

    /**
     * An event published before the stream is opened must be available on that stream with the same identifier,
     * payload and aggregate identifier.
     */
    @Test
    public void testExistingEventIsPassedToReader() throws Exception {
        DomainEventMessage<?> expected = EventStoreTestUtils.createEvent();
        testSubject.publish(expected);

        TrackingEventStream eventStream = testSubject.streamEvents((TrackingToken) null);
        Assert.assertTrue(eventStream.hasNextAvailable());

        TrackedEventMessage<?> actual = eventStream.nextAvailable();
        Assert.assertEquals(expected.getIdentifier(), actual.getIdentifier());
        Assert.assertEquals(expected.getPayload(), actual.getPayload());
        // Check the runtime type first; only then is the cast below meaningful.
        Assert.assertTrue(actual instanceof DomainEventMessage);
        Assert.assertEquals(expected.getAggregateIdentifier(),
                            ((DomainEventMessage<?>) actual).getAggregateIdentifier());
    }

    /**
     * An event published while a reader thread is blocked in {@code nextAvailable()} must reach that reader within
     * the test timeout.
     */
    @Test(timeout = 100)
    public void testEventPublishedAfterOpeningStreamIsPassedToReaderImmediately() throws Exception {
        TrackingEventStream eventStream = testSubject.streamEvents((TrackingToken) null);
        Assert.assertFalse(eventStream.hasNextAvailable());

        DomainEventMessage<?> expected = EventStoreTestUtils.createEvent();
        // The reader only records what it saw. Asserting inside the reader thread would be pointless, because an
        // AssertionError thrown there terminates that thread without failing the test.
        List<String> receivedIdentifiers = new CopyOnWriteArrayList<>();
        Thread reader = new Thread(() -> {
            try {
                receivedIdentifiers.add(eventStream.nextAvailable().getIdentifier());
            } catch (InterruptedException e) {
                // Nothing is recorded, so the assertions on the test thread below will fail.
                Thread.currentThread().interrupt();
            }
        });
        reader.start();
        testSubject.publish(expected);
        reader.join();

        Assert.assertEquals(1, receivedIdentifiers.size());
        Assert.assertEquals(expected.getIdentifier(), receivedIdentifiers.get(0));
    }

    /**
     * A reader on an empty store must not receive anything until an event is published.
     */
    @Test(timeout = FETCH_DELAY)
    public void testReadingIsBlockedWhenStoreIsEmpty() throws Exception {
        CountDownLatch eventReceived = new CountDownLatch(1);
        TrackingEventStream eventStream = testSubject.streamEvents((TrackingToken) null);
        Thread reader = new Thread(
                () -> EventUtils.asStream(eventStream).findFirst().ifPresent(event -> eventReceived.countDown()));
        reader.start();

        // Nothing has been published yet, so the latch must still be closed after waiting.
        Assert.assertFalse(eventReceived.await(100, TimeUnit.MILLISECONDS));

        testSubject.publish(EventStoreTestUtils.createEvent());
        reader.join();
        Assert.assertEquals(0, eventReceived.getCount());
    }

    /**
     * A reader that has consumed the only stored event must not receive a second one until another event is
     * published.
     */
    @Test(timeout = FETCH_DELAY)
    public void testReadingIsBlockedWhenEndOfStreamIsReached() throws Exception {
        testSubject.publish(EventStoreTestUtils.createEvent());
        CountDownLatch eventsReceived = new CountDownLatch(2);
        TrackingEventStream eventStream = testSubject.streamEvents((TrackingToken) null);
        Thread reader = new Thread(
                () -> EventUtils.asStream(eventStream).limit(2).forEach(event -> eventsReceived.countDown()));
        reader.start();

        // Only the first (pre-existing) event may have been consumed at this point.
        Assert.assertFalse(eventsReceived.await(100, TimeUnit.MILLISECONDS));
        Assert.assertEquals(1, eventsReceived.getCount());

        testSubject.publish(EventStoreTestUtils.createEvent());
        reader.join();
        Assert.assertEquals(0, eventsReceived.getCount());
    }

    /**
     * Opening a stream with the token of the first event must yield an event whose token is after that token.
     */
    @Test(timeout = FETCH_DELAY)
    public void testReadingCanBeContinuedUsingLastToken() throws Exception {
        testSubject.publish(EventStoreTestUtils.createEvents(2));

        TrackingToken firstToken = testSubject.streamEvents((TrackingToken) null).nextAvailable().trackingToken();
        TrackingToken secondToken = testSubject.streamEvents(firstToken).nextAvailable().trackingToken();

        Assert.assertTrue(secondToken.isAfter(firstToken));
    }

    /**
     * After a first reader has consumed two events, a second stream opened at the first event's token must return
     * the very same message instance for the second event, without any interaction with the storage engine.
     */
    @Test(timeout = FETCH_DELAY)
    public void testEventIsFetchedFromCacheWhenFetchedASecondTime() throws Exception {
        CountDownLatch eventsReceived = new CountDownLatch(2);
        List<TrackedEventMessage<?>> consumedEvents = new CopyOnWriteArrayList<>();
        Thread reader = new Thread(
                () -> EventUtils.asStream(testSubject.streamEvents((TrackingToken) null)).limit(2).forEach(event -> {
                    eventsReceived.countDown();
                    consumedEvents.add(event);
                }));
        reader.start();
        Assert.assertFalse(eventsReceived.await(100, TimeUnit.MILLISECONDS));

        testSubject.publish(EventStoreTestUtils.createEvents(2));
        reader.join();

        // Forget all interactions so far; from here on the storage engine must not be touched at all.
        Mockito.reset(storageEngine);
        TrackedEventMessage<?> secondRead =
                testSubject.streamEvents(consumedEvents.get(0).trackingToken()).nextAvailable();

        Assert.assertSame(consumedEvents.get(1), secondRead);
        Mockito.verifyNoMoreInteractions(storageEngine);
    }

    /**
     * An event appended directly to the storage engine (bypassing the event store) must still reach a blocked reader
     * when the store is configured with a 20 ms fetch delay.
     */
    @Test(timeout = FETCH_DELAY)
    public void testPeriodicPollingWhenEventStorageIsUpdatedIndependently() throws Exception {
        newTestSubject(CACHED_EVENTS, 20, CLEANUP_DELAY);
        TrackingEventStream eventStream = testSubject.streamEvents((TrackingToken) null);
        CountDownLatch eventReceived = new CountDownLatch(1);
        Thread reader = new Thread(
                () -> EventUtils.asStream(eventStream).findFirst().ifPresent(event -> eventReceived.countDown()));
        reader.start();
        Assert.assertFalse(eventReceived.await(100, TimeUnit.MILLISECONDS));

        storageEngine.appendEvents(EventStoreTestUtils.createEvent());
        reader.join();

        Assert.assertTrue(eventReceived.await(100, TimeUnit.MILLISECONDS));
    }

    /**
     * With a 20 ms cleanup delay: after {@link #CACHED_EVENTS} events are published, the open stream can read the
     * first event without touching the storage engine. After two further events are published, the same stream has
     * to read from the storage engine, starting at the token of the event it consumed last.
     */
    @Test(timeout = FETCH_DELAY)
    public void testConsumerStopsTailingWhenItFallsBehindTheCache() throws Exception {
        newTestSubject(CACHED_EVENTS, FETCH_DELAY, 20);
        TrackingEventStream eventStream = testSubject.streamEvents((TrackingToken) null);
        Assert.assertFalse(eventStream.hasNextAvailable());

        testSubject.publish(EventStoreTestUtils.createEvents(CACHED_EVENTS));
        // Wait well beyond the 20 ms cleanup delay before inspecting interactions.
        Thread.sleep(100);
        Mockito.reset(storageEngine);

        Assert.assertTrue(eventStream.hasNextAvailable());
        TrackedEventMessage<?> firstEvent = eventStream.nextAvailable();
        Mockito.verifyZeroInteractions(storageEngine);

        // Two more events (presumably sequence numbers 10 and 11, continuing after the first CACHED_EVENTS events).
        testSubject.publish(EventStoreTestUtils.createEvent(10), EventStoreTestUtils.createEvent(11));
        Thread.sleep(100);
        Mockito.reset(storageEngine);

        Assert.assertTrue(eventStream.hasNextAvailable());
        Mockito.verify(storageEngine).readEvents(firstEvent.trackingToken(), false);
    }

    /**
     * Without a snapshot, reading the aggregate must return all 110 published events, ending at sequence number 109.
     */
    @Test
    public void testLoadWithoutSnapshot() {
        testSubject.publish(EventStoreTestUtils.createEvents(110));

        List<DomainEventMessage<?>> events =
                EventUtils.asStream(testSubject.readEvents(EventStoreTestUtils.AGGREGATE))
                          .collect(Collectors.toList());

        Assert.assertEquals(110, events.size());
        Assert.assertEquals(109, events.get(events.size() - 1).getSequenceNumber());
    }

    /**
     * With a snapshot stored at sequence number 30, reading the aggregate must return 80 messages, starting at
     * sequence number 30 and ending at 109.
     */
    @Test
    public void testLoadWithSnapshot() {
        testSubject.publish(EventStoreTestUtils.createEvents(110));
        storageEngine.storeSnapshot(EventStoreTestUtils.createEvent(30));

        List<DomainEventMessage<?>> events =
                EventUtils.asStream(testSubject.readEvents(EventStoreTestUtils.AGGREGATE))
                          .collect(Collectors.toList());

        Assert.assertEquals(80, events.size());
        Assert.assertEquals(30, events.get(0).getSequenceNumber());
        Assert.assertEquals(109, events.get(events.size() - 1).getSequenceNumber());
    }

    /**
     * When reading the snapshot throws, reading the aggregate must fall back to the full stream of 110 events,
     * starting at sequence number 0 and ending at 109.
     */
    @Test
    public void testLoadWithFailingSnapshot() {
        testSubject.publish(EventStoreTestUtils.createEvents(110));
        storageEngine.storeSnapshot(EventStoreTestUtils.createEvent(30));
        Mockito.when(storageEngine.readSnapshot(EventStoreTestUtils.AGGREGATE)).thenThrow(new MockException());

        List<DomainEventMessage<?>> events =
                EventUtils.asStream(testSubject.readEvents(EventStoreTestUtils.AGGREGATE))
                          .collect(Collectors.toList());

        Assert.assertEquals(110, events.size());
        Assert.assertEquals(0, events.get(0).getSequenceNumber());
        Assert.assertEquals(109, events.get(events.size() - 1).getSequenceNumber());
    }
}
