/*
 * Decompiled with CFR 0.152.
 */
package io.datakernel.stream;

import io.datakernel.async.SettableStage;
import io.datakernel.async.Stage;
import io.datakernel.async.Stages;
import io.datakernel.eventloop.Eventloop;
import io.datakernel.stream.AbstractStreamProducer;
import io.datakernel.stream.ForwardingStreamProducer;
import io.datakernel.stream.StreamCapability;
import io.datakernel.stream.StreamConsumer;
import io.datakernel.stream.StreamDataReceiver;
import io.datakernel.stream.StreamProducer;
import io.datakernel.stream.StreamProducerModifier;
import io.datakernel.util.Preconditions;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;

public final class StreamProducers {
    /** Private so the class cannot be instantiated from outside; it is used through its static members. */
    private StreamProducers() {
    }

    /**
     * Creates a modifier that wraps a producer and replaces the end-of-stream stage it reports.
     *
     * <p>The replacement stage is computed once, when the modifier is applied to a producer, by
     * passing the wrapped producer's own end-of-stream stage to {@code endOfStreamSupplier}.
     * Every call to {@code getEndOfStream()} on the wrapper returns that same stage.
     *
     * @param endOfStreamSupplier maps the wrapped producer's end-of-stream stage to the stage
     *                            the wrapper should expose
     * @param <T> type of the streamed items
     * @return a modifier that produces the wrapping producer
     */
    public static <T> StreamProducerModifier<T, T> suppliedEndOfStream(final Function<Stage<Void>, Stage<Void>> endOfStreamSupplier) {
        return producer -> new ForwardingStreamProducer<T>(producer) {
            // Field initializers run after the superclass constructor, so the wrapped
            // producer is already available through the inherited field here.
            private final Stage<Void> endOfStream = endOfStreamSupplier.apply(this.producer.getEndOfStream());

            @Override
            public Stage<Void> getEndOfStream() {
                return this.endOfStream;
            }
        };
    }

    /**
     * Creates a modifier whose wrapper reports the result of {@code Stages.any(...)} applied to
     * the wrapped producer's own end-of-stream stage and {@code suppliedEndOfStream}.
     *
     * <p>NOTE(review): presumably the combined stage completes as soon as either of the two
     * stages completes - confirm against {@code Stages.any}.
     *
     * @param suppliedEndOfStream additional stage combined with the producer's own end-of-stream
     * @param <T> type of the streamed items
     * @return a modifier that produces the wrapping producer
     */
    public static <T> StreamProducerModifier<T, T> suppliedEndOfStream(Stage<Void> suppliedEndOfStream) {
        return StreamProducers.suppliedEndOfStream(
                (Stage<Void> actualEndOfStream) -> Stages.any(actualEndOfStream, suppliedEndOfStream));
    }

    /**
     * Creates a modifier that routes every data receiver through {@code decorator} before it is
     * handed to the wrapped producer, and lets the decorator settle the reported end-of-stream.
     *
     * <p>The wrapper owns its own end-of-stream stage. It is settled by whichever happens first:
     * <ul>
     *   <li>the wrapped producer's end-of-stream stage completes (result and error are passed
     *       to {@code trySet});</li>
     *   <li>the decorator calls {@link Decorator.Context#endOfStream()} ({@code trySet(null)});</li>
     *   <li>the decorator calls {@link Decorator.Context#closeWithError(Throwable)}
     *       ({@code trySetException(error)}).</li>
     * </ul>
     * NOTE(review): this relies on the {@code try*} setters ignoring calls made after the stage
     * is already complete - confirm against {@code SettableStage}.
     *
     * <p>Only the wrapper's stage is settled here; nothing in this method stops or closes the
     * wrapped producer.
     *
     * @param decorator wraps each data receiver passed to {@code produce}
     * @param <T> type of the streamed items
     * @return a modifier that produces the decorating producer
     */
    public static <T> StreamProducerModifier<T, T> decorator(final Decorator<T> decorator) {
        return producer -> new ForwardingStreamProducer<T>(producer) {
            private final SettableStage<Void> endOfStream = SettableStage.create();

            {
                // Mirror the wrapped producer's completion onto the wrapper's own stage.
                this.producer.getEndOfStream()
                        .whenComplete((result, error) -> this.endOfStream.trySet(result, error));
            }

            @Override
            public void produce(StreamDataReceiver<T> dataReceiver) {
                // A fresh context is created for every produce() call; all of them settle the same stage.
                Decorator.Context context = new Decorator.Context() {
                    @Override
                    public void endOfStream() {
                        endOfStream.trySet(null);
                    }

                    @Override
                    public void closeWithError(Throwable error) {
                        endOfStream.trySetException(error);
                    }
                };
                this.producer.produce(decorator.decorate(context, dataReceiver));
            }

            @Override
            public Stage<Void> getEndOfStream() {
                return this.endOfStream;
            }
        };
    }

    /**
     * Creates a decorating modifier that inspects each item with {@code errorFunction}.
     *
     * <p>If the function returns {@code null}, the item is passed on to the downstream receiver.
     * Otherwise the item is not passed on and the returned throwable is given to
     * {@link Decorator.Context#closeWithError(Throwable)}.
     *
     * @param errorFunction returns the error for an item, or {@code null} if the item is acceptable
     * @param <T> type of the streamed items
     * @return the modifier built by {@link #decorator(Decorator)}
     */
    public static <T> StreamProducerModifier<T, T> errorDecorator(Function<T, Throwable> errorFunction) {
        Decorator<T> failOnErroneousItem = (context, dataReceiver) -> item -> {
            Throwable failure = errorFunction.apply(item);
            if (failure != null) {
                context.closeWithError(failure);
                return;
            }
            dataReceiver.onData(item);
        };
        return StreamProducers.decorator(failOnErroneousItem);
    }

    /**
     * Creates a modifier whose wrapper turns selected failures of the wrapped producer into a
     * normal end-of-stream.
     *
     * <p>When the wrapped producer's end-of-stream stage completes:
     * <ul>
     *   <li>without an error, the wrapper's stage is set to {@code null};</li>
     *   <li>with an error accepted by {@code endOfStreamPredicate}, the wrapper's stage is also
     *       set to {@code null};</li>
     *   <li>with any other error, the wrapper's stage is completed with that error.</li>
     * </ul>
     *
     * @param endOfStreamPredicate decides which errors are reported as a normal end-of-stream
     * @param <T> type of the streamed items
     * @return a modifier that produces the wrapping producer
     */
    public static <T> StreamProducerModifier<T, T> endOfStreamOnError(final Predicate<Throwable> endOfStreamPredicate) {
        return producer -> new ForwardingStreamProducer<T>(producer) {
            private final SettableStage<Void> endOfStream = SettableStage.create();

            {
                this.producer.getEndOfStream().whenComplete((ignored, throwable) -> {
                    // The predicate is only consulted when there actually is an error.
                    if (throwable == null || endOfStreamPredicate.test(throwable)) {
                        this.endOfStream.set(null);
                    } else {
                        this.endOfStream.setException(throwable);
                    }
                });
            }

            @Override
            public Stage<Void> getEndOfStream() {
                return this.endOfStream;
            }
        };
    }

    /**
     * Variant of {@link #endOfStreamOnError(Predicate)} that uses a predicate accepting every
     * throwable.
     *
     * @param <T> type of the streamed items
     * @return the modifier built by {@link #endOfStreamOnError(Predicate)}
     */
    public static <T> StreamProducerModifier<T, T> endOfStreamOnError() {
        Predicate<Throwable> acceptEveryError = anyError -> true;
        return StreamProducers.endOfStreamOnError(acceptEveryError);
    }

    /**
     * Creates a modifier whose wrapper forwards only failures of the wrapped producer.
     *
     * <p>The wrapper exposes its own end-of-stream stage. An error on the wrapped producer's
     * end-of-stream stage is copied onto it; nothing in this method ever completes the wrapper's
     * stage successfully.
     *
     * @param <T> type of the streamed items
     * @return a modifier that produces the wrapping producer
     */
    public static <T> StreamProducerModifier<T, T> noEndOfStream() {
        return producer -> new ForwardingStreamProducer<T>(producer) {
            private final SettableStage<Void> endOfStream = SettableStage.create();

            {
                // Only the exceptional outcome is subscribed to.
                this.producer.getEndOfStream().whenException(error -> this.endOfStream.setException(error));
            }

            @Override
            public Stage<Void> getEndOfStream() {
                return this.endOfStream;
            }
        };
    }

    /**
     * Wraps a {@link StreamDataReceiver} with another receiver, as used by
     * {@link StreamProducers#decorator(Decorator)}.
     *
     * @param <T> type of the streamed items
     */
    @FunctionalInterface
    interface Decorator<T> {
        /**
         * Builds the receiver that is handed to the wrapped producer.
         *
         * @param context      callbacks through which the decorator can finish the stream
         * @param dataReceiver the downstream receiver supplied to {@code produce}
         * @return the receiver to pass to the wrapped producer
         */
        StreamDataReceiver<T> decorate(Context context, StreamDataReceiver<T> dataReceiver);

        /** Callbacks through which a decorator signals completion of the decorated stream. */
        interface Context {
            /** Signals a normal end of the stream. */
            void endOfStream();

            /**
             * Signals that the stream ended with an error.
             *
             * @param error the failure to report
             */
            void closeWithError(Throwable error);
        }
    }

    /**
     * Producer that streams the elements of an {@link Iterator} and sends end-of-stream once the
     * iterator has no more elements.
     *
     * @param <T> type of the streamed items
     */
    static class OfIteratorImpl<T> extends AbstractStreamProducer<T> {
        private final Iterator<T> iterator;

        /**
         * @param iterator source of the items; checked with {@code Preconditions.checkNotNull}
         */
        public OfIteratorImpl(Iterator<T> iterator) {
            this.iterator = Preconditions.checkNotNull(iterator);
        }

        /**
         * Pushes items to the current data receiver until the iterator is exhausted or no
         * receiver is available.
         */
        @Override
        protected void produce() {
            for (;;) {
                if (!iterator.hasNext()) {
                    // Source exhausted: finish the stream.
                    sendEndOfStream();
                    return;
                }
                // The receiver is re-read before every item.
                StreamDataReceiver<T> receiver = getCurrentDataReceiver();
                if (receiver == null) {
                    // No receiver at the moment: stop without consuming the next element.
                    return;
                }
                receiver.onData(iterator.next());
            }
        }

        /** Does nothing. */
        @Override
        protected void onError(Throwable t) {
        }

        /** @return a set containing only {@link StreamCapability#LATE_BINDING} */
        @Override
        public Set<StreamCapability> getCapabilities() {
            return EnumSet.of(StreamCapability.LATE_BINDING);
        }
    }

    /**
     * Producer that never emits data. Its end-of-stream stage is only completed exceptionally,
     * with an error taken from the consumer's end-of-stream stage.
     *
     * @param <T> type of the streamed items
     */
    static final class IdleImpl<T> implements StreamProducer<T> {
        private final SettableStage<Void> endOfStream = SettableStage.create();

        IdleImpl() {
        }

        /** Subscribes to the consumer's failure and copies it onto this producer's stage. */
        @Override
        public void setConsumer(StreamConsumer<T> consumer) {
            consumer.getEndOfStream().whenException(endOfStream::setException);
        }

        /** Does nothing: this producer has no data to send. */
        @Override
        public void produce(StreamDataReceiver<T> dataReceiver) {
        }

        /** Does nothing. */
        @Override
        public void suspend() {
        }

        @Override
        public Stage<Void> getEndOfStream() {
            return endOfStream;
        }

        /** @return a set containing only {@link StreamCapability#LATE_BINDING} */
        @Override
        public Set<StreamCapability> getCapabilities() {
            return EnumSet.of(StreamCapability.LATE_BINDING);
        }
    }

    /**
     * Producer that emits no data and completes its end-of-stream stage with {@code null} once
     * a consumer has been set.
     *
     * @param <T> type of the streamed items
     */
    static class ClosingImpl<T> implements StreamProducer<T> {
        private final SettableStage<Void> endOfStream = SettableStage.create();

        ClosingImpl() {
        }

        /**
         * Posts a task to the current eventloop that completes the end-of-stream stage.
         * The {@code consumer} argument itself is not used.
         */
        @Override
        public void setConsumer(StreamConsumer<T> consumer) {
            Eventloop eventloop = Eventloop.getCurrentEventloop();
            eventloop.post(() -> endOfStream.set(null));
        }

        /** Does nothing: this producer has no data to send. */
        @Override
        public void produce(StreamDataReceiver<T> dataReceiver) {
        }

        /** Does nothing. */
        @Override
        public void suspend() {
        }

        @Override
        public Stage<Void> getEndOfStream() {
            return endOfStream;
        }

        /** @return a set containing only {@link StreamCapability#LATE_BINDING} */
        @Override
        public Set<StreamCapability> getCapabilities() {
            return EnumSet.of(StreamCapability.LATE_BINDING);
        }
    }

    /**
     * Producer that emits no data and completes its end-of-stream stage exceptionally, with the
     * throwable given at construction, once a consumer has been set.
     *
     * @param <T> type of the streamed items
     */
    static class ClosingWithErrorImpl<T> implements StreamProducer<T> {
        private final Throwable exception;
        private final SettableStage<Void> endOfStream = SettableStage.create();

        /**
         * @param exception the error the end-of-stream stage will be completed with
         */
        ClosingWithErrorImpl(Throwable exception) {
            this.exception = exception;
        }

        /**
         * Posts a task to the current eventloop that fails the end-of-stream stage with the
         * stored exception. The {@code consumer} argument itself is not used.
         */
        @Override
        public void setConsumer(StreamConsumer<T> consumer) {
            Eventloop eventloop = Eventloop.getCurrentEventloop();
            eventloop.post(() -> endOfStream.setException(exception));
        }

        /** Does nothing: this producer has no data to send. */
        @Override
        public void produce(StreamDataReceiver<T> dataReceiver) {
        }

        /** Does nothing. */
        @Override
        public void suspend() {
        }

        @Override
        public Stage<Void> getEndOfStream() {
            return endOfStream;
        }

        /** @return a set containing only {@link StreamCapability#LATE_BINDING} */
        @Override
        public Set<StreamCapability> getCapabilities() {
            return EnumSet.of(StreamCapability.LATE_BINDING);
        }
    }
}

