package io.vlingo.symbio.store.state;

import io.vlingo.actors.Stage;
import io.vlingo.common.Completes;
import io.vlingo.reactivestreams.Elements;
import io.vlingo.reactivestreams.PublisherConfiguration;
import io.vlingo.reactivestreams.Sink;
import io.vlingo.reactivestreams.Source;
import io.vlingo.reactivestreams.Stream;
import io.vlingo.reactivestreams.StreamPublisher;
import io.vlingo.reactivestreams.StreamSubscriber;
import io.vlingo.reactivestreams.Streams;
import io.vlingo.symbio.State;
import io.vlingo.symbio.StateAdapterProvider;
import io.vlingo.symbio.StateBundle;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/**
 * A {@link Stream} over an in-memory map of raw states keyed by id.
 *
 * <p>{@code flowInto} creates a {@link StreamPublisher} actor backed by a
 * {@link StateSource} and subscribes a {@link StateStreamSubscriber} actor
 * that forwards to the given {@link Sink}. {@link #request(long)} and
 * {@link #stop()} act on the {@link Subscription} captured by that subscriber.
 *
 * @param <RS> the raw state type held in the map
 */
public class StateStream<RS extends State<?>> implements Stream {
    /** Flow rate used by {@link #flowInto(Sink)} when the caller supplies none. */
    private static final long DEFAULT_FLOW_ELEMENTS_RATE = 100L;

    private long flowElementsRate;
    private Publisher<RS> publisher;
    private final Stage stage;
    private final Map<String, RS> states;
    private final StateAdapterProvider stateAdapterProvider;
    /** Assigned by the {@link StateStreamSubscriber} constructor; {@code null} until {@code flowInto} runs. */
    private StateStreamSubscriber<RS> subscriber;

    /**
     * {@link Source} that walks the key set of the state map and emits the
     * matching states in batches of at most {@code flowElementsRate}.
     *
     * <p>NOTE(review): the decompiler reported that this class was originally
     * {@code private}; it is kept {@code public} so existing references still compile.
     *
     * @param <RS> the raw state type held in the map
     */
    public static final class StateSource<RS extends State<?>> implements Source<RS> {
        private final long flowElementsRate;
        private final Iterator<String> iterator;
        private final Map<String, RS> states;
        private final StateAdapterProvider stateAdapterProvider;

        /**
         * @param states the states to emit, keyed by id; its key-set iterator is taken here
         * @param stateAdapterProvider converts each raw state via {@code fromRaw}
         * @param flowElementsRate the maximum number of elements per {@link #next()} batch
         */
        public StateSource(Map<String, RS> states, StateAdapterProvider stateAdapterProvider, long flowElementsRate) {
            this.states = states;
            this.iterator = states.keySet().iterator();
            this.stateAdapterProvider = stateAdapterProvider;
            this.flowElementsRate = flowElementsRate;
        }

        /**
         * Produces the next batch of up to {@code flowElementsRate} elements, or a
         * terminated {@link Elements} once the key iterator is exhausted.
         *
         * <p>Although the declared element type is {@code RS}, the array handed to
         * {@code Elements.of} holds {@link StateBundle} instances pairing each raw
         * state with the result of {@code stateAdapterProvider.fromRaw(state)};
         * hence the raw, unchecked conversion below.
         *
         * @return an already-successful {@link Completes} carrying the batch
         */
        @SuppressWarnings({"unchecked", "rawtypes"})
        public Completes<Elements<RS>> next() {
            if (!this.iterator.hasNext()) {
                return Completes.withSuccess(Elements.terminated());
            }

            final List<StateBundle> bundles = new ArrayList<>();
            for (long taken = 0; this.iterator.hasNext() && taken < this.flowElementsRate; taken++) {
                final RS state = this.states.get(this.iterator.next());
                bundles.add(new StateBundle(state, this.stateAdapterProvider.fromRaw(state)));
            }

            // Raw on purpose: the elements are StateBundle, not RS (see method comment).
            final Elements<RS> elements = (Elements) Elements.of(arrayFrom(bundles));
            return Completes.withSuccess(elements);
        }

        /** Ignores {@code maximumElements} and delegates to {@link #next()}. */
        public Completes<Elements<RS>> next(int maximumElements) {
            return next();
        }

        /** Ignores {@code index} and delegates to {@link #next()}. */
        public Completes<Elements<RS>> next(long index) {
            return next();
        }

        /** Ignores both arguments and delegates to {@link #next()}. */
        public Completes<Elements<RS>> next(long index, int maximumElements) {
            return next();
        }

        /** @return always a successful {@code false} */
        public Completes<Boolean> isSlow() {
            return Completes.withSuccess(false);
        }

        /** Copies the list into a new array of exactly {@code list.size()} elements. */
        private StateBundle[] arrayFrom(List<StateBundle> list) {
            return list.toArray(new StateBundle[list.size()]);
        }
    }

    /**
     * {@link StreamSubscriber} that registers itself with its owning
     * {@link StateStream} and keeps the {@link Subscription} it receives so the
     * stream can later request more elements or cancel.
     *
     * @param <RS> the raw state type of the owning stream
     */
    public static class StateStreamSubscriber<RS extends State<?>> extends StreamSubscriber<RS> {
        // NOTE(review): written in onSubscribe and read by StateStream.request()/stop();
        // if those run on different threads this field may need to be volatile - confirm.
        Subscription subscriptionHook;

        /**
         * @param sink the sink passed through to {@link StreamSubscriber}
         * @param requestThreshold passed through to {@link StreamSubscriber}
         * @param stateStream the owning stream, whose {@code subscriber} field is set to this instance
         */
        public StateStreamSubscriber(Sink<RS> sink, long requestThreshold, StateStream<RS> stateStream) {
            super(sink, requestThreshold);
            stateStream.subscriber = this;
        }

        /** Delegates to the superclass unchanged. */
        public void onComplete() {
            super.onComplete();
        }

        /** Remembers the subscription before delegating to the superclass. */
        public void onSubscribe(Subscription subscription) {
            this.subscriptionHook = subscription;
            super.onSubscribe(subscription);
        }
    }

    /**
     * @param stage used to create the publisher and subscriber actors
     * @param states the states to stream, keyed by id
     * @param stateAdapterProvider handed to the {@link StateSource} for raw-state conversion
     */
    public StateStream(Stage stage, Map<String, RS> states, StateAdapterProvider stateAdapterProvider) {
        this.stage = stage;
        this.stateAdapterProvider = stateAdapterProvider;
        this.states = states;
    }

    /**
     * Records the new flow rate and requests that many elements from the
     * current subscription.
     *
     * @param flowElementsRate the number of elements to request
     * @throws IllegalStateException if no subscription exists yet, i.e. {@code flowInto}
     *     has not been called or the subscriber has not received its subscription
     */
    public void request(long flowElementsRate) {
        this.flowElementsRate = flowElementsRate;

        final Subscription subscription = currentSubscription();
        if (subscription == null) {
            throw new IllegalStateException(
                "No active subscription to request from; flowInto() has not established one yet.");
        }
        subscription.request(this.flowElementsRate);
    }

    /** Starts the flow into {@code sink} at a rate of 100 and the default probe interval. */
    public <S> void flowInto(Sink<S> sink) {
        flowInto(sink, DEFAULT_FLOW_ELEMENTS_RATE, DefaultProbeInterval);
    }

    /** Starts the flow into {@code sink} at the given rate and the default probe interval. */
    public <S> void flowInto(Sink<S> sink, long flowElementsRate) {
        flowInto(sink, flowElementsRate, DefaultProbeInterval);
    }

    /**
     * Creates the publisher actor over a fresh {@link StateSource}, creates the
     * subscriber actor for {@code sink}, and subscribes the latter to the former.
     *
     * @param sink the destination of the streamed elements
     * @param flowElementsRate batch size for the source and constructor argument for the subscriber
     * @param probeInterval first argument of {@code PublisherConfiguration.with}
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public <S> void flowInto(Sink<S> sink, long flowElementsRate, int probeInterval) {
        this.flowElementsRate = flowElementsRate;

        // NOTE(review): -1 and 256 are kept verbatim from the compiled class; presumably
        // the throttle and buffer-size settings - confirm against PublisherConfiguration.
        final Object[] publisherParameters = {
            new StateSource<RS>(this.states, this.stateAdapterProvider, flowElementsRate),
            PublisherConfiguration.with(probeInterval, -1, 256, Streams.OverflowPolicy.DropCurrent)
        };
        this.publisher = (Publisher) this.stage.actorFor(Publisher.class, StreamPublisher.class, publisherParameters);

        // The subscriber constructor stores itself in this.subscriber (hence passing "this").
        final Object[] subscriberParameters = {sink, Long.valueOf(flowElementsRate), this};
        final Subscriber streamSubscriber =
            (Subscriber) this.stage.actorFor(Subscriber.class, StateStreamSubscriber.class, subscriberParameters);
        this.publisher.subscribe(streamSubscriber);
    }

    /**
     * Cancels the current subscription. Does nothing when there is no
     * subscription to cancel (for example when called before {@code flowInto}).
     */
    public void stop() {
        final Subscription subscription = currentSubscription();
        if (subscription != null) {
            subscription.cancel();
        }
    }

    /** Returns the subscription captured by the subscriber, or {@code null} if none exists yet. */
    private Subscription currentSubscription() {
        final StateStreamSubscriber<RS> current = this.subscriber;
        return current == null ? null : current.subscriptionHook;
    }
}
