package com.oath.cyclops.async.adapters;

import com.oath.cyclops.types.futurestream.BaseSimpleReactStream;
import com.oath.cyclops.types.futurestream.SimpleReactStream;
import cyclops.futurestream.SimpleReact;
import cyclops.reactive.ReactiveSeq;
import cyclops.reactive.collections.mutable.ListX;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;

/* loaded from: input_file:com/oath/cyclops/async/adapters/TopicTest.class */
public class TopicTest {
    /** Counter used by the tests as the first value source; starts at 0 for every test. */
    int count;

    /** Counter used by the tests as the second value source; starts at 100000 for every test. */
    int count1 = 100000;

    /**
     * Resets both counters to their starting values before each test so that tests which
     * increment them do not influence one another.
     */
    @Before
    public void setup() {
        count1 = 100000;
        count = 0;
    }

    /**
     * Publishes "hello" and "world" to a queue-backed topic that has two connected streams and
     * checks that both streams observe the messages from the start: the first stream yields
     * "hello" as its first element, the second yields both messages in order.
     */
    @Test
    public void multipleSubscribersGetSameMessages() {
        Topic<String> topic = new Topic<>(new Queue<>());
        Stream<String> input = Stream.of("hello", "world");

        // Both streams are connected before anything is published
        // (presumably a stream only sees messages sent after it connects - TODO confirm).
        ReactiveSeq<String> first = topic.stream();
        ReactiveSeq<String> second = topic.stream();
        topic.fromStream(input);

        Assert.assertThat(first.limit(1L).findFirst().get(), Matchers.is("hello"));
        // The reduction is seeded with "" and prefixes every element with a space,
        // which is why the expected text starts with a blank.
        Assert.assertThat(second.limit(2L).reduce("", (acc, next) -> acc + ' ' + next), Matchers.is(" hello world"));
    }

    /**
     * Runs two asynchronous consumers on a two-thread pool, each reading its own stream from the
     * same topic. One appends "*" to every message and blocks with the default collection, the
     * other appends "!" and blocks into a set. After "hello" and "world" are published and the
     * topic is closed, both results must contain both (suffixed) messages.
     */
    @Test
    public void multipleSubscribersGetSameMessagesSimpleReact() throws InterruptedException, ExecutionException {
        Topic topic = new Topic(new Queue());
        Stream input = Stream.of("hello", "world");

        Supplier starConsumer = () -> BaseSimpleReactStream.parallel()
                .fromStream(topic.stream())
                .then(message -> message + "*")
                .block();
        Supplier bangConsumer = () -> (Set) BaseSimpleReactStream.parallel()
                .fromStream(topic.stream())
                .then(message -> message + "!")
                .peek(message -> sleep(10))
                .block(Collectors.toSet());
        SimpleReactStream stage = new SimpleReact(new ForkJoinPool(2)).ofAsync(new Supplier[]{starConsumer, bangConsumer});

        // Timing-based coordination: presumably lets the consumers connect before publishing
        // and lets them drain the messages before the topic is closed - TODO confirm.
        sleep(50);
        topic.fromStream(input);
        sleep(400);
        topic.close();

        ListX results = stage.block();
        Assert.assertThat(results.get(0), Matchers.instanceOf(List.class));
        Assert.assertThat(results.get(0), Matchers.hasItem("hello*"));
        Assert.assertThat(results.get(0), Matchers.hasItem("world*"));
        Assert.assertThat(results.get(1), Matchers.instanceOf(HashSet.class));
        Assert.assertThat(results.get(1), Matchers.hasItem("hello!"));
        Assert.assertThat(results.get(1), Matchers.hasItem("world!"));
    }

    /**
     * Currently ignored. Two asynchronous producers push unbounded generated sequences into one
     * topic - one counting up from {@code count}, the other from {@code count1} - while two
     * streams read from it. The first 1000 elements of each stream are printed, collected and
     * expected to contain both starting values (0 and 100000).
     */
    @Test
    @Ignore
    public void mergingAndSplitting() {
        Topic topic = new Topic();
        Stream first = topic.stream();
        Stream second = topic.stream();

        new SimpleReact().ofAsync(new Supplier[]{() -> topic.fromStream(Stream.generate(() -> count++))});
        new SimpleReact().ofAsync(new Supplier[]{() -> topic.fromStream(Stream.generate(() -> count1++))});

        for (Stream subscriber : Arrays.asList(first, second)) {
            List received = (List) subscriber.limit(1000L)
                    .peek(value -> System.out.println(value))
                    .collect(Collectors.toList());
            Assert.assertThat(received, Matchers.hasItem(100000));
            Assert.assertThat(received, Matchers.hasItem(0));
        }
    }

    /**
     * Offers the two counter values ({@code count} and {@code count1}) to a topic that has two
     * connected streams and checks that each stream's first two elements contain both values.
     */
    @Test
    public void simpleMergingAndSplitting() {
        Topic<Integer> topic = new Topic<>();
        Stream<Integer> first = topic.stream();
        Stream<Integer> second = topic.stream();

        topic.offer(count);
        topic.offer(count1);

        for (Stream<Integer> subscriber : Arrays.asList(first, second)) {
            // limit(2) bounds the read to the two values offered above.
            List<Integer> received = subscriber.limit(2L)
                    .peek(value -> System.out.println(value))
                    .collect(Collectors.toList());
            Assert.assertThat(received, Matchers.hasItem(100000));
            Assert.assertThat(received, Matchers.hasItem(0));
        }
    }

    /**
     * Currently ignored. Two asynchronous consumers read {@code streamCompletableFutures()} from
     * one topic, append "*" (collected to a list) or "!" (collected to a set) to each element,
     * while two single-threaded producers push unbounded generated sequences starting at
     * {@code count} and {@code count1}. After the topic is closed, the list result is expected to
     * contain "0*" and "100000*" and the set result "0!" and "100000!".
     */
    @Test
    @Ignore
    public void mergingAndSplittingSimpleReact() {
        Topic topic = new Topic();

        Supplier listConsumer = () -> (List) BaseSimpleReactStream.parallel()
                .fromStream(topic.streamCompletableFutures())
                .then(element -> element + "*")
                .block(Collectors.toList());
        Supplier setConsumer = () -> (Set) BaseSimpleReactStream.parallel()
                .fromStream(topic.streamCompletableFutures())
                .then(element -> element + "!")
                .block(Collectors.toSet());
        SimpleReactStream stage = new SimpleReact(new ForkJoinPool(2)).ofAsync(new Supplier[]{listConsumer, setConsumer});

        // Timing-based: presumably gives the consumers a head start before producing - TODO confirm.
        sleep(50);
        new SimpleReact(new ForkJoinPool(1)).ofAsync(new Supplier[]{() -> topic.fromStream(Stream.generate(() -> count++))});
        new SimpleReact(new ForkJoinPool(1)).ofAsync(new Supplier[]{() -> topic.fromStream(Stream.generate(() -> count1++))});
        sleep(40);

        System.out.println("Closing!");
        topic.close();
        System.out.println("Closed! Blocking..");
        ListX results = stage.block();
        System.out.println("Completed " + results.size());

        Collection<String> starred = extract1(results);
        Assert.assertThat(starred, Matchers.hasItem("0*"));
        Assert.assertThat(starred, Matchers.hasItem("100000*"));
        Collection<String> banged = extract2(results);
        Assert.assertThat(banged, Matchers.hasItem("0!"));
        Assert.assertThat(banged, Matchers.hasItem("100000!"));
    }

    /**
     * Two asynchronous consumers each read their own stream from one topic, appending "*"
     * (collected to a list) or "!" (collected to a set) to every element. The two counter values
     * are offered to the topic, the topic is closed, and each collected result must contain both
     * suffixed values.
     */
    @Test
    public void simpleMergingAndSplittingSimpleReact() {
        Topic topic = new Topic();

        Supplier listConsumer = () -> (List) BaseSimpleReactStream.parallel()
                .fromStream(topic.stream())
                .then(value -> value + "*")
                .block(Collectors.toList());
        Supplier setConsumer = () -> (Set) BaseSimpleReactStream.parallel()
                .fromStream(topic.stream())
                .then(value -> value + "!")
                .block(Collectors.toSet());
        SimpleReactStream stage = new SimpleReact(new ForkJoinPool(2)).ofAsync(new Supplier[]{listConsumer, setConsumer});

        // Timing-based: presumably lets the consumers connect before the values are offered
        // and lets them read the values before the topic is closed - TODO confirm.
        sleep(50);
        topic.offer(count);
        topic.offer(count1);
        sleep(40);

        System.out.println("Closing!");
        topic.close();
        System.out.println("Closed! Blocking..");
        ListX results = stage.block();
        System.out.println("Completed " + results.size());

        Collection<String> starred = extract1(results);
        Assert.assertThat(starred, Matchers.hasItem("0*"));
        Assert.assertThat(starred, Matchers.hasItem("100000*"));
        Collection<String> banged = extract2(results);
        Assert.assertThat(banged, Matchers.hasItem("0!"));
        Assert.assertThat(banged, Matchers.hasItem("100000!"));
    }

    /**
     * Requests two streams from a fresh topic and checks that the distributor then reports two
     * subscribers and that {@code getStreamToQueue()} holds two entries.
     */
    @Test
    public void multipleQueues() {
        Topic topic = new Topic();
        topic.stream();
        topic.stream();

        int subscriberCount = topic.getDistributor().getSubscribers().size();
        Assert.assertThat(subscriberCount, Matchers.is(2));

        int trackedStreams = topic.getStreamToQueue().size();
        Assert.assertThat(trackedStreams, Matchers.is(2));
    }

    /**
     * Requests two streams, disconnects the first one, and checks that exactly one subscriber
     * and one {@code getStreamToQueue()} entry remain.
     */
    @Test
    public void disconnectStreams() {
        Topic topic = new Topic();
        ReactiveSeq toDisconnect = topic.stream();
        topic.stream();

        topic.disconnect(toDisconnect);

        int subscriberCount = topic.getDistributor().getSubscribers().size();
        Assert.assertThat(subscriberCount, Matchers.is(1));

        int trackedStreams = topic.getStreamToQueue().size();
        Assert.assertThat(trackedStreams, Matchers.is(1));
    }

    /**
     * Requests two streams, disconnects both, and checks that no subscribers and no
     * {@code getStreamToQueue()} entries remain.
     */
    @Test
    public void disconnectAllStreams() {
        Topic topic = new Topic();
        ReactiveSeq first = topic.stream();
        ReactiveSeq second = topic.stream();

        topic.disconnect(first);
        topic.disconnect(second);

        int subscriberCount = topic.getDistributor().getSubscribers().size();
        Assert.assertThat(subscriberCount, Matchers.is(0));

        int trackedStreams = topic.getStreamToQueue().size();
        Assert.assertThat(trackedStreams, Matchers.is(0));
    }

    /**
     * Repeats the following 100000 times with a fresh topic: connect two streams, disconnect
     * both and verify nothing is left registered, then connect one more stream and verify that
     * exactly one subscriber and one {@code getStreamToQueue()} entry exist.
     */
    @Test
    public void disconnectAllStreamsAndReconnect() {
        for (int round = 0; round < 100000; round++) {
            Topic topic = new Topic();
            ReactiveSeq first = topic.stream();
            ReactiveSeq second = topic.stream();
            topic.disconnect(first);
            topic.disconnect(second);

            // The subscriber collection itself is used as the failure description.
            String remaining = "" + topic.getDistributor().getSubscribers();
            int subscribersAfterDisconnect = topic.getDistributor().getSubscribers().size();
            Assert.assertThat(remaining, subscribersAfterDisconnect, Matchers.is(0));
            int trackedAfterDisconnect = topic.getStreamToQueue().size();
            Assert.assertThat(trackedAfterDisconnect, Matchers.is(0));

            topic.stream();

            int subscribersAfterReconnect = topic.getDistributor().getSubscribers().size();
            Assert.assertThat(subscribersAfterReconnect, Matchers.is(1));
            int trackedAfterReconnect = topic.getStreamToQueue().size();
            Assert.assertThat(trackedAfterReconnect, Matchers.is(1));
        }
    }

    /**
     * Finds the first element of {@code list} that is an {@link ArrayList}.
     *
     * @param list the collections to search, in order
     * @return the first {@code ArrayList} instance found, or {@code null} when there is none
     */
    private Collection<String> extract1(List<Collection<String>> list) {
        return list.stream()
                .filter(ArrayList.class::isInstance)
                .findFirst()
                .orElse(null);
    }

    /**
     * Finds the first element of {@code list} that is a {@link HashSet}.
     *
     * @param list the collections to search, in order
     * @return the first {@code HashSet} instance found, or {@code null} when there is none
     */
    private Collection<String> extract2(List<Collection<String>> list) {
        return list.stream()
                .filter(HashSet.class::isInstance)
                .findFirst()
                .orElse(null);
    }

    /**
     * Pauses the calling thread for {@code i} milliseconds without forcing callers (including
     * lambdas) to handle {@link InterruptedException}.
     *
     * <p>If the thread is interrupted while sleeping, the stack trace is printed, the thread's
     * interrupt status is restored, and the method returns early.
     *
     * @param i the time to sleep, in milliseconds
     * @return {@code i}, unchanged, so the call can be used where a value is expected
     */
    private int sleep(int i) {
        try {
            Thread.sleep(i);
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Thread.sleep clears the interrupt status when it throws; set it again so that
            // code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
        }
        return i;
    }
}
