package com.oath.cyclops.async.adapters;

import com.oath.cyclops.async.QueueFactories;
import com.oath.cyclops.types.futurestream.BaseSimpleReactStream;
import com.oath.cyclops.types.futurestream.SimpleReactStream;
import cyclops.data.Seq;
import cyclops.futurestream.LazyReact;
import cyclops.futurestream.SimpleReact;
import cyclops.reactive.ReactiveSeq;
import cyclops.reactive.collections.mutable.ListX;
import java.io.PrintStream;
import java.util.List;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;

/* loaded from: input_file:com/oath/cyclops/async/adapters/QueueTest.class */
public class QueueTest {
    private final AtomicInteger found = new AtomicInteger(0);
    volatile boolean success = false;
    volatile int count = 0;
    volatile int count1 = 10000;
    boolean called = false;

    @Before
    public void setup() {
        this.found.set(0);
    }

    @Test
    public void batchBySizeAndTimeSizeCollection() {
        Queue build = QueueFactories.boundedQueue(10).build();
        build.fromStream(ReactiveSeq.of(new Integer[]{1, 2, 3, 4, 5, 6}));
        build.add(1);
        build.add(2);
        build.close();
        Assert.assertThat(Integer.valueOf(((Seq) build.streamGroupedBySizeAndTime(3, 10L, TimeUnit.SECONDS).toList().get(0)).size()), Matchers.is(3));
    }

    @Test
    public void batchByTime() {
        Queue build = QueueFactories.boundedQueue(10).build();
        build.fromStream(ReactiveSeq.of(new Integer[]{1, 2, 3, 4, 5, 6}));
        build.add(1);
        build.add(2);
        build.close();
        Assert.assertThat(Integer.valueOf(((Seq) build.streamGroupedByTime(1L, TimeUnit.SECONDS).toList().get(0)).size()), Matchers.is(8));
    }

    @Test
    public void parallelStreamClose() {
        System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", String.valueOf(Runtime.getRuntime().availableProcessors() * 2));
        for (int i = 0; i < 100; i++) {
            System.gc();
            System.out.println(i);
            Queue build = QueueFactories.boundedQueue(5).build();
            for (int i2 = 0; i2 < 10; i2++) {
                build.add(Integer.valueOf(i2));
            }
            new Thread(() -> {
                while (!build.isOpen()) {
                    System.out.println("Queue isn't open yet!");
                }
                System.err.println("Closing " + build.close());
                for (int i3 = 0; i3 < 100; i3++) {
                    try {
                        Thread.sleep(100L);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    build.disconnectStreams(1000);
                }
            }).start();
            ((Stream) build.jdkStream(100).parallel()).forEach(num -> {
                System.out.println(num);
            });
            System.out.println("done " + i);
        }
    }

    @Test
    public void parallelStreamCloseNoData() {
        System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", String.valueOf(Runtime.getRuntime().availableProcessors() * 16));
        for (int i = 0; i < 1000; i++) {
            System.out.println(i);
            Queue build = QueueFactories.boundedQueue(5000).build();
            new Thread(() -> {
                while (!build.isOpen()) {
                    System.out.println("Queue isn't open yet!");
                }
                System.err.println("Closing " + build.close());
            }).start();
            ((Stream) build.jdkStream().parallel()).forEach(num -> {
                System.out.println(num);
            });
            System.out.println("done " + i);
        }
    }

    @Test
    public void closedParallelStreamTimeout() {
        Queue withTimeout = QueueFactories.boundedQueue(100).build().withTimeout(1);
        for (int i = 0; i < 1000; i++) {
            withTimeout.add(Integer.valueOf(i));
        }
        withTimeout.close();
        Assert.assertThat(Integer.valueOf(((List) ((Stream) withTimeout.jdkStream().parallel()).collect(Collectors.toList())).size()), CoreMatchers.equalTo(100));
    }

    @Test
    public void closedParallelStream() {
        Queue withTimeout = QueueFactories.boundedQueue(100).build().withTimeout(1);
        for (int i = 0; i < 1000; i++) {
            withTimeout.add(Integer.valueOf(i));
        }
        withTimeout.close();
        Stream stream = (Stream) withTimeout.jdkStream(100).parallel();
        PrintStream printStream = System.out;
        printStream.getClass();
        stream.forEach((v1) -> {
            r1.println(v1);
        });
    }

    @Test
    public void parallelStream() {
        this.success = false;
        AtomicLong atomicLong = new AtomicLong(Thread.currentThread().getId());
        Queue build = QueueFactories.boundedQueue(2000).build();
        for (int i = 0; i < 10000; i++) {
            build.add(Integer.valueOf(i));
        }
        System.out.println(" queue " + build.size());
        System.out.println(atomicLong.get());
        Stream stream = (Stream) build.jdkStream().parallel();
        PrintStream printStream = System.out;
        printStream.getClass();
        Stream peek = stream.peek((v1) -> {
            r1.println(v1);
        }).peek(num -> {
            System.out.println(Thread.currentThread().getId());
            if (atomicLong.get() != Thread.currentThread().getId()) {
                System.out.println("closing");
                this.success = true;
                build.close();
            }
        }).peek(num2 -> {
            System.out.println(Thread.currentThread().getId());
        });
        PrintStream printStream2 = System.out;
        printStream2.getClass();
        peek.forEach((v1) -> {
            r1.println(v1);
        });
        Assert.assertTrue(this.success);
    }

    @Test
    public void parallelStreamSmallBounds() {
        for (int i = 0; i < 10; i++) {
            System.out.println("Run  " + i);
            this.success = false;
            AtomicLong atomicLong = new AtomicLong(Thread.currentThread().getId());
            Queue build = QueueFactories.boundedQueue(100).build();
            for (int i2 = 0; i2 < 10000; i2++) {
                build.add(Integer.valueOf(i2));
            }
            System.out.println(" queue " + build.size());
            System.out.println(atomicLong.get());
            Stream stream = (Stream) build.jdkStream().parallel();
            PrintStream printStream = System.out;
            printStream.getClass();
            Stream peek = stream.peek((v1) -> {
                r1.println(v1);
            }).peek(num -> {
                System.out.println(Thread.currentThread().getId());
                if (atomicLong.get() != Thread.currentThread().getId()) {
                    System.out.println("closing");
                    this.success = true;
                    build.close();
                }
            }).peek(num2 -> {
                System.out.println(Thread.currentThread().getId());
            });
            PrintStream printStream2 = System.out;
            printStream2.getClass();
            peek.forEach((v1) -> {
                r1.println(v1);
            });
            Assert.assertTrue(this.success);
        }
    }

    @Test
    public void closeQueue() {
        Queue build = QueueFactories.boundedQueue(100).build();
        build.add(1);
        new Thread(() -> {
            build.close();
        }).run();
        ReactiveSeq stream = build.stream();
        PrintStream printStream = System.out;
        printStream.getClass();
        stream.forEach((v1) -> {
            r1.println(v1);
        });
    }

    @Test
    public void backPressureTest() {
        Queue queue = new Queue(new LinkedBlockingQueue(2));
        new SimpleReact().ofAsync(new Supplier[]{() -> {
            queue.offer(1);
            return Integer.valueOf(this.found.getAndAdd(1));
        }, () -> {
            queue.offer(1);
            return Integer.valueOf(this.found.getAndAdd(1));
        }, () -> {
            queue.offer(6);
            return Integer.valueOf(this.found.getAndAdd(1));
        }, () -> {
            queue.offer(5);
            return Integer.valueOf(this.found.getAndAdd(1));
        }});
        sleep(10);
        Assert.assertThat(Integer.valueOf(this.found.get()), Matchers.is(2));
        Assert.assertThat(Integer.valueOf(((List) queue.stream().limit(2L).collect(Collectors.toList())).size()), Matchers.is(2));
        Assert.assertThat(Integer.valueOf(((List) queue.stream().limit(2L).collect(Collectors.toList())).size()), Matchers.is(2));
        sleep(10);
        Assert.assertThat(Integer.valueOf(this.found.get()), Matchers.is(4));
    }

    @Test
    public void backPressureJDKTest() {
        Queue queue = new Queue(new LinkedBlockingQueue(2));
        new SimpleReact().ofAsync(new Supplier[]{() -> {
            Stream.of((Object[]) new String[]{"1", "2", "3", "4"}).forEach(str -> {
                queue.offer(str);
                this.found.getAndAdd(1);
            });
            return 1;
        }});
        sleep(10);
        Assert.assertThat(Integer.valueOf(this.found.get()), Matchers.is(2));
        Assert.assertThat(Integer.valueOf(((List) queue.stream().limit(2L).collect(Collectors.toList())).size()), Matchers.is(2));
        Assert.assertThat(Integer.valueOf(((List) queue.stream().limit(2L).collect(Collectors.toList())).size()), Matchers.is(2));
        sleep(10);
        Assert.assertThat(Integer.valueOf(this.found.get()), Matchers.is(4));
    }

    @Test
    public void backPressureTimeoutTestVeryLow() {
        Queue withOfferTimeUnit = new Queue(new LinkedBlockingQueue(2)).withOfferTimeout(1L).withOfferTimeUnit(TimeUnit.MICROSECONDS);
        Set set = (Set) new SimpleReact().ofAsync(new Supplier[]{() -> {
            return offerAndIncrementFound(withOfferTimeUnit);
        }, () -> {
            return offerAndIncrementFound(withOfferTimeUnit);
        }, () -> {
            return offerAndIncrementFound(withOfferTimeUnit);
        }, () -> {
            return offerAndIncrementFound(withOfferTimeUnit);
        }}).block(Collectors.toSet());
        sleep(10);
        Assert.assertThat(Integer.valueOf(this.found.get()), Matchers.is(4));
        Assert.assertThat(Integer.valueOf(set.size()), Matchers.is(2));
        Assert.assertThat(set, Matchers.hasItem(false));
    }

    @Test
    public void backPressureTimeoutTestVeryHigh() {
        Queue withOfferTimeUnit = new Queue(new LinkedBlockingQueue(2)).withOfferTimeout(1L).withOfferTimeUnit(TimeUnit.DAYS);
        SimpleReactStream ofAsync = new SimpleReact().ofAsync(new Supplier[]{() -> {
            return offerAndIncrementFound(withOfferTimeUnit);
        }, () -> {
            return offerAndIncrementFound(withOfferTimeUnit);
        }, () -> {
            return offerAndIncrementFound(withOfferTimeUnit);
        }, () -> {
            return offerAndIncrementFound(withOfferTimeUnit);
        }});
        sleep(10);
        Assert.assertThat(Integer.valueOf(this.found.get()), Matchers.is(2));
        Assert.assertThat(Integer.valueOf(((List) withOfferTimeUnit.stream().limit(4L).collect(Collectors.toList())).size()), Matchers.is(4));
        Set set = (Set) ofAsync.block(Collectors.toSet());
        Assert.assertThat(Integer.valueOf(this.found.get()), Matchers.is(4));
        Assert.assertThat(Integer.valueOf(set.size()), Matchers.is(1));
        Assert.assertThat(set, Matchers.not(Matchers.hasItem(false)));
    }

    private Boolean offerAndIncrementFound(Queue<Integer> queue) {
        boolean offer = queue.offer(1);
        this.found.getAndAdd(1);
        return Boolean.valueOf(offer);
    }

    @Test
    public void testAdd() {
        for (int i = 0; i < 1000; i++) {
            this.found.set(0);
            Queue queue = new Queue(new LinkedBlockingQueue(2));
            new SimpleReact().ofAsync(new Supplier[]{() -> {
                queue.add(1);
                return Integer.valueOf(this.found.getAndAdd(1));
            }, () -> {
                queue.add(1);
                return Integer.valueOf(this.found.getAndAdd(1));
            }, () -> {
                queue.add(6);
                return Integer.valueOf(this.found.getAndAdd(1));
            }, () -> {
                queue.add(5);
                return Integer.valueOf(this.found.getAndAdd(1));
            }}).block();
            Assert.assertThat(Integer.valueOf(this.found.get()), Matchers.is(4));
        }
    }

    @Test
    public void testAddFull() {
        Queue queue = new Queue(new LinkedBlockingQueue(2));
        Assert.assertTrue(queue.add(1));
        Assert.assertTrue(queue.add(2));
        Assert.assertFalse(queue.add(3));
    }

    @Test
    public void enqueueTest() {
        Stream of = Stream.of((Object[]) new String[]{"1", "2", "3"});
        Queue queue = new Queue(new LinkedBlockingQueue());
        queue.fromStream(of);
        Assert.assertThat((Integer) queue.stream().limit(3L).map(str -> {
            return Integer.valueOf(str);
        }).reduce(0, (num, num2) -> {
            return Integer.valueOf(num.intValue() + num2.intValue());
        }), Matchers.is(6));
    }

    @Test
    public void simpleMergingTestLazyIndividualMerge() {
        Queue queue = new Queue(new LinkedBlockingQueue());
        queue.offer(0);
        queue.offer(100000);
        List list = (List) queue.stream().limit(2L).peek(num -> {
            System.out.println(num);
        }).collect(Collectors.toList());
        Assert.assertThat(list, Matchers.hasItem(100000));
        Assert.assertThat(list, Matchers.hasItem(0));
    }

    @Test
    @Ignore
    public void mergingTestLazyIndividualMerge() {
        this.count = 0;
        this.count1 = 100000;
        Queue queue = new Queue(new LinkedBlockingQueue());
        LazyReact.parallelBuilder().generateAsync(() -> {
            int i = this.count;
            this.count = i + 1;
            return Integer.valueOf(i);
        }).then(num -> {
            return Boolean.valueOf(queue.offer(num));
        }).runThread(new Thread());
        LazyReact.parallelBuilder().generateAsync(() -> {
            int i = this.count1;
            this.count1 = i + 1;
            return Integer.valueOf(i);
        }).then(num2 -> {
            return Boolean.valueOf(queue.offer(num2));
        }).runThread(new Thread());
        List list = (List) queue.stream().limit(1000L).peek(num3 -> {
            System.out.println(num3);
        }).collect(Collectors.toList());
        Assert.assertThat(list, Matchers.hasItem(100000));
        Assert.assertThat(list, Matchers.hasItem(0));
    }

    @Test
    public void simpleMergingTestEagerStreamMerge() {
        Queue queue = new Queue(new LinkedBlockingQueue());
        queue.offer(0);
        queue.offer(100000);
        List list = (List) queue.stream().limit(2L).peek(num -> {
            System.out.println(num);
        }).collect(Collectors.toList());
        Assert.assertThat(list, Matchers.hasItem(100000));
        Assert.assertThat(list, Matchers.hasItem(0));
    }

    @Test
    @Ignore
    public void mergingTestEagerStreamMerge() {
        this.count = 0;
        this.count1 = 100000;
        Queue queue = new Queue(new LinkedBlockingQueue());
        new SimpleReact().ofAsync(new Supplier[]{() -> {
            return Boolean.valueOf(queue.fromStream(Stream.generate(() -> {
                int i = this.count;
                this.count = i + 1;
                return Integer.valueOf(i);
            })));
        }});
        new SimpleReact().ofAsync(new Supplier[]{() -> {
            return Boolean.valueOf(queue.fromStream(Stream.generate(() -> {
                int i = this.count1;
                this.count1 = i + 1;
                return Integer.valueOf(i);
            })));
        }});
        List list = (List) queue.stream().limit(1000L).peek(num -> {
            System.out.println(num);
        }).collect(Collectors.toList());
        Assert.assertThat(list, Matchers.hasItem(100000));
        Assert.assertThat(list, Matchers.hasItem(0));
    }

    @Test
    public void queueTestBlock() {
        try {
            Queue queue = new Queue(new LinkedBlockingQueue());
            new SimpleReact().ofAsync(new Supplier[]{() -> {
                return Boolean.valueOf(queue.offer(1));
            }, () -> {
                return Boolean.valueOf(queue.offer(2));
            }, () -> {
                sleep(50);
                return Boolean.valueOf(queue.offer(4));
            }, () -> {
                sleep(400);
                queue.close();
                return 1;
            }});
            BaseSimpleReactStream.parallel(new Object[0]).fromStream(queue.streamCompletableFutures()).then(obj -> {
                return "*" + obj;
            }).peek(obj2 -> {
                this.found.getAndAdd(1);
            }).peek(obj3 -> {
                System.out.println(obj3);
            }).block();
        } finally {
            Assert.assertThat(Integer.valueOf(this.found.get()), Matchers.is(Integer.valueOf(3)));
        }
    }

    @Test
    public void queueTestTimeout() {
        Queue withTimeUnit = new Queue(new LinkedBlockingQueue()).withTimeout(1).withTimeUnit(TimeUnit.MILLISECONDS);
        new SimpleReact().ofAsync(new Supplier[]{() -> {
            return Boolean.valueOf(withTimeUnit.offer(1));
        }, () -> {
            return Boolean.valueOf(withTimeUnit.offer(2));
        }, () -> {
            sleep(500);
            return Boolean.valueOf(withTimeUnit.offer(4));
        }, () -> {
            return Boolean.valueOf(withTimeUnit.offer(5));
        }, () -> {
            sleep(1000);
            return Boolean.valueOf(withTimeUnit.close());
        }});
        ListX block = BaseSimpleReactStream.parallel(new Object[0]).fromStream(withTimeUnit.stream()).then(num -> {
            return "*" + num;
        }).block();
        Assert.assertThat(Integer.valueOf(block.size()), CoreMatchers.equalTo(4));
        Assert.assertThat(block, Matchers.hasItem("*5"));
    }

    @Test
    public void queueTestRun() {
        try {
            Queue queue = new Queue(new LinkedBlockingQueue());
            new SimpleReact().ofAsync(new Supplier[]{() -> {
                return Boolean.valueOf(queue.offer(1));
            }, () -> {
                return Boolean.valueOf(queue.offer(2));
            }, () -> {
                sleep(20);
                return Boolean.valueOf(queue.offer(4));
            }, () -> {
                sleep(400);
                queue.close();
                return 1;
            }});
            Assert.assertThat(BaseSimpleReactStream.parallel(new Object[0]).fromStream(queue.stream()).then(num -> {
                return "*" + num;
            }).peek(str -> {
                this.found.getAndAdd(1);
            }).peek(str2 -> {
                System.out.println(str2);
            }).block(), Matchers.hasItem("*1"));
        } finally {
            Assert.assertThat(Integer.valueOf(this.found.get()), Matchers.is(Integer.valueOf(3)));
        }
    }

    @Test
    public void stackOverflowQuestion() {
        this.called = false;
        Queue build = QueueFactories.unboundedQueue().build();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                build.add("New message " + System.currentTimeMillis());
            }
            build.close();
        }).start();
        build.stream().peek(this::called).forEach(str -> {
            System.out.println(str);
        });
        Assert.assertTrue(this.called);
    }

    private void called(String str) {
        this.called = true;
    }

    private int sleep(int i) {
        try {
            Thread.sleep(i);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return i;
    }
}
