/*
 * Decompiled with CFR 0.152.
 */
package io.datakernel.stream;

import io.datakernel.async.SettableStage;
import io.datakernel.async.Stage;
import io.datakernel.eventloop.Eventloop;
import io.datakernel.stream.AbstractStreamConsumer;
import io.datakernel.stream.DataStreams;
import io.datakernel.stream.StreamCapability;
import io.datakernel.stream.StreamConsumer;
import io.datakernel.stream.StreamDataReceiver;
import io.datakernel.stream.StreamProducer;
import io.datakernel.stream.StreamStatus;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Set;

public final class StreamConsumerSwitcher<T>
extends AbstractStreamConsumer<T>
implements StreamDataReceiver<T> {
    private InternalProducer currentInternalProducer;

    private StreamConsumerSwitcher() {
    }

    public static <T> StreamConsumerSwitcher<T> create() {
        return StreamConsumerSwitcher.create(StreamConsumer.idle());
    }

    public static <T> StreamConsumerSwitcher<T> create(StreamConsumer<T> consumer) {
        StreamConsumerSwitcher<T> switcher = new StreamConsumerSwitcher<T>();
        switcher.switchTo(consumer);
        return switcher;
    }

    @Override
    public final void onData(T item) {
        this.currentInternalProducer.onData(item);
    }

    @Override
    protected final void onEndOfStream() {
        this.switchTo(StreamConsumer.idle());
    }

    @Override
    protected final void onError(Throwable t) {
        this.switchTo(StreamConsumer.idle());
    }

    @Override
    public Set<StreamCapability> getCapabilities() {
        return this.currentInternalProducer == null ? Collections.emptySet() : this.currentInternalProducer.consumer.getCapabilities();
    }

    public void switchTo(StreamConsumer<T> newConsumer) {
        if (this.getStatus() == StreamStatus.CLOSED_WITH_ERROR) {
            if (this.currentInternalProducer != null) {
                this.currentInternalProducer.sendError(this.getException());
            }
            this.currentInternalProducer = new InternalProducer(this.eventloop, StreamConsumer.idle());
            DataStreams.bind(StreamProducer.closingWithError(this.getException()), newConsumer);
        } else if (this.getStatus() == StreamStatus.END_OF_STREAM) {
            if (this.currentInternalProducer != null) {
                this.currentInternalProducer.sendEndOfStream();
            }
            this.currentInternalProducer = new InternalProducer(this.eventloop, StreamConsumer.idle());
            DataStreams.bind(StreamProducer.of(new Object[0]), newConsumer);
        } else {
            if (this.currentInternalProducer != null) {
                this.currentInternalProducer.sendEndOfStream();
            }
            this.currentInternalProducer = new InternalProducer(this.eventloop, newConsumer);
            DataStreams.bind(this.currentInternalProducer, newConsumer);
        }
    }

    private class InternalProducer
    implements StreamProducer<T> {
        private final Eventloop eventloop;
        private final StreamConsumer<T> consumer;
        private final SettableStage<Void> endOfStream = SettableStage.create();
        private StreamDataReceiver<T> lastDataReceiver;
        private boolean suspended;
        private ArrayList<T> pendingItems;
        private boolean pendingEndOfStream;

        public InternalProducer(Eventloop eventloop, StreamConsumer<T> consumer) {
            this.eventloop = eventloop;
            this.consumer = consumer;
        }

        @Override
        public void setConsumer(StreamConsumer<T> consumer) {
            assert (consumer == this.consumer);
            consumer.getEndOfStream().whenException(this::closeWithError);
        }

        @Override
        public void produce(StreamDataReceiver<T> dataReceiver) {
            StreamProducer producer;
            this.lastDataReceiver = dataReceiver;
            this.suspended = false;
            if (this.pendingItems != null) {
                this.eventloop.post(() -> {
                    if (this.pendingItems.isEmpty()) {
                        return;
                    }
                    for (Object item : this.pendingItems) {
                        this.lastDataReceiver.onData(item);
                    }
                    this.pendingItems = null;
                    if (this.pendingEndOfStream) {
                        this.endOfStream.trySet(null);
                    }
                    if (StreamConsumerSwitcher.this.currentInternalProducer == this) {
                        if (!this.suspended) {
                            StreamConsumerSwitcher.this.getProducer().produce(StreamConsumerSwitcher.this);
                        } else {
                            StreamConsumerSwitcher.this.getProducer().suspend();
                        }
                    }
                });
            } else if (StreamConsumerSwitcher.this.currentInternalProducer == this && (producer = StreamConsumerSwitcher.this.getProducer()) != null) {
                producer.produce(StreamConsumerSwitcher.this);
            }
        }

        @Override
        public void suspend() {
            this.suspended = true;
            if (StreamConsumerSwitcher.this.currentInternalProducer == this) {
                StreamConsumerSwitcher.this.getProducer().suspend();
            }
        }

        public void closeWithError(Throwable t) {
            StreamConsumerSwitcher.this.closeWithError(t);
        }

        @Override
        public Stage<Void> getEndOfStream() {
            return this.endOfStream;
        }

        @Override
        public Set<StreamCapability> getCapabilities() {
            return StreamConsumerSwitcher.this.getProducer().getCapabilities();
        }

        public void onData(T item) {
            if (this.lastDataReceiver != null) {
                this.lastDataReceiver.onData(item);
            } else {
                if (this.pendingItems == null) {
                    this.pendingItems = new ArrayList();
                    StreamConsumerSwitcher.this.getProducer().suspend();
                }
                this.pendingItems.add(item);
            }
        }

        public void sendError(Throwable exception) {
            this.lastDataReceiver = item -> {};
            this.endOfStream.trySetException(exception);
        }

        public void sendEndOfStream() {
            if (this.pendingItems == null) {
                this.endOfStream.trySet(null);
            } else {
                this.pendingEndOfStream = true;
            }
        }
    }
}

