package cyclops.futurestream.react.stream.pushable;

import com.oath.cyclops.async.QueueFactories;
import com.oath.cyclops.async.adapters.Queue;
import com.oath.cyclops.async.adapters.Signal;
import com.oath.cyclops.react.threads.SequentialElasticPools;
import cyclops.futurestream.FutureStream;
import cyclops.futurestream.LazyReact;
import cyclops.reactive.ReactiveSeq;
import cyclops.reactive.collections.immutable.LinkedListX;
import cyclops.reactive.collections.immutable.PersistentSetX;
import cyclops.reactive.collections.mutable.SetX;
import cyclops.stream.StreamSource;
import cyclops.stream.pushable.MultipleStreamSource;
import cyclops.stream.pushable.PushableFutureStream;
import cyclops.stream.pushable.PushableReactiveSeq;
import cyclops.stream.pushable.PushableStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.TreeSet;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.internal.util.collections.Sets;
import reactor.core.publisher.Flux;

/* loaded from: input_file:cyclops/futurestream/react/stream/pushable/PushableStreamTest.class */
/**
 * Tests that values pushed into the adapters created by {@link StreamSource}
 * (queues, signals and multi-subscriber sources) are observable through the
 * streams connected to those adapters.
 *
 * <p>Most tests follow the same shape: create a pushable source, push a value
 * into its input, close the input, then collect the connected stream and
 * assert that the pushed value is present.
 */
public class PushableStreamTest {
    /** Counts the elements consumed by {@link #stackoverflow()}. */
    int i = 0;

    /**
     * Smoke test: builds a few Reactor / cyclops pipelines and only checks
     * that constructing them does not throw. No values are asserted.
     */
    @Test
    public void pipes() throws InterruptedException {
        Flux.from(LinkedListX.of(10, 20, 30));
        SetX.fromPublisher(Flux.just(10, 20, 30));
        PersistentSetX.of(1, 2, 3)
                      .mergeMap(num -> Flux.just(num, num * 10))
                      .to()
                      .vectorX();
    }

    /** A value added to an unbounded source is emitted by its FutureStream. */
    @Test
    public void testLazyFutureStream() {
        PushableFutureStream<Integer> pushable = StreamSource.ofUnbounded().futureStream(new LazyReact());
        pushable.getInput().add(100);
        pushable.getInput().close();
        Assert.assertThat(pushable.getStream().collect(Collectors.toList()), Matchers.hasItem(100));
    }

    /** Same as {@link #testLazyFutureStream()} but with a pooled LazyReact builder. */
    @Test
    public void testReactPool() {
        PushableFutureStream<Integer> pushable =
                StreamSource.ofUnbounded().futureStream(SequentialElasticPools.lazyReact.nextReactor());
        pushable.getInput().add(100);
        pushable.getInput().close();
        Assert.assertThat(pushable.getStream().collect(Collectors.toList()), Matchers.hasItem(100));
    }

    /** Accesses the input queue and the stream through the tuple accessors {@code _1()} / {@code _2()}. */
    @Test
    public void testStreamTuple() {
        PushableStream<Integer> pushable = StreamSource.ofUnbounded().stream();
        pushable._1().add(10);
        pushable._1().close();
        Assert.assertThat(pushable._2().collect(Collectors.toList()), Matchers.hasItem(10));
    }

    /** A value added to an unbounded source is emitted by its java.util.stream.Stream. */
    @Test
    public void testStream() {
        PushableStream<Integer> pushable = StreamSource.ofUnbounded().stream();
        pushable.getInput().add(10);
        pushable.getInput().close();
        Assert.assertThat(pushable.getStream().collect(Collectors.toList()), Matchers.hasItem(10));
    }

    /**
     * Interleaves offers into a source created with {@code StreamSource.of(1)}
     * with marker strings, while a second thread drains the stream into the
     * same list. The resulting order is only printed, not asserted.
     */
    @Test
    public void testStreamBackPressure1() throws InterruptedException {
        PushableStream<Integer> pushable = StreamSource.of(1).stream();
        // Shared between the consumer thread and this thread, hence synchronized.
        List<Object> events = Collections.synchronizedList(new ArrayList<>());
        new Thread(() -> pushable.getStream().forEach(events::add)).start();
        pushable.getInput().offer(10);
        events.add("here!");
        pushable.getInput().offer(20);
        events.add("there!");
        pushable.getInput().offer(30);
        events.add("there2!");
        pushable.getInput().close();
        System.out.println(events);
    }

    /** Accesses the input queue and the ReactiveSeq through the tuple accessors {@code _1()} / {@code _2()}. */
    @Test
    public void testSeqTuple() {
        PushableReactiveSeq<Integer> pushable = StreamSource.ofUnbounded().reactiveSeq();
        pushable._1().add(10);
        pushable._1().close();
        Assert.assertThat(pushable._2().collect(Collectors.toList()), Matchers.hasItem(10));
    }

    /** A value added to an unbounded source is emitted by its ReactiveSeq. */
    @Test
    public void testSeq() {
        PushableReactiveSeq<Integer> pushable = StreamSource.ofUnbounded().reactiveSeq();
        pushable.getInput().add(10);
        pushable.getInput().close();
        Assert.assertThat(pushable.getStream().collect(Collectors.toList()), Matchers.hasItem(10));
    }

    /** A value set on a queue-backed Signal is emitted by a FutureStream built over its discrete adapter. */
    @Test
    public void testLazyFutureStreamAdapter() {
        Signal<Integer> signal = Signal.queueBackedSignal();
        FutureStream<Integer> pushable = StreamSource.futureStream(signal.getDiscrete(), new LazyReact());
        signal.set(100);
        signal.close();
        Assert.assertThat(pushable.collect(Collectors.toList()), Matchers.hasItem(100));
    }

    /** A value set on a queue-backed Signal is emitted by a ReactiveSeq built over its discrete adapter. */
    @Test
    public void testSeqAdapter() {
        Signal<Integer> signal = Signal.queueBackedSignal();
        ReactiveSeq<Integer> pushable = StreamSource.reactiveSeq(signal.getDiscrete());
        signal.set(100);
        signal.close();
        Assert.assertThat(pushable.collect(Collectors.toList()), Matchers.hasItem(100));
    }

    /** A value set on a queue-backed Signal is emitted by a Stream built over its discrete adapter. */
    @Test
    public void testStreamAdapter() {
        Signal<Integer> signal = Signal.queueBackedSignal();
        Stream<Integer> pushable = StreamSource.stream(signal.getDiscrete());
        signal.set(100);
        signal.close();
        Assert.assertThat(pushable.collect(Collectors.toList()), Matchers.hasItem(100));
    }

    /** A value offered to a multi-stream source is emitted by a FutureStream connected to it. */
    @Test
    public void testLazyFutureStreamTopic() {
        MultipleStreamSource<Integer> multi = StreamSource.ofMultiple();
        FutureStream<Integer> pushable = multi.futureStream(new LazyReact());
        multi.getInput().offer(100);
        multi.getInput().close();
        Assert.assertThat(pushable.collect(Collectors.toList()), Matchers.hasItem(100));
    }

    /** Same as {@link #testLazyFutureStreamTopic()} with the source created via {@code ofMultiple(2)}. */
    @Test
    public void testLazyFutureStreamTopicBackPressure() {
        MultipleStreamSource<Integer> multi = StreamSource.ofMultiple(2);
        FutureStream<Integer> pushable = multi.futureStream(new LazyReact());
        multi.getInput().offer(100);
        multi.getInput().close();
        Assert.assertThat(pushable.collect(Collectors.toList()), Matchers.hasItem(100));
    }

    /** Same as {@link #testLazyFutureStreamTopic()} with the source created from a bounded queue factory. */
    @Test
    public void testLazyFutureStreamTopicQueueFactory() {
        MultipleStreamSource<Integer> multi = StreamSource.ofMultiple(QueueFactories.boundedQueue(100));
        FutureStream<Integer> pushable = multi.futureStream(new LazyReact());
        multi.getInput().offer(100);
        multi.getInput().close();
        Assert.assertThat(pushable.collect(Collectors.toList()), Matchers.hasItem(100));
    }

    /** Same as {@link #testLazyFutureStreamTopic()} but with a pooled LazyReact builder. */
    @Test
    public void testReactPoolTopic() {
        MultipleStreamSource<Integer> multi = StreamSource.ofMultiple();
        FutureStream<Integer> pushable = multi.futureStream(SequentialElasticPools.lazyReact.nextReactor());
        multi.getInput().offer(100);
        multi.getInput().close();
        Assert.assertThat(pushable.collect(Collectors.toList()), Matchers.hasItem(100));
    }

    /** A value offered to a multi-stream source is emitted by a Stream connected to it. */
    @Test
    public void testStreamTopic() {
        MultipleStreamSource<Integer> multi = StreamSource.ofMultiple();
        Stream<Integer> pushable = multi.stream();
        multi.getInput().offer(10);
        multi.getInput().close();
        Assert.assertThat(pushable.collect(Collectors.toList()), Matchers.hasItem(10));
    }

    /**
     * Offers (rather than adds) a value to an unbounded source and reads it
     * back through its ReactiveSeq. Note that, despite the name, this uses
     * {@code ofUnbounded()} and not {@code ofMultiple()}.
     */
    @Test
    public void testSeqTopic() {
        PushableReactiveSeq<Integer> pushable = StreamSource.ofUnbounded().reactiveSeq();
        pushable.getInput().offer(10);
        pushable.getInput().close();
        Assert.assertThat(pushable.getStream().collect(Collectors.toList()), Matchers.hasItem(10));
    }

    /**
     * Connects a FutureStream, a ReactiveSeq and a Stream to one multi-stream
     * source, drains all three into a single set and asserts that the set
     * equals {@code {100}}.
     */
    @Test
    public void testMultiple() {
        MultipleStreamSource<Integer> multi = StreamSource.ofMultiple();
        FutureStream<Integer> futureStream = multi.futureStream(new LazyReact());
        ReactiveSeq<Integer> reactiveSeq = multi.reactiveSeq();
        Stream<Integer> stream = multi.stream();
        multi.getInput().offer(100);
        multi.getInput().close();

        TreeSet<Integer> seen = new TreeSet<>();
        futureStream.forEach(seen::add);
        reactiveSeq.forEach(seen::add);
        stream.forEach(seen::add);

        Assert.assertThat(Sets.newSet(100), Matchers.is(seen));
    }

    /**
     * Expects an {@link IllegalArgumentException} when a source is requested
     * with a negative size. The statements after the creation are kept from
     * the original test and are not expected to be reached.
     */
    @Test(expected = IllegalArgumentException.class)
    public void testWithBackPressureNegativeAfterButOn() {
        PushableFutureStream<Integer> pushable = StreamSource.of(-10).futureStream(new LazyReact());
        pushable.getInput().add(100);
        pushable.getInput().close();
        Assert.assertThat(pushable.getStream().collect(Collectors.toList()), Matchers.hasItem(100));
    }

    /**
     * A producer thread adds 2000 messages to an unbounded queue and closes
     * it, while this thread consumes the queue's stream and prints every
     * 1000th message. Once the stream has ended the queue must be closed.
     */
    @Test
    public void stackoverflow() {
        Queue<String> queue = QueueFactories.<String>unboundedQueue().build();
        new Thread(() -> {
            for (int n = 0; n < 2000; n++) {
                queue.add("New message " + System.currentTimeMillis());
            }
            queue.close();
        }).start();
        System.out.println("setup ");
        queue.stream().forEach(message -> {
            // Post-increment: the check uses the count before this element.
            if (this.i++ % 1000 == 0) {
                System.out.println(message);
            }
        });
        if (queue.isOpen()) {
            Assert.fail("Should block!");
        }
    }
}
