package com.littlesaints.protean.functions.streams;

import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedTransferQueue;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Stream;

/* loaded from: input_file:com/littlesaints/protean/functions/streams/ForkJoin.class */
public class ForkJoin<T, Q> implements Consumer<T>, AutoCloseable {
    private final Supplier<Q> exchangeProvider;
    private final Function<Q, Supplier<T>> exchangeReaderProvider;
    private final Function<Q, Consumer<T>> exchangeWriterProvider;
    private final String name = String.valueOf(System.currentTimeMillis());
    private final ExecutorService executor = Executors.newCachedThreadPool();
    private final Map<Predicate<T>, Consumer<T>> predicates = new LinkedHashMap(1);
    private final Collection<Future<?>> forks = new ArrayList(2);
    private final Collection<Stream<T>> streams = new ArrayList(2);
    private final Consumer<T> acceptAction = obj -> {
        ((Stream) this.predicates.entrySet().stream().sequential()).filter(entry -> {
            return ((Predicate) entry.getKey()).test(obj);
        }).map((v0) -> {
            return v0.getValue();
        }).forEach(consumer -> {
            consumer.accept(obj);
        });
    };

    private ForkJoin(Supplier<Q> supplier, Function<Q, Supplier<T>> function, Function<Q, Consumer<T>> function2) {
        this.exchangeProvider = supplier;
        this.exchangeReaderProvider = function;
        this.exchangeWriterProvider = function2;
    }

    public static <T> ForkJoin<T, BlockingQueue<T>> newInstance() {
        return of(LinkedTransferQueue::new);
    }

    public static <T> ForkJoin<T, BlockingQueue<T>> of(Supplier<BlockingQueue<T>> supplier) {
        return of(supplier, blockingQueue -> {
            return () -> {
                while (true) {
                    try {
                        return blockingQueue.take();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
        }, blockingQueue2 -> {
            return obj -> {
                while (true) {
                    try {
                        blockingQueue2.put(obj);
                        return;
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
        });
    }

    private static <T, Q> ForkJoin<T, Q> of(Supplier<Q> supplier, Function<Q, Supplier<T>> function, Function<Q, Consumer<T>> function2) {
        return new ForkJoin<>(supplier, function, function2);
    }

    public ForkJoin<T, Q> fork(Predicate<T> predicate, Consumer<Stream<T>> consumer) {
        Q q = this.exchangeProvider.get();
        Stream<T> stream = StreamSource.of(this.exchangeReaderProvider.apply(q)).get();
        this.streams.add(stream);
        this.predicates.put(predicate, this.exchangeWriterProvider.apply(q));
        this.forks.add(this.executor.submit(() -> {
            consumer.accept(stream);
        }));
        return this;
    }

    public ForkJoin<T, Q> fork(Consumer<Stream<T>> consumer) {
        fork(obj -> {
            return true;
        }, consumer);
        return this;
    }

    @Override // java.util.function.Consumer
    public void accept(T t) {
        this.acceptAction.accept(t);
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        this.streams.forEach((v0) -> {
            v0.close();
        });
        this.forks.forEach(future -> {
            future.cancel(true);
        });
    }

    public String getName() {
        return this.name;
    }
}
