package io.smart.cache.projectors.driver;

import io.telicent.smart.cache.projectors.NoOpProjector;
import io.telicent.smart.cache.projectors.driver.ProjectorDriver;
import io.telicent.smart.cache.projectors.sinks.NullSink;
import io.telicent.smart.cache.sources.Event;
import io.telicent.smart.cache.sources.memory.InMemoryEventSource;
import io.telicent.smart.cache.sources.memory.SimpleEvent;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:io/smart/cache/projectors/driver/TestProjectorDriver.class */
/**
 * TestNG tests that run {@link ProjectorDriver} instances on a background thread pool and check how
 * they terminate (event limit reached, explicit cancellation, source closed, source exhausted).
 *
 * <p>Every driver is submitted through {@link #runDriver(ProjectorDriver)} so that
 * {@link #testCleanup()} can cancel anything still running once a test method finishes.
 *
 * <p>NOTE(review): drivers and builder chains are declared with raw types, exactly as in the
 * original code. Restoring explicit type arguments requires the declarations of the event source
 * classes, which are not visible from this file - TODO confirm and tighten.
 */
public class TestProjectorDriver {
    /**
     * Upper bound, in seconds, for the final blocking wait in {@link #projector_driver_11()}, so a
     * driver that never finishes fails the test instead of hanging the suite.
     */
    private static final long FINAL_WAIT_SECONDS = 30L;

    /** Pool on which the drivers under test are executed. */
    private final ExecutorService executor = Executors.newFixedThreadPool(4);

    /** Futures of drivers started by the current test method; emptied by {@link #testCleanup()}. */
    private final List<Future<?>> futures = new ArrayList<>();

    /**
     * Submits the driver to the executor and records the resulting future for later cleanup.
     *
     * @param projectorDriver driver to run
     * @param <TKey>          event key type
     * @param <TValue>        event value type
     * @return the future representing the running driver
     */
    private <TKey, TValue> Future<?> runDriver(ProjectorDriver<TKey, TValue, Event<TKey, TValue>> projectorDriver) {
        // The cast selects the submit(Runnable) overload.
        Future<?> future = this.executor.submit((Runnable) projectorDriver);
        this.futures.add(future);
        return future;
    }

    /**
     * Waits for the future for at most the given time, deliberately ignoring task failure, timeout
     * and cancellation. If the calling thread is interrupted its interrupt flag is restored.
     *
     * @param future   future to wait on
     * @param i        maximum time to wait
     * @param timeUnit unit of {@code i}
     */
    private static void waitIgnoringErrors(Future<?> future, int i, TimeUnit timeUnit) {
        try {
            future.get(i, timeUnit);
        } catch (InterruptedException e) {
            // Best-effort wait: do not fail here, but keep the interruption visible to the caller.
            Thread.currentThread().interrupt();
        } catch (ExecutionException | TimeoutException | CancellationException ignored) {
            // Intentionally ignored: callers assert on the observable state afterwards.
        }
    }

    /** Cancels every driver that is still running and forgets all futures tracked so far. */
    @AfterMethod
    public void testCleanup() {
        for (Future<?> future : this.futures) {
            // isDone() is also true for futures that were already cancelled.
            if (!future.isDone()) {
                future.cancel(true);
            }
        }
        this.futures.clear();
    }

    /** Shuts the executor down once all tests in this class have run. */
    @AfterClass
    public void cleanup() {
        this.executor.shutdownNow();
    }

    /**
     * A driver limited to 10,000 events must finish within 5 seconds, close its source and have
     * consumed exactly 10,000 events.
     */
    @Test
    public void projector_driver_01() throws ExecutionException, InterruptedException, TimeoutException {
        InfiniteEventSource source = new InfiniteEventSource("Event %,d", 0L);

        runDriver(ProjectorDriver.create()
                                 .source(source)
                                 .projector(new NoOpProjector())
                                 .destination(NullSink.of())
                                 .limit(10000L)
                                 .build()).get(5L, TimeUnit.SECONDS);

        Assert.assertTrue(source.isClosed());
        Assert.assertEquals(source.eventsYielded(), 10000);
    }

    /** Cancelling an unlimited driver must leave its source closed. */
    @Test
    public void projector_driver_02() {
        InfiniteEventSource source = new InfiniteEventSource("Event %,d", 100L);
        ProjectorDriver driver = ProjectorDriver.create()
                                                .source(source)
                                                .projector(new NoOpProjector())
                                                .destination(NullSink.of())
                                                .unlimited()
                                                .build();

        Future<?> future = runDriver(driver);
        waitIgnoringErrors(future, 1, TimeUnit.SECONDS);
        driver.cancel();
        waitIgnoringErrors(future, 1, TimeUnit.SECONDS);

        Assert.assertTrue(source.isClosed());
    }

    /**
     * Closes the source underneath a running unlimited driver, waits up to a further second for the
     * driver, then checks the source reports itself closed.
     */
    @Test
    public void projector_driver_03() {
        InfiniteEventSource source = new InfiniteEventSource("Event %,d", 100L);

        Future<?> future = runDriver(ProjectorDriver.create()
                                                    .source(source)
                                                    .projector(new NoOpProjector())
                                                    .destination(NullSink.of())
                                                    .unlimited()
                                                    .build());
        waitIgnoringErrors(future, 1, TimeUnit.SECONDS);
        source.close();
        waitIgnoringErrors(future, 1, TimeUnit.SECONDS);

        Assert.assertTrue(source.isClosed());
    }

    /** An unlimited driver over a single-event in-memory source must leave that source closed. */
    @Test
    public void projector_driver_04() {
        InMemoryEventSource source = new InMemoryEventSource(List.of(new SimpleEvent(Collections.emptyList(), 1, "Singleton event")));

        waitIgnoringErrors(runDriver(ProjectorDriver.create()
                                                    .source(source)
                                                    .projector(new NoOpProjector())
                                                    .destination(NullSink.of())
                                                    .unlimited()
                                                    .build()), 1, TimeUnit.SECONDS);

        Assert.assertTrue(source.isClosed());
    }

    /**
     * With a 1 second poll timeout, a driver cancelled after about a second must have consumed no
     * events and must have closed its source.
     */
    @Test
    public void projector_driver_05() {
        InfiniteEventSource source = new InfiniteEventSource("Event %,d", 5000L);
        ProjectorDriver driver = ProjectorDriver.create()
                                                .source(source)
                                                .projector(new NoOpProjector())
                                                .destination(NullSink.of())
                                                .unlimited()
                                                .pollTimeout(Duration.ofSeconds(1L))
                                                .build();

        Future<?> future = runDriver(driver);
        waitIgnoringErrors(future, 1, TimeUnit.SECONDS);
        driver.cancel();
        waitIgnoringErrors(future, 3, TimeUnit.SECONDS);

        Assert.assertEquals(source.eventsYielded(), 0);
        Assert.assertTrue(source.isClosed());
    }

    /**
     * With a 100 ms poll timeout and at most 3 stalls, the driver must end up having consumed no
     * events and with its source closed.
     */
    @Test
    public void projector_driver_06() {
        InfiniteEventSource source = new InfiniteEventSource("Event %,d", 1000L);
        ProjectorDriver driver = ProjectorDriver.create()
                                                .source(source)
                                                .projector(new NoOpProjector())
                                                .destination(NullSink.of())
                                                .unlimited()
                                                .pollTimeout(Duration.ofMillis(100L))
                                                .maxStalls(3L)
                                                .build();

        Future<?> future = runDriver(driver);
        waitIgnoringErrors(future, 1, TimeUnit.SECONDS);
        driver.cancel();
        waitIgnoringErrors(future, 1, TimeUnit.SECONDS);

        Assert.assertEquals(source.eventsYielded(), 0);
        Assert.assertTrue(source.isClosed());
    }

    /** Same scenario as {@link #projector_driver_06()} but driven by a {@code LyingEventSource}. */
    @Test
    public void projector_driver_07() {
        LyingEventSource source = new LyingEventSource("Event %,d", 1000L);
        ProjectorDriver driver = ProjectorDriver.create()
                                                .source(source)
                                                .projector(new NoOpProjector())
                                                .destination(NullSink.of())
                                                .unlimited()
                                                .pollTimeout(Duration.ofMillis(100L))
                                                .maxStalls(3L)
                                                .build();

        Future<?> future = runDriver(driver);
        waitIgnoringErrors(future, 1, TimeUnit.SECONDS);
        driver.cancel();
        waitIgnoringErrors(future, 1, TimeUnit.SECONDS);

        Assert.assertEquals(source.eventsYielded(), 0);
        Assert.assertTrue(source.isClosed());
    }

    /**
     * An unlimited driver with unlimited stalls runs for up to 10 seconds before being cancelled;
     * it must have consumed at least one event and closed its source.
     */
    @Test
    public void projector_driver_08() {
        RemainingInfiniteEventSource source = new RemainingInfiniteEventSource("Event %,d", 2L);
        ProjectorDriver driver = ProjectorDriver.create()
                                                .source(source)
                                                .projector(new NoOpProjector())
                                                .destination(NullSink.of())
                                                .unlimited()
                                                .unlimitedStalls()
                                                .build();

        Future<?> future = runDriver(driver);
        waitIgnoringErrors(future, 10, TimeUnit.SECONDS);
        driver.cancel();
        waitIgnoringErrors(future, 1, TimeUnit.SECONDS);

        Assert.assertTrue(source.eventsYielded() > 0);
        Assert.assertTrue(source.isClosed());
    }

    /**
     * Variant of {@link #projector_driver_08()} with a 10 ms poll timeout and a source constructed
     * with 100 instead of 2; here no events are expected to have been consumed.
     */
    @Test
    public void projector_driver_08a() {
        RemainingInfiniteEventSource source = new RemainingInfiniteEventSource("Event %,d", 100L);
        ProjectorDriver driver = ProjectorDriver.create()
                                                .source(source)
                                                .projector(new NoOpProjector())
                                                .destination(NullSink.of())
                                                .unlimited()
                                                .unlimitedStalls()
                                                .pollTimeout(Duration.ofMillis(10L))
                                                .build();

        Future<?> future = runDriver(driver);
        waitIgnoringErrors(future, 10, TimeUnit.SECONDS);
        driver.cancel();
        waitIgnoringErrors(future, 1, TimeUnit.SECONDS);

        Assert.assertEquals(source.eventsYielded(), 0);
        Assert.assertTrue(source.isClosed());
    }

    /**
     * Uses the three-argument {@code RemainingInfiniteEventSource} constructor with a 10 ms poll
     * timeout and unlimited stalls; after cancellation no events must have been consumed and the
     * source must be closed.
     */
    @Test
    public void projector_driver_09() {
        RemainingInfiniteEventSource source = new RemainingInfiniteEventSource("Event %,d", 5000L, 1000L);
        ProjectorDriver driver = ProjectorDriver.create()
                                                .source(source)
                                                .projector(new NoOpProjector())
                                                .destination(NullSink.of())
                                                .unlimited()
                                                .unlimitedStalls()
                                                .pollTimeout(Duration.ofMillis(10L))
                                                .build();

        Future<?> future = runDriver(driver);
        waitIgnoringErrors(future, 1, TimeUnit.SECONDS);
        driver.cancel();
        waitIgnoringErrors(future, 1, TimeUnit.SECONDS);

        Assert.assertEquals(source.eventsYielded(), 0);
        Assert.assertTrue(source.isClosed());
    }

    /**
     * Builds a driver over an empty in-memory source using the supplier form of
     * {@code destination}, a poll timeout, an event limit, a stall limit and a report batch size,
     * then waits up to one second for it. The test makes no assertions of its own.
     */
    @Test
    public void projector_driver_10() {
        InMemoryEventSource source = new InMemoryEventSource(Collections.emptyList());

        waitIgnoringErrors(runDriver(ProjectorDriver.create()
                                                    .source(source)
                                                    .projector(new NoOpProjector())
                                                    .destination(NullSink::of)
                                                    .pollTimeout(Duration.ofSeconds(5L))
                                                    .limit(10000000L)
                                                    .maxStalls(36L)
                                                    .reportBatchSize(100000L)
                                                    .build()), 1, TimeUnit.SECONDS);
    }

    /**
     * A driver limited to 10 events over a {@code StallingInfiniteEventSource} must complete
     * without the driver itself failing.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting for the driver
     */
    @Test
    public void projector_driver_11() throws InterruptedException {
        Future<?> future = runDriver(ProjectorDriver.create()
                                                    .source(new StallingInfiniteEventSource("Event %,d", 1500L, 3))
                                                    .projector(new NoOpProjector())
                                                    .destination(NullSink::of)
                                                    .pollTimeout(Duration.ofSeconds(1L))
                                                    .limit(10L)
                                                    .build());
        waitIgnoringErrors(future, 5, TimeUnit.SECONDS);

        try {
            // Bounded wait: an unbounded get() would hang the whole suite if the driver never ends.
            future.get(FINAL_WAIT_SECONDS, TimeUnit.SECONDS);
        } catch (ExecutionException e) {
            Assert.fail("Unexpected driver error: " + e.getMessage(), e);
        } catch (TimeoutException e) {
            Assert.fail("Driver did not complete within " + FINAL_WAIT_SECONDS + " seconds", e);
        }
        Assert.assertTrue(future.isDone());
    }
}
