package io.datakernel.stream;

import io.datakernel.async.SettableStage;
import io.datakernel.async.Stage;
import io.datakernel.eventloop.Eventloop;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.Set;

/* loaded from: input_file:io/datakernel/stream/StreamConsumerSwitcher.class */
public final class StreamConsumerSwitcher<T> extends AbstractStreamConsumer<T> implements StreamDataReceiver<T> {
    private StreamConsumerSwitcher<T>.InternalProducer currentInternalProducer;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/datakernel/stream/StreamConsumerSwitcher$InternalProducer.class */
    public class InternalProducer implements StreamProducer<T> {
        private final Eventloop eventloop;
        private final StreamConsumer<T> consumer;
        private final SettableStage<Void> endOfStream = SettableStage.create();
        private StreamDataReceiver<T> lastDataReceiver;
        private boolean suspended;
        private ArrayList<T> pendingItems;
        private boolean pendingEndOfStream;
        static final /* synthetic */ boolean $assertionsDisabled;

        public InternalProducer(Eventloop eventloop, StreamConsumer<T> streamConsumer) {
            this.eventloop = eventloop;
            this.consumer = streamConsumer;
        }

        @Override // io.datakernel.stream.StreamProducer
        public void setConsumer(StreamConsumer<T> streamConsumer) {
            if (!$assertionsDisabled && streamConsumer != this.consumer) {
                throw new AssertionError();
            }
            streamConsumer.getEndOfStream().whenException(this::closeWithError);
        }

        @Override // io.datakernel.stream.StreamProducer
        public void produce(StreamDataReceiver<T> streamDataReceiver) {
            StreamProducer<T> producer;
            this.lastDataReceiver = streamDataReceiver;
            this.suspended = false;
            if (this.pendingItems != null) {
                this.eventloop.post(() -> {
                    if (this.pendingItems.isEmpty()) {
                        return;
                    }
                    Iterator<T> it = this.pendingItems.iterator();
                    while (it.hasNext()) {
                        this.lastDataReceiver.onData(it.next());
                    }
                    this.pendingItems = null;
                    if (this.pendingEndOfStream) {
                        this.endOfStream.trySet((Object) null);
                    }
                    if (StreamConsumerSwitcher.this.currentInternalProducer == this) {
                        if (this.suspended) {
                            StreamConsumerSwitcher.this.getProducer().suspend();
                        } else {
                            StreamConsumerSwitcher.this.getProducer().produce(StreamConsumerSwitcher.this);
                        }
                    }
                });
            } else {
                if (StreamConsumerSwitcher.this.currentInternalProducer != this || (producer = StreamConsumerSwitcher.this.getProducer()) == null) {
                    return;
                }
                producer.produce(StreamConsumerSwitcher.this);
            }
        }

        @Override // io.datakernel.stream.StreamProducer
        public void suspend() {
            this.suspended = true;
            if (StreamConsumerSwitcher.this.currentInternalProducer == this) {
                StreamConsumerSwitcher.this.getProducer().suspend();
            }
        }

        public void closeWithError(Throwable th) {
            StreamConsumerSwitcher.this.closeWithError(th);
        }

        @Override // io.datakernel.stream.StreamProducer
        public Stage<Void> getEndOfStream() {
            return this.endOfStream;
        }

        @Override // io.datakernel.stream.StreamProducer
        public Set<StreamCapability> getCapabilities() {
            return StreamConsumerSwitcher.this.getProducer().getCapabilities();
        }

        public void onData(T t) {
            if (this.lastDataReceiver != null) {
                this.lastDataReceiver.onData(t);
                return;
            }
            if (this.pendingItems == null) {
                this.pendingItems = new ArrayList<>();
                StreamConsumerSwitcher.this.getProducer().suspend();
            }
            this.pendingItems.add(t);
        }

        public void sendError(Throwable th) {
            this.lastDataReceiver = obj -> {
            };
            this.endOfStream.trySetException(th);
        }

        public void sendEndOfStream() {
            if (this.pendingItems == null) {
                this.endOfStream.trySet((Object) null);
            } else {
                this.pendingEndOfStream = true;
            }
        }

        static {
            $assertionsDisabled = !StreamConsumerSwitcher.class.desiredAssertionStatus();
        }
    }

    private StreamConsumerSwitcher() {
    }

    public static <T> StreamConsumerSwitcher<T> create() {
        return create(StreamConsumer.idle());
    }

    public static <T> StreamConsumerSwitcher<T> create(StreamConsumer<T> streamConsumer) {
        StreamConsumerSwitcher<T> streamConsumerSwitcher = new StreamConsumerSwitcher<>();
        streamConsumerSwitcher.switchTo(streamConsumer);
        return streamConsumerSwitcher;
    }

    @Override // io.datakernel.stream.StreamDataReceiver
    public final void onData(T t) {
        this.currentInternalProducer.onData(t);
    }

    @Override // io.datakernel.stream.AbstractStreamConsumer
    protected final void onEndOfStream() {
        switchTo(StreamConsumer.idle());
    }

    @Override // io.datakernel.stream.AbstractStreamConsumer
    protected final void onError(Throwable th) {
        switchTo(StreamConsumer.idle());
    }

    @Override // io.datakernel.stream.AbstractStreamConsumer, io.datakernel.stream.StreamConsumer
    public Set<StreamCapability> getCapabilities() {
        return this.currentInternalProducer == null ? Collections.emptySet() : ((InternalProducer) this.currentInternalProducer).consumer.getCapabilities();
    }

    public void switchTo(StreamConsumer<T> streamConsumer) {
        if (getStatus() == StreamStatus.CLOSED_WITH_ERROR) {
            if (this.currentInternalProducer != null) {
                this.currentInternalProducer.sendError(getException());
            }
            this.currentInternalProducer = new InternalProducer(this.eventloop, StreamConsumer.idle());
            StreamProducer.closingWithError(getException()).streamTo(streamConsumer);
            return;
        }
        if (getStatus() == StreamStatus.END_OF_STREAM) {
            if (this.currentInternalProducer != null) {
                this.currentInternalProducer.sendEndOfStream();
            }
            this.currentInternalProducer = new InternalProducer(this.eventloop, StreamConsumer.idle());
            StreamProducer.of(new Object[0]).streamTo(streamConsumer);
            return;
        }
        if (this.currentInternalProducer != null) {
            this.currentInternalProducer.sendEndOfStream();
        }
        this.currentInternalProducer = new InternalProducer(this.eventloop, streamConsumer);
        this.currentInternalProducer.streamTo(streamConsumer);
    }
}
