package io.vlingo.xoom.reactivestreams;

import io.vlingo.xoom.actors.Actor;
import io.vlingo.xoom.actors.Stoppable;
import io.vlingo.xoom.common.Completes;
import io.vlingo.xoom.common.Scheduled;
import io.vlingo.xoom.reactivestreams.sink.ConsumerSink;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;
import org.reactivestreams.Processor;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/* loaded from: input_file:io/vlingo/xoom/reactivestreams/StreamProcessor.class */
/**
 * Actor that implements the Reactive Streams {@link Processor} protocol: it subscribes
 * to an upstream publisher of {@code T}, passes every received element through an
 * {@link Operator}, and publishes the resulting {@code R} values to its own subscribers.
 *
 * <p>The subscriber side is handled by a {@link StreamSubscriberDelegate} and the
 * publisher side by a {@link StreamPublisherDelegate}; the two are connected by an
 * internal queue-backed {@link Source} that buffers the operator's output.
 *
 * @param <T> the type of elements received from upstream
 * @param <R> the type of elements published downstream
 */
public class StreamProcessor<T, R> extends Actor implements Processor<T, R>, ControlledSubscription<R>, Scheduled<Void>, Stoppable {
    private final StreamPublisherDelegate<R> publisherDelegate;
    private final PublisherSource publisherSource = new PublisherSource();
    private final long requestThreshold;
    private final StreamSubscriberDelegate<T> subscriberDelegate;

    /**
     * Adapts the {@link Operator} to a {@link Consumer} so that it can be driven by the
     * subscriber-side sink. Transformed values are buffered in the publisher source;
     * a failure of the operator is published downstream as an error.
     */
    private class ConsumerOperator implements Consumer<T> {
        private final Operator<T, R> operator;

        ConsumerOperator(Operator<T, R> operator) {
            this.operator = operator;
        }

        /**
         * Runs the operator on {@code value}, enqueuing whatever it emits.
         *
         * @param value the upstream element to transform
         */
        @Override
        public void accept(T value) {
            try {
                this.operator.performInto(value, StreamProcessor.this.publisherSource::enqueue);
            } catch (Exception e) {
                // Boundary catch: an operator failure must reach downstream subscribers
                // rather than escape into the subscriber delegate.
                StreamProcessor.this.publisherDelegate.publish(e);
            }
        }
    }

    /**
     * Queue-backed {@link Source} holding the operator's output until the publisher
     * delegate asks for it.
     */
    private class PublisherSource implements Source<R> {
        private final Queue<R> values = new ArrayDeque<>();
        private boolean terminated = false;

        PublisherSource() {
        }

        /**
         * Answers up to {@code requestThreshold} buffered elements.
         */
        @Override
        public Completes<Elements<R>> next() {
            return next(thresholdAsInt());
        }

        /**
         * Answers up to {@code maximumElements} buffered elements. When the buffer is
         * empty the answer is the terminated marker if the subscriber delegate is
         * finalized or this source has been terminated, and the empty marker otherwise.
         *
         * @param maximumElements the greatest number of elements to answer
         */
        @Override
        public Completes<Elements<R>> next(int maximumElements) {
            if (!this.values.isEmpty()) {
                return Completes.withSuccess(Elements.of(nextValues(maximumElements)));
            }
            if (StreamProcessor.this.subscriberDelegate.isFinalized() || this.terminated) {
                return Completes.withSuccess(Elements.terminated());
            }
            return Completes.withSuccess(Elements.empty());
        }

        /**
         * Answers up to {@code requestThreshold} buffered elements. The {@code index}
         * argument is not used: elements are always taken from the head of the queue.
         *
         * @param index ignored
         */
        @Override
        public Completes<Elements<R>> next(long index) {
            return next(thresholdAsInt());
        }

        /**
         * Answers up to {@code maximumElements} buffered elements. The {@code index}
         * argument is not used: elements are always taken from the head of the queue.
         *
         * @param index ignored
         * @param maximumElements the greatest number of elements to answer
         */
        @Override
        public Completes<Elements<R>> next(long index, int maximumElements) {
            return next(maximumElements);
        }

        /**
         * Always answers {@code false}.
         */
        @Override
        public Completes<Boolean> isSlow() {
            return Completes.withSuccess(false);
        }

        /** Appends a transformed value to the tail of the buffer. */
        void enqueue(R value) {
            this.values.add(value);
        }

        /** Marks this source as terminated; already buffered values remain available. */
        void terminate() {
            this.terminated = true;
        }

        /**
         * Removes and answers at most {@code maximum} values from the head of the buffer.
         */
        @SuppressWarnings("unchecked") // generic array creation; only R instances are stored
        private R[] nextValues(long maximum) {
            final int count = (int) Math.min(this.values.size(), maximum);
            final R[] polled = (R[]) new Object[count];
            for (int idx = 0; idx < count; idx++) {
                polled[idx] = this.values.poll();
            }
            return polled;
        }

        /**
         * Narrows {@code requestThreshold} to an {@code int}, clamping at
         * {@link Integer#MAX_VALUE}. A plain cast would wrap large thresholds
         * (for example {@code Long.MAX_VALUE} becomes {@code -1}) and lead to a
         * negative array size in {@link #nextValues(long)}.
         */
        private int thresholdAsInt() {
            return (int) Math.min(StreamProcessor.this.requestThreshold, (long) Integer.MAX_VALUE);
        }
    }

    /**
     * Constructs the processor and wires its subscriber and publisher delegates.
     *
     * @param operator the operator applied to every upstream element
     * @param requestThreshold the threshold handed to the subscriber delegate; also the
     *        default maximum number of elements answered per {@code next()} call
     * @param configuration the configuration handed to the publisher delegate
     */
    @SuppressWarnings("unchecked") // selfAs(Class) can only yield raw protocol types
    public StreamProcessor(Operator<T, R> operator, long requestThreshold, PublisherConfiguration configuration) {
        this.requestThreshold = requestThreshold;
        this.subscriberDelegate = new StreamSubscriberDelegate<>(new ConsumerSink<>(new ConsumerOperator(operator)), requestThreshold, logger());
        this.publisherDelegate = new StreamPublisherDelegate<>(this.publisherSource, configuration, selfAs(ControlledSubscription.class), scheduler(), selfAs(Scheduled.class), selfAs(Stoppable.class));
    }

    /** Forwards the upstream subscription to the subscriber delegate. */
    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscriberDelegate.onSubscribe(subscription);
    }

    /** Forwards an upstream element to the subscriber delegate. */
    @Override
    public void onNext(T value) {
        this.subscriberDelegate.onNext(value);
    }

    /** Signals upstream completion to the subscriber delegate and terminates the source. */
    @Override
    public void onComplete() {
        this.subscriberDelegate.onComplete();
        this.publisherSource.terminate();
    }

    /**
     * Publishes the upstream error downstream, signals it to the subscriber delegate,
     * and terminates the source.
     */
    @Override
    public void onError(Throwable cause) {
        this.publisherDelegate.publish(cause);
        this.subscriberDelegate.onError(cause);
        this.publisherSource.terminate();
    }

    /** Registers a downstream subscriber with the publisher delegate. */
    @Override
    public void subscribe(Subscriber<? super R> subscriber) {
        this.publisherDelegate.subscribe(subscriber);
    }

    /**
     * Scheduler callback: asks the publisher delegate to process the next elements.
     * The compiler generates the erased bridge for this method, so no explicit
     * {@code intervalSignal(Scheduled, Object)} overload is declared.
     */
    @Override
    public void intervalSignal(Scheduled<Void> scheduled, Void data) {
        this.publisherDelegate.processNext();
    }

    /** Cancels the upstream subscription and then the given downstream subscription. */
    @Override
    public void cancel(SubscriptionController<R> subscriptionController) {
        this.subscriberDelegate.cancelSubscription();
        this.publisherDelegate.cancel(subscriptionController);
    }

    /** Forwards a downstream demand request to the publisher delegate. */
    @Override
    public void request(SubscriptionController<R> subscriptionController, long maximum) {
        this.publisherDelegate.request(subscriptionController, maximum);
    }

    /** Terminates the source and then stops this actor. */
    @Override
    public void stop() {
        this.publisherSource.terminate();
        super.stop();
    }
}
