package com.oath.cyclops.types.reactive;

import com.oath.cyclops.async.adapters.Queue;
import com.oath.cyclops.async.adapters.QueueFactory;
import com.oath.cyclops.react.async.subscription.Continueable;
import com.oath.cyclops.types.futurestream.Continuation;
import cyclops.control.Eval;
import cyclops.futurestream.FutureStream;
import cyclops.futurestream.LazyReact;
import cyclops.reactive.ReactiveSeq;
import cyclops.reactive.collections.mutable.QueueX;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/* loaded from: input_file:com/oath/cyclops/types/reactive/QueueBasedSubscriber.class */
/**
 * A Reactive Streams {@link Subscriber} that places every element it receives onto a
 * {@link Queue} and exposes that queue as a {@link FutureStream}, a JDK {@link Stream} or a
 * {@link ReactiveSeq}.
 *
 * <p>Several subscribers may share one {@link Counter}; the counter holds the registered
 * upstream subscriptions and the flags used to decide when the queue is closed.
 *
 * @param <T> element type received from upstream and offered to the queue
 */
public class QueueBasedSubscriber<T> implements Subscriber<T> {
    /** Upper bound compared against the number of subscriptions registered on the counter. */
    private final int maxConcurrency;
    /** Factory used to build the queue, or {@code null} when no factory was supplied. */
    private final QueueFactory<T> factory;
    /** Destination for every element passed to {@link #onNext(Object)}. */
    protected volatile Queue<T> queue;
    /** Upstream subscription accepted in {@link #onSubscribe(Subscription)}; {@code null} until then. */
    volatile Subscription subscription;
    /** The stream last returned by {@link #futureStream()}; its error handler is consulted in {@link #onError}. */
    private volatile FutureStream<T> stream;
    /** Lazily evaluated view of the queue as a {@link FutureStream}. */
    private volatile Supplier<FutureStream<T>> futureStream = Eval.later(this::genStream);
    /** Lazily evaluated view of the queue as a JDK {@link Stream}. */
    private volatile Supplier<Stream<T>> jdkStream = Eval.later(this::genJdkStream);
    /** Lazily evaluated {@link ReactiveSeq} wrapping {@link #jdkStream}. */
    private volatile Supplier<ReactiveSeq<T>> reactiveSeq =
            Eval.later(() -> ReactiveSeq.fromStream(this.jdkStream.get()));
    /** Optional extra error callback installed through {@link #setErrorHandler(Consumer)}. */
    private volatile Consumer<Throwable> errorHandler;
    /** State shared with any other subscribers created with the same counter. */
    private final Counter counter;

    /**
     * Mutable state shared between the subscribers that were created with the same instance.
     */
    public static class Counter {
        /** Decremented by each {@link QueueBasedSubscriber#onComplete()}; the queue is only closed when it reads 0. */
        public AtomicLong active = new AtomicLong(0);
        /** Set to {@code true} by {@link QueueBasedSubscriber#close()}; required before {@code onComplete} may close the queue. */
        public volatile boolean completable = false;
        /** Upstream subscriptions currently registered, backed by a {@link ConcurrentLinkedQueue}. */
        public final QueueX<Subscription> subscription = QueueX.fromIterable(
                Collectors.toCollection(() -> new ConcurrentLinkedQueue<Subscription>()),
                Arrays.<Subscription>asList());
        /** Set to {@code true} once a subscriber has started closing the queue. */
        volatile boolean closed = false;
        /**
         * Incremented for every element received in {@link QueueBasedSubscriber#onNext(Object)}.
         * NOTE(review): {@code ++} on a volatile int is not atomic; concurrent subscribers may lose
         * increments. Not changed here because the field type is part of the public surface.
         */
        public volatile int added = 0;
        /** Guard ensuring the close sequence runs at most once across all sharing subscribers. */
        final AtomicBoolean closing = new AtomicBoolean(false);
    }

    /**
     * Creates a subscriber backed by a default queue.
     *
     * @param counter shared subscriber state
     * @param i       maximum concurrency
     * @param <T>     element type
     * @return a new subscriber
     */
    public static <T> QueueBasedSubscriber<T> subscriber(Counter counter, int i) {
        return new QueueBasedSubscriber<>(counter, i);
    }

    /**
     * Creates a subscriber that writes to the supplied queue.
     *
     * @param queue   queue that receives the elements
     * @param counter shared subscriber state
     * @param i       maximum concurrency
     * @param <T>     element type
     * @return a new subscriber
     */
    public static <T> QueueBasedSubscriber<T> subscriber(Queue<T> queue, Counter counter, int i) {
        return new QueueBasedSubscriber<>(queue, counter, i);
    }

    /**
     * Creates a subscriber whose queue is built from the supplied factory.
     *
     * @param queueFactory factory for the backing queue
     * @param counter      shared subscriber state
     * @param i            maximum concurrency
     * @param <T>          element type
     * @return a new subscriber
     */
    public static <T> QueueBasedSubscriber<T> subscriber(QueueFactory<T> queueFactory, Counter counter, int i) {
        return new QueueBasedSubscriber<>(queueFactory, counter, i);
    }

    /** Builds a JDK stream over the queue using a fresh cyclops subscription. */
    private Stream<T> genJdkStream() {
        return this.queue.stream(new com.oath.cyclops.react.async.subscription.Subscription());
    }

    /**
     * Builds a {@link FutureStream} over the queue; the same cyclops subscription is handed to
     * both the future stream and the queue stream.
     */
    // The raw Stream cast is kept from the original: the exact generic signature of fromStream
    // is not visible from this file, so the unchecked conversion is confined to this method.
    @SuppressWarnings({"unchecked", "rawtypes"})
    private FutureStream<T> genStream() {
        Continueable subscription = new com.oath.cyclops.react.async.subscription.Subscription();
        return new LazyReact().of(new Object[0])
                              .withSubscription(subscription)
                              .fromStream((Stream) this.queue.stream(subscription));
    }

    /**
     * Creates a subscriber with a default queue whose {@code get()} requests one more element
     * from every subscription registered on the counter before reading.
     *
     * @param counter shared subscriber state
     * @param i       maximum concurrency
     */
    public QueueBasedSubscriber(final Counter counter, int i) {
        this.maxConcurrency = i;
        this.factory = null;
        this.counter = counter;
        this.queue = new Queue<T>() {
            @Override
            public T get() {
                counter.subscription.forEach(s -> s.request(1L));
                return super.get();
            }
        };
    }

    /**
     * Creates a subscriber that writes to an externally supplied queue.
     *
     * @param queue   queue that receives the elements
     * @param counter shared subscriber state
     * @param i       maximum concurrency
     */
    private QueueBasedSubscriber(Queue<T> queue, Counter counter, int i) {
        this.factory = null;
        this.maxConcurrency = i;
        this.counter = counter;
        this.queue = queue;
    }

    /**
     * Creates a subscriber with a factory-built queue whose {@code get()} requests one more
     * element from every registered subscription, but only while the queue holds fewer than
     * {@code 3 * i} entries and at least one subscription is registered.
     *
     * @param queueFactory factory for the backing queue
     * @param counter      shared subscriber state
     * @param i            maximum concurrency
     */
    private QueueBasedSubscriber(QueueFactory<T> queueFactory, final Counter counter, final int i) {
        this.counter = counter;
        this.factory = queueFactory;
        this.maxConcurrency = i;
        this.queue = new Queue<T>(queueFactory) {
            @Override
            public T get() {
                // Multiply in long: with a large i (e.g. Integer.MAX_VALUE) the int product
                // i * 3 wraps around and would wrongly suppress every upstream request.
                if (size() < 3L * i && counter.subscription.size() > 0) {
                    counter.subscription.forEach(s -> s.request(1L));
                }
                return super.get();
            }
        };
    }

    /**
     * Returns the lazily created {@link FutureStream} view of the queue and remembers it so that
     * {@link #onError(Throwable)} can reach its error handler.
     *
     * @return the future stream over the queue
     */
    public FutureStream<T> futureStream() {
        FutureStream<T> result = this.futureStream.get();
        this.stream = result;
        return result;
    }

    /**
     * @return the lazily created JDK stream over the queue
     */
    public Stream<T> jdkStream() {
        return this.jdkStream.get();
    }

    /**
     * @return the lazily created {@link ReactiveSeq} over the queue
     */
    public ReactiveSeq<T> reactiveSeq() {
        return this.reactiveSeq.get();
    }

    /**
     * Accepts the upstream subscription, registers it on the shared counter and requests the
     * first element. If a subscription was already accepted, both the existing and the new
     * subscription are cancelled and nothing else happens.
     *
     * @param subscription the upstream subscription; must not be {@code null}
     * @throws NullPointerException if {@code subscription} is {@code null}
     */
    @Override
    public void onSubscribe(Subscription subscription) {
        Objects.requireNonNull(subscription);
        if (this.subscription != null) {
            this.subscription.cancel();
            subscription.cancel();
            return;
        }
        this.subscription = subscription;
        // Wait, parking briefly on each pass, until the number of registered subscriptions
        // is no longer above maxConcurrency.
        while (this.counter.subscription.size() > this.maxConcurrency) {
            LockSupport.parkNanos(100L);
        }
        this.counter.subscription.plus(this.subscription);
        subscription.request(1L);
    }

    /**
     * Counts the element on the shared counter and adds it to the queue.
     *
     * @param t the received element; must not be {@code null}
     * @throws NullPointerException if {@code t} is {@code null}
     */
    @Override
    public void onNext(T t) {
        Objects.requireNonNull(t);
        // NOTE(review): non-atomic increment of a volatile int shared between subscribers.
        this.counter.added++;
        this.queue.add(t);
    }

    /**
     * Forwards the error to the error handler of the stream returned by {@link #futureStream()}
     * (when one has been created and it defines a handler) and to the handler installed with
     * {@link #setErrorHandler(Consumer)} (when one is set).
     *
     * @param th the upstream error; must not be {@code null}
     * @throws NullPointerException if {@code th} is {@code null}
     */
    @Override
    public void onError(Throwable th) {
        Objects.requireNonNull(th);
        // Read each volatile field once so a concurrent reset to null between the check and
        // the call cannot cause a NullPointerException.
        final FutureStream<T> currentStream = this.stream;
        if (currentStream != null) {
            currentStream.getErrorHandler().orElse(obj -> {
            }).accept(th);
        }
        final Consumer<Throwable> handler = this.errorHandler;
        if (handler != null) {
            handler.accept(th);
        }
    }

    /**
     * Decrements the active count and unregisters this subscriber's subscription. When no
     * subscribers remain active, the counter has been marked completable and no other subscriber
     * has begun closing, the queue is closed with a final continuation that drains the remaining
     * entries and throws them inside a {@link Queue.ClosedQueueException}.
     */
    @Override
    public void onComplete() {
        this.counter.active.decrementAndGet();
        this.counter.subscription.removeValue(this.subscription);
        if (this.queue != null
                && this.counter.active.get() == 0
                && this.counter.completable
                && this.counter.closing.compareAndSet(false, true)) {
            shutDownQueue(new Continuation(() -> {
                final ArrayList<Object> drained = new ArrayList<>();
                while (this.queue.size() > 0) {
                    try {
                        drained.add(this.queue.get());
                    } catch (Queue.ClosedQueueException ignored) {
                        // Deliberately ignored: keep reading for as long as size() reports
                        // remaining entries.
                    }
                }
                throw new Queue.ClosedQueueException(drained);
            }));
        }
    }

    /**
     * Marks the shared counter as completable and, when no subscribers are active and no other
     * subscriber has begun closing, closes the queue with a final continuation that throws an
     * empty {@link Queue.ClosedQueueException}.
     */
    public void close() {
        this.counter.completable = true;
        if (this.queue != null
                && this.counter.active.get() == 0
                && this.counter.closing.compareAndSet(false, true)) {
            shutDownQueue(new Continuation(() -> {
                throw new Queue.ClosedQueueException();
            }));
        }
    }

    /** Flags the counter as closed, registers the final continuation and closes the queue. */
    private void shutDownQueue(Continuation finalContinuation) {
        this.counter.closed = true;
        this.queue.addContinuation(finalContinuation);
        this.queue.close();
    }

    /**
     * Registers a continuation on the backing queue.
     *
     * @param continuation the continuation to add
     */
    public void addContinuation(Continuation continuation) {
        this.queue.addContinuation(continuation);
    }

    /**
     * @return the queue that receives the elements
     */
    public Queue<T> getQueue() {
        return this.queue;
    }

    /**
     * @return the upstream subscription, or {@code null} if none has been accepted yet
     */
    public Subscription getSubscription() {
        return this.subscription;
    }

    /**
     * Installs an additional callback invoked from {@link #onError(Throwable)}.
     *
     * @param consumer the error callback; may be {@code null} to remove it
     */
    public void setErrorHandler(Consumer<Throwable> consumer) {
        this.errorHandler = consumer;
    }
}
