package io.vlingo.xoom.reactivestreams;

import io.vlingo.xoom.actors.Stoppable;
import io.vlingo.xoom.common.Cancellable;
import io.vlingo.xoom.common.Scheduled;
import io.vlingo.xoom.common.Scheduler;
import java.util.HashMap;
import java.util.Map;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

/* loaded from: input_file:io/vlingo/xoom/reactivestreams/StreamPublisherDelegate.class */
public class StreamPublisherDelegate<T> implements Publisher<T>, ControlledSubscription<T> {
    private Cancellable cancellable;
    private final PublisherConfiguration configuration;
    private boolean slow;
    private final Source<T> source;
    private final Map<Integer, SubscriptionController<T>> subscriptions = new HashMap(2);
    private final ControlledSubscription<T> controlledSubscription;
    private final Scheduler scheduler;
    private final Scheduled<Void> scheduled;
    private final Stoppable stoppable;
    private boolean flushed;

    public StreamPublisherDelegate(Source<T> source, PublisherConfiguration publisherConfiguration, ControlledSubscription<T> controlledSubscription, Scheduler scheduler, Scheduled<Void> scheduled, Stoppable stoppable) {
        this.source = source;
        this.configuration = publisherConfiguration;
        this.controlledSubscription = controlledSubscription;
        this.scheduler = scheduler;
        this.scheduled = scheduled;
        this.stoppable = stoppable;
        determineIfSlow();
    }

    public void subscribe(Subscriber<? super T> subscriber) {
        schedule(true);
        SubscriptionController<T> subscriptionController = new SubscriptionController<>(subscriber, this.controlledSubscription, this.configuration);
        this.subscriptions.putIfAbsent(Integer.valueOf(subscriptionController.id()), subscriptionController);
        subscriber.onSubscribe(subscriptionController);
    }

    @Override // io.vlingo.xoom.reactivestreams.ControlledSubscription
    public void cancel(SubscriptionController<T> subscriptionController) {
        subscriptionController.cancelSubscription();
        this.subscriptions.remove(Integer.valueOf(subscriptionController.id()));
    }

    @Override // io.vlingo.xoom.reactivestreams.ControlledSubscription
    public void request(SubscriptionController<T> subscriptionController, long j) {
        subscriptionController.requestFlow(subscriptionController.accumulate(j));
        publish(subscriptionController, null);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void processNext() {
        if (this.subscriptions.isEmpty()) {
            return;
        }
        try {
            this.source.next().andThen(elements -> {
                if (!elements.terminated) {
                    publish((Object[]) elements.values);
                    schedule(false);
                    return elements;
                }
                if (flush()) {
                    return elements;
                }
                completeAll();
                this.stoppable.stop();
                return elements;
            }).andFinally();
        } catch (Throwable th) {
            publish(th);
        }
    }

    void publish(T t) {
        this.subscriptions.values().forEach(subscriptionController -> {
            subscriptionController.onNext(t);
        });
    }

    void publish(SubscriptionController<T> subscriptionController, T t) {
        subscriptionController.onNext(t);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void publish(Throwable th) {
        this.subscriptions.values().forEach(subscriptionController -> {
            subscriptionController.onError(th);
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void stop() {
        this.cancellable.cancel();
        completeAll();
    }

    private void completeAll() {
        this.subscriptions.values().forEach(subscriptionController -> {
            subscriptionController.subscriber().onComplete();
        });
        this.subscriptions.clear();
    }

    private void determineIfSlow() {
        this.slow = ((Boolean) this.source.isSlow().await()).booleanValue();
    }

    private boolean flush() {
        this.flushed = false;
        this.subscriptions.values().forEach(subscriptionController -> {
            if (subscriptionController.hasBufferedElements()) {
                subscriptionController.onNext(null);
                this.flushed = true;
            }
        });
        return this.flushed;
    }

    private T[] publish(T[] tArr) {
        if (tArr.length > 0) {
            for (T t : tArr) {
                publish((StreamPublisherDelegate<T>) t);
            }
        }
        return tArr;
    }

    private void schedule(boolean z) {
        if (this.slow) {
            this.cancellable = this.scheduler.scheduleOnce(this.scheduled, (Object) null, 0L, this.configuration.probeInterval);
        } else if (z && this.cancellable == null) {
            this.cancellable = this.scheduler.schedule(this.scheduled, (Object) null, 0L, this.configuration.probeInterval);
        }
    }
}
