package com.oath.cyclops.async.adapters;

import com.oath.cyclops.types.futurestream.BaseSimpleReactStream;
import cyclops.futurestream.SimpleReact;
import cyclops.reactive.ReactiveSeq;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:com/oath/cyclops/async/adapters/SignalTest.class */
public class SignalTest {
    int found = 0;

    @Before
    public void setup() {
        this.found = 0;
    }

    public synchronized int getFound() {
        return this.found;
    }

    public synchronized void incrementFound() {
        this.found++;
    }

    @Test
    public void signalFromStream() {
        Signal signal = Signal.topicBackedSignal();
        ReactiveSeq limit = signal.getDiscrete().stream().limit(2L);
        signal.fromStream(Stream.of((Object[]) new Integer[]{1, 1, 1, 2, 2}));
        Assert.assertThat(Integer.valueOf(((Integer) limit.map(num -> {
            return Integer.valueOf(num.intValue() * 100);
        }).reduce(0, (num2, num3) -> {
            return Integer.valueOf(num2.intValue() + num3.intValue());
        })).intValue()), Matchers.is(300));
    }

    @Test
    public void signalDiscrete3() {
        try {
            Signal queueBackedSignal = Signal.queueBackedSignal();
            new SimpleReact().ofAsync(new Supplier[]{() -> {
                return (Integer) queueBackedSignal.set(1);
            }, () -> {
                return (Integer) queueBackedSignal.set(2);
            }, () -> {
                sleep(20);
                return (Integer) queueBackedSignal.set(4);
            }, () -> {
                sleep(400);
                queueBackedSignal.getDiscrete().close();
                return 1;
            }});
            BaseSimpleReactStream.parallel(new Object[0]).fromStream(queueBackedSignal.getDiscrete().streamCompletableFutures()).then(completableFuture -> {
                return "*" + completableFuture;
            }).peek(str -> {
                incrementFound();
            }).peek(str2 -> {
                System.out.println(str2);
            }).block();
        } finally {
            Assert.assertThat(Integer.valueOf(this.found), Matchers.is(Integer.valueOf(3)));
        }
    }

    @Test
    public void signalDiscrete1() {
        for (int i = 0; i < 100; i++) {
            resetFound();
            try {
                Signal queueBackedSignal = Signal.queueBackedSignal();
                new SimpleReact().ofAsync(new Supplier[]{() -> {
                    return (Integer) queueBackedSignal.set(1);
                }, () -> {
                    return (Integer) queueBackedSignal.set(1);
                }, () -> {
                    sleep(200);
                    return (Integer) queueBackedSignal.set(1);
                }, () -> {
                    sleep(40);
                    queueBackedSignal.close();
                    return 1;
                }});
                BaseSimpleReactStream.parallel(new Object[0]).fromStreamOfFutures(queueBackedSignal.getDiscrete().streamCompletableFutures()).then(num -> {
                    return "*" + num;
                }).peek(str -> {
                    incrementFound();
                }).peek(str2 -> {
                    System.out.println(str2);
                }).block();
            } finally {
                Assert.assertThat(Integer.valueOf(this.found), Matchers.is(Integer.valueOf(1)));
            }
        }
    }

    private synchronized void resetFound() {
        this.found = 0;
    }

    @Test
    public void signalContinuous3() {
        for (int i = 0; i < 10; i++) {
            System.out.println(i);
            resetFound();
            try {
                Signal queueBackedSignal = Signal.queueBackedSignal();
                new SimpleReact().ofAsync(new Supplier[]{() -> {
                    return (Integer) queueBackedSignal.set(1);
                }, () -> {
                    return (Integer) queueBackedSignal.set(1);
                }, () -> {
                    sleep(1);
                    return (Integer) queueBackedSignal.set(1);
                }, () -> {
                    sleep(150);
                    queueBackedSignal.close();
                    return 1;
                }});
                BaseSimpleReactStream.parallel(new Object[0]).fromStream(queueBackedSignal.getContinuous().streamCompletableFutures()).then(completableFuture -> {
                    return "*" + completableFuture;
                }).peek(str -> {
                    incrementFound();
                }).peek(str2 -> {
                    System.out.println(str2);
                }).block();
            } finally {
                Assert.assertThat(Integer.valueOf(getFound()), Matchers.is(Integer.valueOf(3)));
            }
        }
    }

    @Test
    public void testDiscreteMultipleStreamsQueue() {
        Signal queueBackedSignal = Signal.queueBackedSignal();
        queueBackedSignal.set(1);
        queueBackedSignal.set(2);
        queueBackedSignal.getDiscrete().stream().limit(1L);
        queueBackedSignal.getDiscrete().stream().limit(1L);
    }

    @Test
    public void testContinuousMultipleStreamsQueue() {
        Signal queueBackedSignal = Signal.queueBackedSignal();
        queueBackedSignal.set(1);
        queueBackedSignal.set(2);
        queueBackedSignal.getContinuous().stream().limit(1L);
        queueBackedSignal.getContinuous().stream().limit(1L);
    }

    @Test
    public void testDiscreteMultipleStreamsTopic() {
        Signal signal = Signal.topicBackedSignal();
        signal.set(1);
        signal.set(2);
        signal.getDiscrete().stream().limit(1L);
        signal.getDiscrete().stream().limit(1L);
    }

    @Test
    public void testContinuousMultipleStreamsTopic() {
        Signal signal = Signal.topicBackedSignal();
        signal.set(1);
        signal.set(2);
        signal.getContinuous().stream().limit(1L);
        signal.getContinuous().stream().limit(1L);
    }

    private void sleep(int i) {
        try {
            Thread.sleep(i);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
