package io.reacted.flow;

import io.reacted.patterns.Try;
import io.reacted.streams.ReactedSubmissionPublisher;
import java.io.Serializable;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Optional;
import java.util.Spliterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Flow;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.IntFunction;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.function.ToDoubleFunction;
import java.util.function.ToIntFunction;
import java.util.function.ToLongFunction;
import java.util.stream.Collector;
import java.util.stream.DoubleStream;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import javax.annotation.Nullable;

/* loaded from: input_file:io/reacted/flow/SourceStream.class */
public class SourceStream<OutputT extends Serializable> extends StreamProxy<OutputT> {

    @Nullable
    private final SourceSubscriber<OutputT> sourceSubscriber;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/reacted/flow/SourceStream$SourceSubscriber.class */
    /**
     * {@link Flow.Subscriber} that buffers the items received from a publisher so that they can
     * be pulled, one at a time, by the thread consuming the stream.
     *
     * <p>Demand is signalled one item at a time: once on subscription and then again each time
     * the consumer calls {@link #requestNext()}.
     *
     * <p>Every termination path ({@link #onComplete()}, {@link #onError(Throwable)} and
     * {@link #stop()}) enqueues an end-of-stream marker, so a consumer blocked in
     * {@link #getNext()} is always woken up instead of waiting forever for an item that will
     * never arrive.
     *
     * @param <OutputT> type of the buffered items
     */
    public static class SourceSubscriber<OutputT extends Serializable> implements Flow.Subscriber<OutputT> {
        /** Marker enqueued on termination; never handed out to the consumer. */
        private static final Object END_OF_STREAM = new Object();

        /** Received items, possibly followed by {@link #END_OF_STREAM} markers. */
        private final BlockingQueue<Object> dataOutput = new LinkedBlockingQueue<>();
        private volatile boolean isTerminated = false;
        /** Null until {@link #onSubscribe} is invoked; volatile because stop() may run on another thread. */
        private volatile Flow.Subscription subscription;

        private SourceSubscriber() {
        }

        /**
         * Stores the subscription and requests the first item. If {@link #stop()} already ran,
         * the subscription is cancelled right away instead.
         */
        @Override // java.util.concurrent.Flow.Subscriber
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            if (this.isTerminated) {
                // stop() ran before the subscription was delivered and could not cancel it
                subscription.cancel();
                return;
            }
            requestNext();
        }

        /** Buffers the received item until the consumer pulls it with {@link #getNext()}. */
        @Override // java.util.concurrent.Flow.Subscriber
        public void onNext(OutputT outputt) {
            this.dataOutput.add(outputt);
        }

        /**
         * Terminates the source. The error itself is not propagated to the consumer: the stream
         * simply ends, exactly as it does on normal completion.
         */
        @Override // java.util.concurrent.Flow.Subscriber
        public void onError(Throwable th) {
            markTerminated();
        }

        /** Terminates the source once the publisher has no more items. */
        @Override // java.util.concurrent.Flow.Subscriber
        public void onComplete() {
            markTerminated();
        }

        /** @return {@code true} until a termination signal or {@link #stop()} has been seen */
        private boolean hasNext() {
            return !this.isTerminated;
        }

        /**
         * Blocks until an item is available or the source terminates.
         *
         * @return the next buffered item, or {@code null} when the end-of-stream marker is
         *         reached or the waiting thread is interrupted (in which case the subscription
         *         is also stopped and the interrupt flag is restored)
         */
        @Nullable
        @SuppressWarnings("unchecked") // the queue only holds OutputT items and END_OF_STREAM
        private OutputT getNext() {
            try {
                Object next = this.dataOutput.take();
                if (next == END_OF_STREAM) {
                    // put the marker back so that any later call returns immediately as well
                    this.dataOutput.add(END_OF_STREAM);
                    return null;
                }
                return (OutputT) next;
            } catch (InterruptedException interrupted) {
                // preserve the interruption for the caller before giving up on the source
                Thread.currentThread().interrupt();
                stop();
                return null;
            }
        }

        /**
         * Terminates the source from the consumer side: cancels the subscription, if one has
         * been received, and wakes up any consumer blocked in {@link #getNext()}.
         */
        private void stop() {
            this.isTerminated = true;
            Flow.Subscription activeSubscription = this.subscription;
            if (activeSubscription != null) {
                activeSubscription.cancel();
            }
            this.dataOutput.add(END_OF_STREAM);
        }

        /** Asks the publisher for exactly one more item. */
        private void requestNext() {
            this.subscription.request(1L);
        }

        /** Flags the source as terminated and wakes up a consumer blocked in getNext(). */
        private void markTerminated() {
            this.isTerminated = true;
            this.dataOutput.add(END_OF_STREAM);
        }
    }

    /**
     * Wraps a plain stream that is not backed by a subscriber, so the
     * {@code sourceSubscriber} field is left {@code null}.
     *
     * @param stream stream to wrap
     */
    private SourceStream(Stream<OutputT> stream) {
        this(stream, null);
    }

    /**
     * Wraps a stream together with the subscriber that feeds it.
     *
     * @param wrappedStream stream handed to the {@code StreamProxy} constructor
     * @param subscriber subscriber stored so that {@link #close()} can stop it
     */
    private SourceStream(Stream<OutputT> wrappedStream, SourceSubscriber<OutputT> subscriber) {
        super(wrappedStream);
        this.sourceSubscriber = subscriber;
    }

    /**
     * Creates a source stream over the elements of a collection.
     *
     * @param collection collection providing the elements
     * @param <OutputT> type of the elements
     * @return a source stream wrapping {@code collection.stream()}
     */
    public static <OutputT extends Serializable> SourceStream<OutputT> of(Collection<OutputT> collection) {
        // no raw cast: the argument is already a Stream<OutputT>, so overload resolution and
        // the returned type stay fully checked
        return of(collection.stream());
    }

    /**
     * Creates a source stream fed by the items emitted by a publisher.
     *
     * <p>A new subscriber is registered on {@code publisher} before this method returns.
     *
     * @param publisher publisher providing the elements
     * @param <OutputT> type of the elements
     * @return a source stream backed by the new subscription
     */
    public static <OutputT extends Serializable> SourceStream<OutputT> of(Flow.Publisher<OutputT> publisher) {
        SourceSubscriber<OutputT> sourceSubscriber = new SourceSubscriber<>();
        publisher.subscribe(sourceSubscriber);
        return of(sourceSubscriber);
    }

    /**
     * Creates a source stream fed by a {@link ReactedSubmissionPublisher}, subscribing with the
     * supplied subscription configuration.
     *
     * <p>A new subscriber is registered on the publisher before this method returns.
     *
     * @param reactedSubmissionPublisher publisher providing the elements
     * @param reActedSubscriptionConfig configuration passed to the publisher on subscription
     * @param <OutputT> type of the elements
     * @return a source stream backed by the new subscription
     */
    public static <OutputT extends Serializable> SourceStream<OutputT> of(ReactedSubmissionPublisher<OutputT> reactedSubmissionPublisher, ReactedSubmissionPublisher.ReActedSubscriptionConfig<OutputT> reActedSubscriptionConfig) {
        SourceSubscriber<OutputT> sourceSubscriber = new SourceSubscriber<>();
        reactedSubmissionPublisher.subscribe(reActedSubscriptionConfig, sourceSubscriber);
        return of(sourceSubscriber);
    }

    /**
     * Wraps an existing stream into a source stream.
     *
     * @param stream stream providing the elements
     * @param <OutputT> type of the elements
     * @return a source stream that has no backing subscriber
     */
    public static <OutputT extends Serializable> SourceStream<OutputT> of(Stream<OutputT> stream) {
        final SourceStream<OutputT> wrapped = new SourceStream<>(stream);
        return wrapped;
    }

    /**
     * Stops the backing subscriber, if this stream has one; otherwise does nothing.
     *
     * <p>NOTE(review): {@code super.close()} is not invoked here, so whatever
     * {@code StreamProxy.close()} does is skipped; confirm that this is intended.
     */
    @Override // io.reacted.flow.StreamProxy, java.util.stream.BaseStream, java.lang.AutoCloseable
    public void close() {
        final SourceSubscriber<OutputT> subscriber = this.sourceSubscriber;
        if (subscriber == null) {
            return;
        }
        subscriber.stop();
    }

    /**
     * Builds a sequential source stream whose elements are pulled from the given subscriber.
     *
     * <p>The stream is backed by a non-splittable spliterator of unknown size that blocks in
     * {@code SourceSubscriber.getNext()} until an item is available, and requests one more item
     * from the publisher after each item it hands to the consumer.
     *
     * @param sourceSubscriber subscriber buffering the published items
     * @param <OutputT> type of the elements
     * @return a source stream that stops {@code sourceSubscriber} when closed
     */
    private static <OutputT extends Serializable> SourceStream<OutputT> of(final SourceSubscriber<OutputT> sourceSubscriber) {
        final Spliterator<OutputT> publishedItems = new Spliterator<OutputT>() {
            /**
             * Hands at most one item to {@code consumer}.
             *
             * @return whether the subscriber is still live after this step; a {@code null}
             *         item means the subscriber stopped while waiting and nothing is consumed
             */
            @Override // java.util.Spliterator
            public boolean tryAdvance(Consumer<? super OutputT> consumer) {
                OutputT next = sourceSubscriber.getNext();
                if (next != null) {
                    consumer.accept(next);
                    // demand is signalled only after the previous item has been processed
                    sourceSubscriber.requestNext();
                }
                return sourceSubscriber.hasNext();
            }

            /** The source cannot be partitioned, so splitting is never possible. */
            @Override // java.util.Spliterator
            @Nullable
            public Spliterator<OutputT> trySplit() {
                return null;
            }

            /** The number of items a publisher will emit is unknown. */
            @Override // java.util.Spliterator
            public long estimateSize() {
                return Long.MAX_VALUE;
            }

            @Override // java.util.Spliterator
            public int characteristics() {
                return Spliterator.ORDERED | Spliterator.IMMUTABLE;
            }
        };
        return new SourceStream<>(StreamSupport.stream(publishedItems, false), sourceSubscriber);
    }

    // ---- java.util.stream.BaseStream operations ----
    // Every method in this group carries the bridge/synthetic markers and does nothing but
    // forward the call, with its arguments unchanged, to the StreamProxy implementation.

    @Override // io.reacted.flow.StreamProxy, java.util.stream.BaseStream
    public /* bridge */ /* synthetic */ Stream onClose(Runnable runnable) {
        return super.onClose(runnable);
    }

    @Override // io.reacted.flow.StreamProxy, java.util.stream.BaseStream
    public /* bridge */ /* synthetic */ Stream unordered() {
        return super.unordered();
    }

    @Override // io.reacted.flow.StreamProxy, java.util.stream.BaseStream
    public /* bridge */ /* synthetic */ Stream parallel() {
        return super.parallel();
    }

    @Override // io.reacted.flow.StreamProxy, java.util.stream.BaseStream
    public /* bridge */ /* synthetic */ Stream sequential() {
        return super.sequential();
    }

    @Override // io.reacted.flow.StreamProxy, java.util.stream.BaseStream
    public /* bridge */ /* synthetic */ boolean isParallel() {
        return super.isParallel();
    }

    @Override // io.reacted.flow.StreamProxy, java.util.stream.BaseStream
    public /* bridge */ /* synthetic */ Spliterator spliterator() {
        return super.spliterator();
    }

    @Override // io.reacted.flow.StreamProxy, java.util.stream.BaseStream
    public /* bridge */ /* synthetic */ Iterator iterator() {
        return super.iterator();
    }

    // ---- java.util.stream.Stream terminal operations ----
    // Every method in this group carries the bridge/synthetic markers and does nothing but
    // forward the call, with its arguments unchanged, to the StreamProxy implementation.

    @Override // io.reacted.flow.StreamProxy, java.util.stream.Stream
    public /* bridge */ /* synthetic */ Optional findAny() {
        return super.findAny();
    }

    @Override // io.reacted.flow.StreamProxy, java.util.stream.Stream
    public /* bridge */ /* synthetic */ Optional findFirst() {
        return super.findFirst();
    }

    @Override // io.reacted.flow.StreamProxy, java.util.stream.Stream
    public /* bridge */ /* synthetic */ boolean noneMatch(Predicate predicate) {
        return super.noneMatch(predicate);
    }

    @Override // io.reacted.flow.StreamProxy, java.util.stream.Stream
    public /* bridge */ /* synthetic */ boolean allMatch(Predicate predicate) {
        return super.allMatch(predicate);
    }

    @Override // io.reacted.flow.StreamProxy, java.util.stream.Stream
    public /* bridge */ /* synthetic */ boolean anyMatch(Predicate predicate) {
        return super.anyMatch(predicate);
    }

    @Override // io.reacted.flow.StreamProxy, java.util.stream.Stream
    public /* bridge */ /* synthetic */ long count() {
        return super.count();
    }

    @Override // io.reacted.flow.StreamProxy, java.util.stream.Stream
    public /* bridge */ /* synthetic */ Optional max(Comparator comparator) {
        return super.max(comparator);
    }

    @Override // io.reacted.flow.StreamProxy, java.util.stream.Stream
    public /* bridge */ /* synthetic */ Optional min(Comparator comparator) {
        return super.min(comparator);
    }

    @Override // io.reacted.flow.StreamProxy, java.util.stream.Stream
    public /* bridge */ /* synthetic */ Object collect(Collector collector) {
        return super.collect(collector);
    }

    @Override // io.reacted.flow.StreamProxy, java.util.stream.Stream
    public /* bridge */ /* synthetic */ Object collect(Supplier supplier, BiConsumer biConsumer, BiConsumer biConsumer2) {
        return super.collect(supplier, biConsumer, biConsumer2);
    }

    @Override // io.reacted.flow.StreamProxy, java.util.stream.Stream
    public /* bridge */ /* synthetic */ Object reduce(Object obj, BiFunction biFunction, BinaryOperator binaryOperator) {
        return super.reduce(obj, biFunction, binaryOperator);
    }

    @Override // io.reacted.flow.StreamProxy, java.util.stream.Stream
    public /* bridge */ /* synthetic */ Optional reduce(BinaryOperator binaryOperator) {
        return super.reduce(binaryOperator);
    }

    @Override // io.reacted.flow.StreamProxy, java.util.stream.Stream
    public /* bridge */ /* synthetic */ Object reduce(Object obj, BinaryOperator binaryOperator) {
        return super.reduce(obj, binaryOperator);
    }

    @Override // io.reacted.flow.StreamProxy, java.util.stream.Stream
    public /* bridge */ /* synthetic */ Object[] toArray(IntFunction intFunction) {
        return super.toArray(intFunction);
    }

    @Override // io.reacted.flow.StreamProxy, java.util.stream.Stream
    public /* bridge */ /* synthetic */ Object[] toArray() {
        return super.toArray();
    }

    @Override // io.reacted.flow.StreamProxy, java.util.stream.Stream
    public /* bridge */ /* synthetic */ void forEachOrdered(Consumer consumer) {
        super.forEachOrdered(consumer);
    }

    @Override // io.reacted.flow.StreamProxy, java.util.stream.Stream
    public /* bridge */ /* synthetic */ void forEach(Consumer consumer) {
        super.forEach(consumer);
    }

    // ---- java.util.stream.Stream intermediate operations ----
    // Every method in this group carries the bridge/synthetic markers and does nothing but
    // forward the call, with its arguments unchanged, to the StreamProxy implementation.

    @Override // io.reacted.flow.StreamProxy, java.util.stream.Stream
    public /* bridge */ /* synthetic */ Stream skip(long j) {
        return super.skip(j);
    }

    @Override // io.reacted.flow.StreamProxy, java.util.stream.Stream
    public /* bridge */ /* synthetic */ Stream limit(long j) {
        return super.limit(j);
    }

    @Override // io.reacted.flow.StreamProxy, java.util.stream.Stream
    public /* bridge */ /* synthetic */ Stream peek(Consumer consumer) {
        return super.peek(consumer);
    }

    @Override // io.reacted.flow.StreamProxy, java.util.stream.Stream
    public /* bridge */ /* synthetic */ Stream sorted(Comparator comparator) {
        return super.sorted(comparator);
    }

    @Override // io.reacted.flow.StreamProxy, java.util.stream.Stream
    public /* bridge */ /* synthetic */ Stream sorted() {
        return super.sorted();
    }

    @Override // io.reacted.flow.StreamProxy, java.util.stream.Stream
    public /* bridge */ /* synthetic */ Stream distinct() {
        return super.distinct();
    }

    @Override // io.reacted.flow.StreamProxy, java.util.stream.Stream
    public /* bridge */ /* synthetic */ DoubleStream flatMapToDouble(Function function) {
        return super.flatMapToDouble(function);
    }

    @Override // io.reacted.flow.StreamProxy, java.util.stream.Stream
    public /* bridge */ /* synthetic */ LongStream flatMapToLong(Function function) {
        return super.flatMapToLong(function);
    }

    @Override // io.reacted.flow.StreamProxy, java.util.stream.Stream
    public /* bridge */ /* synthetic */ IntStream flatMapToInt(Function function) {
        return super.flatMapToInt(function);
    }

    @Override // io.reacted.flow.StreamProxy, java.util.stream.Stream
    public /* bridge */ /* synthetic */ Stream flatMap(Function function) {
        return super.flatMap(function);
    }

    @Override // io.reacted.flow.StreamProxy, java.util.stream.Stream
    public /* bridge */ /* synthetic */ DoubleStream mapToDouble(ToDoubleFunction toDoubleFunction) {
        return super.mapToDouble(toDoubleFunction);
    }

    @Override // io.reacted.flow.StreamProxy, java.util.stream.Stream
    public /* bridge */ /* synthetic */ LongStream mapToLong(ToLongFunction toLongFunction) {
        return super.mapToLong(toLongFunction);
    }

    @Override // io.reacted.flow.StreamProxy, java.util.stream.Stream
    public /* bridge */ /* synthetic */ IntStream mapToInt(ToIntFunction toIntFunction) {
        return super.mapToInt(toIntFunction);
    }

    @Override // io.reacted.flow.StreamProxy, java.util.stream.Stream
    public /* bridge */ /* synthetic */ Stream map(Function function) {
        return super.map(function);
    }

    @Override // io.reacted.flow.StreamProxy, java.util.stream.Stream
    public /* bridge */ /* synthetic */ Stream filter(Predicate predicate) {
        return super.filter(predicate);
    }

    @Override // io.reacted.flow.StreamProxy, java.util.stream.Stream
    public /* bridge */ /* synthetic */ Stream dropWhile(Predicate predicate) {
        return super.dropWhile(predicate);
    }

    @Override // io.reacted.flow.StreamProxy, java.util.stream.Stream
    public /* bridge */ /* synthetic */ Stream takeWhile(Predicate predicate) {
        return super.takeWhile(predicate);
    }
}
