package cyclops.futurestream.react.lazy;

import com.oath.cyclops.async.QueueFactories;
import com.oath.cyclops.async.adapters.Queue;
import com.oath.cyclops.react.ThreadPools;
import com.oath.cyclops.react.async.subscription.Subscription;
import cyclops.futurestream.FutureStream;
import cyclops.futurestream.LazyReact;
import cyclops.stream.StreamSource;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import org.junit.Ignore;
import org.junit.Test;

/**
 * Manual investigations into size- and time-bounded batching of messages that are
 * pushed onto a {@link Queue} and consumed through a stream.
 *
 * <p>The class is {@code @Ignore}d: every queue-backed test parks the calling thread
 * after starting its pipeline and therefore never finishes on its own. Run the
 * methods by hand and watch standard output.
 */
@Ignore
public class BatchingInvestigationsTest {

    /** Maximum number of messages collected into one batch. */
    private static final int BATCH_SIZE = 10;

    /** Maximum time, in milliseconds, spent collecting one batch. */
    private static final long BATCH_WINDOW_MILLIS = 500L;

    /**
     * Batches queue messages with a hand-written batcher passed to
     * {@code Queue.streamBatch}: each batch polls the queue until it holds
     * {@link #BATCH_SIZE} messages or {@link #BATCH_WINDOW_MILLIS} have elapsed.
     * Empty batches are filtered out before printing.
     */
    @Test
    public void streamBatch() {
        Queue<String> queue = QueueFactories.<String>unboundedQueue().build();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                queue.offer("New message " + i);
                if (!sleep(10000)) {
                    break; // interrupted: stop producing and close the queue
                }
            }
            queue.close();
        }).start();

        long windowNanos = TimeUnit.MILLISECONDS.toNanos(BATCH_WINDOW_MILLIS);

        queue.streamBatch(new Subscription(), source -> () -> {
            ArrayList<String> batch = new ArrayList<>();
            long start = System.nanoTime();
            while (batch.size() < BATCH_SIZE && System.nanoTime() - start < windowNanos) {
                try {
                    String next = source.apply(1L, TimeUnit.MILLISECONDS);
                    if (next != null) {
                        batch.add(next);
                    }
                } catch (Queue.QueueTimeoutException ignored) {
                    // Presumably raised when the 1 ms poll finds no message; keep
                    // polling until the batch is full or the window has elapsed.
                }
            }
            if (!batch.isEmpty()) {
                System.out.println("Result " + batch);
            }
            return batch;
        }).filter(batch -> !batch.isEmpty())
          .to(seq -> new LazyReact(ThreadPools.getSequential()).fromStream(seq))
          .async()
          .peek(System.out::println)
          .run();

        keepAlive();
    }

    /**
     * Batches queue messages with the built-in {@code groupedBySizeAndTime} operator
     * applied to the queue's stream before it is handed to a {@link LazyReact} builder.
     */
    @Test
    public void batchIssue() throws InterruptedException {
        Queue<String> queue = QueueFactories.<String>unboundedQueue().build();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                queue.offer("New message " + i);
                if (!sleep(10000)) {
                    break; // interrupted: stop producing
                }
            }
        }).start();

        queue.stream()
             .groupedBySizeAndTime(BATCH_SIZE, BATCH_WINDOW_MILLIS, TimeUnit.MILLISECONDS)
             .to(seq -> new LazyReact(ThreadPools.getSequential()).fromStream(seq))
             .async()
             .peek(System.out::println)
             .run();

        keepAlive();
    }

    /**
     * Batches queue messages with {@code groupedBySizeAndTime} applied to a
     * {@code FutureStream} created by {@link StreamSource#futureStream}. The producer
     * offers one timestamped message per second until it is interrupted.
     */
    @Test
    public void batchIssueStreamSource() throws InterruptedException {
        Queue<String> queue = QueueFactories.<String>unboundedQueue().build();
        new Thread(() -> {
            while (sleep(1000)) {
                queue.offer("New message " + System.currentTimeMillis());
            }
        }).start();

        StreamSource.futureStream(queue, new LazyReact(ThreadPools.getSequential()))
                    .groupedBySizeAndTime(BATCH_SIZE, BATCH_WINDOW_MILLIS, TimeUnit.MILLISECONDS)
                    .forEach(batch -> System.out.println(batch + " Batch Time:" + System.currentTimeMillis()));

        keepAlive();
    }

    /**
     * Applies {@code groupedBySizeAndTime} to a generated stream (no queue involved)
     * and prints each batch together with the value added by {@code elapsed()}.
     */
    @Test
    public void groupedBySizeAndTimeNoQueue() {
        new LazyReact()
            .generate(() -> {
                sleep(1000);
                return "New message " + System.currentTimeMillis();
            })
            .map(message -> message + "  t " + Thread.currentThread().getId())
            .groupedBySizeAndTime(BATCH_SIZE, BATCH_WINDOW_MILLIS, TimeUnit.MILLISECONDS)
            .elapsed()
            .forEach(timed -> System.out.println(timed + " Batch Time:" + System.currentTimeMillis()));
    }

    /**
     * Parks the calling thread in one-second sleeps until it is interrupted, so the
     * JVM stays alive while the pipeline prints its output. Sleeping replaces an
     * empty {@code while (true)} loop that would otherwise spin a CPU core.
     */
    private void keepAlive() {
        while (sleep(1000)) {
            // nothing to do between wake-ups
        }
    }

    /**
     * Sleeps the current thread for the given number of milliseconds.
     *
     * @param i the time to sleep, in milliseconds
     * @return {@code true} if the full sleep completed; {@code false} if the thread
     *         was interrupted, in which case the interrupt flag is restored so that
     *         callers and enclosing code can still observe it
     */
    private boolean sleep(int i) {
        try {
            Thread.sleep(i);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
