package cyclops.futurestream.react.async.pipes;

import com.oath.cyclops.async.QueueFactories;
import com.oath.cyclops.async.adapters.Queue;
import com.oath.cyclops.types.reactive.QueueBasedSubscriber;
import cyclops.control.Eval;
import cyclops.control.Future;
import cyclops.control.Maybe;
import cyclops.control.Option;
import cyclops.control.Try;
import cyclops.futurestream.FutureStream;
import cyclops.futurestream.LazyReact;
import cyclops.futurestream.Pipes;
import cyclops.reactive.ReactiveSeq;
import cyclops.reactive.collections.mutable.ListX;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import reactor.core.publisher.Flux;

/* loaded from: input_file:cyclops/futurestream/react/async/pipes/PipesTest.class */
public class PipesTest {
    Pipes<String, String> pipes;
    Executor ex = Executors.newFixedThreadPool(1);

    @Before
    public void setup() {
        this.pipes = Pipes.of(new HashMap());
    }

    @Test
    public void evalIssue() {
        Pipes of = Pipes.of();
        of.register("reactor", QueueFactories.boundedNonBlockingQueue(1000).build());
        of.publishTo("reactor", ReactiveSeq.of(new Integer[]{10, 20, 30}));
        Eval nextOrNull = of.nextOrNull("reactor");
        ArrayList arrayList = new ArrayList();
        arrayList.add(nextOrNull.get());
        arrayList.add(nextOrNull.get());
        arrayList.add(nextOrNull.get());
        Assert.assertThat(arrayList, Matchers.equalTo(ListX.of(new Integer[]{10, 20, 30})));
    }

    @Test
    public void nextValueFinished() {
        Maybe just = Maybe.just("hello");
        PrintStream printStream = System.out;
        printStream.getClass();
        just.peek(printStream::println);
        Pipes of = Pipes.of();
        of.register("reactor", QueueFactories.boundedNonBlockingQueue(1000).build());
        of.publishTo("reactor", ReactiveSeq.of(new Integer[]{10, 20, 30}));
        Eval nextValue = of.nextValue("reactor");
        System.out.println("EV");
        ArrayList arrayList = new ArrayList();
        arrayList.add(nextValue.get());
        arrayList.add(nextValue.get());
        System.out.println("Results " + arrayList);
        arrayList.add(nextValue.get());
        Assert.assertThat(arrayList, Matchers.equalTo(ListX.of(new Maybe[]{Maybe.of(10), Maybe.of(20), Maybe.of(30)})));
    }

    @Test
    public void oneOrErrorFinishes() {
        Pipes of = Pipes.of();
        of.register("reactor", QueueFactories.boundedNonBlockingQueue(1000).build());
        of.publishTo("reactor", ReactiveSeq.of(new Integer[]{10, 20, 30}));
        ArrayList arrayList = new ArrayList();
        arrayList.add(of.oneOrError("reactor").get());
        arrayList.add(of.oneOrError("reactor").get());
        arrayList.add(of.oneOrError("reactor").get());
        Assert.assertThat(arrayList, Matchers.equalTo(ListX.of(new Integer[]{10, 20, 30}).map((v0) -> {
            return Option.some(v0);
        })));
    }

    @Test
    public void futureStreamTest() {
        Pipes of = Pipes.of();
        of.register("reactor", QueueFactories.boundedNonBlockingQueue(1000).build());
        of.publishTo("reactor", ReactiveSeq.of(new Integer[]{10, 20, 30}));
        of.close("reactor");
        System.out.println(Thread.currentThread().getId());
        List list = ((FutureStream) of.futureStream("reactor", new LazyReact()).toOptional().get()).map(num -> {
            return "fan-out toNested handle blocking I/O:" + Thread.currentThread().getId() + ":" + num;
        }).toList();
        System.out.println(list);
        Assert.assertThat(Integer.valueOf(list.size()), Matchers.equalTo(3));
    }

    @Test
    public void futureStreamCustomTest() {
        Pipes of = Pipes.of();
        of.register("reactor", QueueFactories.boundedNonBlockingQueue(1000).build());
        of.publishTo("reactor", ReactiveSeq.of(new Integer[]{10, 20, 30}));
        of.close("reactor");
        System.out.println(Thread.currentThread().getId());
        List list = ((FutureStream) of.futureStream("reactor", new LazyReact(10, 10)).toOptional().get()).map(num -> {
            return "fan-out toNested handle blocking I/O:" + Thread.currentThread().getId() + ":" + num;
        }).toList();
        System.out.println(list);
        Assert.assertThat(Integer.valueOf(list.size()), Matchers.equalTo(3));
    }

    @Test
    public void publishToTest() {
        Pipes of = Pipes.of();
        of.register("reactor", QueueFactories.boundedNonBlockingQueue(1000).build());
        of.publishTo("reactor", Flux.just(new Integer[]{10, 20, 30}));
        of.close("reactor");
        System.out.println(Thread.currentThread().getId());
        List list = ((FutureStream) of.futureStream("reactor", new LazyReact()).toOptional().get()).map(num -> {
            return "fan-out toNested handle blocking I/O:" + Thread.currentThread().getId() + ":" + num;
        }).toList();
        System.out.println(list);
        Assert.assertThat(Integer.valueOf(list.size()), Matchers.equalTo(3));
    }

    @Test
    public void testGetAbsent() {
        Assert.assertFalse(this.pipes.get("hello").isPresent());
    }

    @Test
    public void testGetPresent() {
        this.pipes.register("hello", new Queue());
        Assert.assertTrue(this.pipes.get("hello").isPresent());
    }

    @Test
    public void reactiveSeq() throws InterruptedException {
        Queue queue = new Queue();
        this.pipes.register("hello", queue);
        this.pipes.push("hello", "world");
        queue.close();
        Assert.assertThat(((ReactiveSeq) this.pipes.reactiveSeq("hello").toOptional().get()).toList(), Matchers.equalTo(Arrays.asList("world")));
    }

    @Test
    public void xValues() throws InterruptedException {
        Queue queue = new Queue();
        this.pipes.register("hello", queue);
        this.pipes.push("hello", "world");
        this.pipes.push("hello", "world2");
        this.pipes.push("hello", "world3");
        this.pipes.push("hello", "world4");
        queue.close();
        Assert.assertThat(this.pipes.xValues("hello", 2L), Matchers.equalTo(ListX.of(new String[]{"world", "world2"})));
        Assert.assertThat(this.pipes.xValues("hello", 2L), Matchers.equalTo(ListX.of(new String[]{"world3", "world4"})));
    }

    @Test
    public void nextValue() throws InterruptedException {
        Queue queue = new Queue();
        this.pipes.register("hello", queue);
        this.pipes.push("hello", "world");
        this.pipes.push("hello", "world2");
        queue.close();
        Eval nextValue = this.pipes.nextValue("hello");
        int i = 0;
        while (((Maybe) nextValue.get()).isPresent()) {
            int i2 = i;
            i++;
            System.out.println(i2);
        }
        Assert.assertThat(Integer.valueOf(i), Matchers.equalTo(2));
    }

    @Test
    public void nextNull() throws InterruptedException {
        Queue queue = new Queue();
        this.pipes.register("hello", queue);
        this.pipes.push("hello", "world");
        this.pipes.push("hello", "world2");
        queue.close();
        Eval nextOrNull = this.pipes.nextOrNull("hello");
        int i = 0;
        while (nextOrNull.get() != null) {
            int i2 = i;
            i++;
            System.out.println(i2);
        }
        Assert.assertThat(Integer.valueOf(i), Matchers.equalTo(2));
    }

    @Test
    public void nextAsync() {
        Queue queue = new Queue();
        this.pipes.register("hello", queue);
        this.pipes.push("hello", "world");
        this.pipes.push("hello", "world2");
        queue.close();
        Assert.assertThat(this.pipes.oneOrErrorAsync("hello", this.ex).get(), Matchers.equalTo(Future.ofResult("world").get()));
        Assert.assertThat(this.pipes.oneOrErrorAsync("hello", this.ex).get(), Matchers.equalTo(Future.ofResult("world2").get()));
    }

    @Test
    public void oneOrError() throws InterruptedException {
        Queue queue = new Queue();
        this.pipes.register("hello", queue);
        this.pipes.push("hello", "world");
        this.pipes.push("hello", "world2");
        queue.close();
        Assert.assertThat(this.pipes.oneOrError("hello").toOptional().get(), Matchers.equalTo("world"));
        Assert.assertThat(this.pipes.oneOrError("hello").toOptional().get(), Matchers.equalTo("world2"));
    }

    @Test
    public void oneValueOrError() throws InterruptedException {
        Queue queue = new Queue();
        this.pipes.register("hello", queue);
        this.pipes.push("hello", "world");
        this.pipes.push("hello", "world2");
        queue.close();
        Assert.assertThat(((Try) this.pipes.oneValueOrError("hello").toOptional().get()).get(), Matchers.equalTo(Option.some("world")));
        Assert.assertThat(((Try) this.pipes.oneValueOrError("hello").toOptional().get()).get(), Matchers.equalTo(Option.some("world2")));
    }

    @Test
    public void oneValueOrErrorTry() throws InterruptedException {
        Queue queue = new Queue();
        this.pipes.register("hello", queue);
        this.pipes.push("hello", "world");
        this.pipes.push("hello", "world2");
        queue.close();
        Assert.assertThat(((Try) this.pipes.oneValueOrError("hello", new Class[]{Throwable.class}).toOptional().get()).get(), Matchers.equalTo(Option.some("world")));
        Assert.assertThat(((Try) this.pipes.oneValueOrError("hello").toOptional().get()).get(), Matchers.equalTo(Option.some("world2")));
    }

    @Test
    public void oneValueOrErrorTryException() throws InterruptedException {
        Queue queue = new Queue();
        this.pipes.register("hello", queue);
        this.pipes.push("hello", "world");
        queue.close();
        Assert.assertThat(this.pipes.oneValueOrError("hello", new Class[]{Throwable.class}), Matchers.equalTo(Option.some(Try.success("world"))));
        Assert.assertThat(((Try) this.pipes.oneValueOrError("hello", new Class[]{Throwable.class}).toOptional().get()).failureGet().orElse((Object) null), Matchers.instanceOf(NoSuchElementException.class));
    }

    @Test
    @Ignore
    public void subscribeTo() {
        QueueBasedSubscriber.Counter counter = new QueueBasedSubscriber.Counter();
        counter.active.incrementAndGet();
        QueueBasedSubscriber subscriber = QueueBasedSubscriber.subscriber(counter, 2);
        this.pipes.register("hello", subscriber.getQueue());
        this.pipes.subscribeTo("hello", subscriber, ForkJoinPool.commonPool());
        subscriber.getQueue().offer("world");
        subscriber.getQueue().close();
        Assert.assertThat(subscriber.jdkStream().findAny().get(), Matchers.equalTo("world"));
    }

    @Test
    public void publishTo() throws InterruptedException {
        Pipes of = Pipes.of();
        Queue queue = new Queue();
        of.register("hello", queue);
        of.publishToAsync("hello", LazyReact.sequentialBuilder().of(new Integer[]{1, 2, 3}));
        Thread.sleep(100L);
        queue.offer(4);
        queue.close();
        Assert.assertThat(queue.stream().toList(), Matchers.equalTo(Arrays.asList(1, 2, 3, 4)));
    }

    @Test
    public void publishToSync() throws InterruptedException {
        Pipes of = Pipes.of();
        Queue queue = new Queue();
        of.register("hello", queue);
        of.publishTo("hello", ReactiveSeq.of(new Integer[]{1, 2, 3}));
        queue.offer(4);
        queue.close();
        Assert.assertThat(queue.stream().toList(), Matchers.equalTo(Arrays.asList(1, 2, 3, 4)));
    }
}
