/*
 * Decompiled with CFR 0.152.
 */
package io.datakernel.stream;

import io.datakernel.async.SettableStage;
import io.datakernel.async.Stage;
import io.datakernel.eventloop.Eventloop;
import io.datakernel.stream.AbstractStreamConsumer;
import io.datakernel.stream.ForwardingStreamConsumer;
import io.datakernel.stream.ForwardingStreamProducer;
import io.datakernel.stream.StreamCapability;
import io.datakernel.stream.StreamConsumer;
import io.datakernel.stream.StreamConsumerModifier;
import io.datakernel.stream.StreamDataReceiver;
import io.datakernel.stream.StreamProducer;
import java.util.EnumSet;
import java.util.Random;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

public final class StreamConsumers {
    /**
     * Not meant to be instantiated: everything this class offers is reached through
     * its static methods and nested types.
     */
    private StreamConsumers() {
        // intentionally empty
    }

    /**
     * Builds a modifier that wraps a consumer in a {@link ForwardingStreamConsumer} whose
     * incoming data receivers are routed through {@code decorator}.
     *
     * <p>When a producer is attached to the wrapper, it is itself wrapped so that every
     * receiver passed to {@code produce} is first handed to
     * {@link Decorator#decorate(Decorator.Context, StreamDataReceiver)}; the receiver returned
     * by the decorator is the one the underlying producer actually gets.
     *
     * <p>The {@link Decorator.Context} given to the decorator offers:
     * <ul>
     *   <li>{@code suspend()} - calls {@code suspend()} on the attached producer;</li>
     *   <li>{@code resume()} - posts a task to the event loop that was current when
     *       {@code produce} was called; the task calls {@code produce} on the attached
     *       producer again with the decorated receiver;</li>
     *   <li>{@code closeWithError(error)} - tries to complete the wrapper's own
     *       end-of-stream stage with {@code error}.</li>
     * </ul>
     *
     * <p>The wrapper reports its own end-of-stream stage, which also receives the outcome of
     * the wrapped consumer's stage through {@code trySet}.
     *
     * @param decorator strategy that turns the downstream receiver into the receiver seen by the producer
     * @param <T> item type of the stream
     * @return a modifier producing the decorated consumer
     */
    public static <T> StreamConsumerModifier<T, T> decorator(final Decorator<T> decorator) {
        return consumer -> new ForwardingStreamConsumer<T>(consumer) {
            // Own stage (not the wrapped consumer's) so closeWithError can complete it directly.
            final SettableStage<Void> endOfStream = SettableStage.create();

            {
                // Mirror the wrapped consumer's outcome; trySet is presumably a no-op when
                // closeWithError has already completed the stage - TODO confirm.
                consumer.getEndOfStream().whenComplete((result, error) -> endOfStream.trySet(result, error));
            }

            @Override
            public void setProducer(StreamProducer<T> producer) {
                super.setProducer(new ForwardingStreamProducer<T>(producer) {

                    @Override
                    @SuppressWarnings("unchecked") // generic array creation for the one-slot holder below
                    public void produce(StreamDataReceiver<T> dataReceiver) {
                        // One-slot holder: the context must refer to the decorated receiver,
                        // which only exists after decorate(context, ...) has returned.
                        final StreamDataReceiver<T>[] dataReceiverHolder = new StreamDataReceiver[1];
                        Decorator.Context context = new Decorator.Context() {
                            // Captured when produce() runs so resume() can post to the same loop later.
                            final Eventloop eventloop = Eventloop.getCurrentEventloop();

                            @Override
                            public void suspend() {
                                producer.suspend();
                            }

                            @Override
                            public void resume() {
                                eventloop.post(() -> producer.produce(dataReceiverHolder[0]));
                            }

                            @Override
                            public void closeWithError(Throwable error) {
                                endOfStream.trySetException(error);
                            }
                        };
                        dataReceiverHolder[0] = decorator.decorate(context, dataReceiver);
                        super.produce(dataReceiverHolder[0]);
                    }
                });
            }

            @Override
            public Stage<Void> getEndOfStream() {
                return endOfStream;
            }
        };
    }

    /**
     * Creates a decorating modifier that inspects every item with {@code errorFunction}.
     *
     * <p>If the function returns {@code null} the item is forwarded to the downstream receiver;
     * otherwise the item is not forwarded and the returned throwable is passed to
     * {@link Decorator.Context#closeWithError(Throwable)}.
     *
     * @param errorFunction maps an item to the error it should trigger, or to {@code null} for none
     * @param <T> item type of the stream
     * @return the modifier built through {@code decorator}
     */
    public static <T> StreamConsumerModifier<T, T> errorDecorator(Function<T, Throwable> errorFunction) {
        return decorator((ctx, downstream) -> value -> {
            Throwable failure = errorFunction.apply(value);
            if (failure != null) {
                ctx.closeWithError(failure);
                return;
            }
            downstream.onData(value);
        });
    }

    /**
     * Creates a decorating modifier that forwards every item downstream and then, for items
     * matching {@code predicate}, suspends the stream and hands the context to {@code resumer}.
     *
     * @param predicate decides, after an item has been forwarded, whether to suspend
     * @param resumer receives the context right after {@code suspend()} was called on it
     * @param <T> item type of the stream
     * @return the modifier built through {@code decorator}
     */
    public static <T> StreamConsumerModifier<T, T> suspendDecorator(Predicate<T> predicate, Consumer<Decorator.Context> resumer) {
        return decorator((ctx, downstream) -> value -> {
            // The item is always delivered first; suspension only affects what comes after it.
            downstream.onData(value);
            if (!predicate.test(value)) {
                return;
            }
            ctx.suspend();
            resumer.accept(ctx);
        });
    }

    /**
     * Variant of {@code suspendDecorator(predicate, resumer)} whose resumer simply calls
     * {@link Decorator.Context#resume()} on the context it is given.
     *
     * @param predicate decides, after an item has been forwarded, whether to suspend
     * @param <T> item type of the stream
     * @return the resulting modifier
     */
    public static <T> StreamConsumerModifier<T, T> suspendDecorator(Predicate<T> predicate) {
        Consumer<Decorator.Context> callResume = context -> context.resume();
        return suspendDecorator(predicate, callResume);
    }

    /**
     * Returns a suspend-decorating modifier whose predicate accepts every item, so the
     * suspend/resume cycle is triggered after each forwarded item.
     *
     * @param <T> item type of the stream
     * @return the resulting modifier
     */
    public static <T> StreamConsumerModifier<T, T> oneByOne() {
        Predicate<T> always = item -> true;
        return suspendDecorator(always);
    }

    /**
     * Returns a suspend-decorating modifier that triggers the suspend/resume cycle after an
     * item whenever {@code random.nextDouble()} is below {@code probability}.
     *
     * @param random source of the per-item random draw
     * @param probability threshold compared against each draw
     * @param <T> item type of the stream
     * @return the resulting modifier
     */
    public static <T> StreamConsumerModifier<T, T> randomlySuspending(Random random, double probability) {
        Predicate<T> shouldSuspend = item -> random.nextDouble() < probability;
        return suspendDecorator(shouldSuspend);
    }

    /**
     * Same as {@code randomlySuspending(Random, double)}, using a freshly created
     * {@link Random} as the source of randomness.
     *
     * @param probability threshold compared against each random draw
     * @param <T> item type of the stream
     * @return the resulting modifier
     */
    public static <T> StreamConsumerModifier<T, T> randomlySuspending(double probability) {
        Random freshRandom = new Random();
        return randomlySuspending(freshRandom, probability);
    }

    /**
     * Same as {@code randomlySuspending(double)} with a probability of {@code 0.5}.
     *
     * @param <T> item type of the stream
     * @return the resulting modifier
     */
    public static <T> StreamConsumerModifier<T, T> randomlySuspending() {
        final double defaultProbability = 0.5;
        return randomlySuspending(defaultProbability);
    }

    /**
     * Strategy that wraps a data receiver, given a {@link Context} through which it can
     * influence the stream.
     *
     * @param <T> item type of the stream
     */
    public interface Decorator<T> {
        /**
         * Produces the receiver that should be used in place of {@code dataReceiver}.
         *
         * @param context control handle for the stream being decorated
         * @param dataReceiver the receiver being decorated
         * @return the receiver to use instead
         */
        StreamDataReceiver<T> decorate(Context context, StreamDataReceiver<T> dataReceiver);

        /**
         * Control operations made available to a {@link Decorator}. Their exact effect is
         * defined by whoever implements this interface.
         */
        interface Context {
            /** Requests that the stream be suspended. */
            void suspend();

            /** Requests that the stream be resumed. */
            void resume();

            /**
             * Requests that the stream be closed with the given error.
             *
             * @param error the failure to report
             */
            void closeWithError(Throwable error);
        }
    }

    /**
     * Consumer that accepts data from its producer and discards every item.
     * Its end-of-stream stage is completed with the outcome of the producer's stage.
     *
     * @param <T> item type of the stream
     */
    static final class IdleImpl<T> implements StreamConsumer<T> {
        private final SettableStage<Void> endOfStream = SettableStage.create();

        IdleImpl() {
        }

        @Override
        public Set<StreamCapability> getCapabilities() {
            return EnumSet.of(StreamCapability.LATE_BINDING);
        }

        @Override
        public Stage<Void> getEndOfStream() {
            return endOfStream;
        }

        /**
         * Links this consumer's end-of-stream stage to the producer's, then asks the producer
         * to produce into a receiver that ignores its argument.
         */
        @Override
        public void setProducer(StreamProducer<T> producer) {
            Stage<Void> producerEnd = producer.getEndOfStream();
            producerEnd.whenComplete((result, error) -> endOfStream.set(result, error));
            producer.produce(ignored -> {});
        }
    }

    /**
     * Stream consumer that passes every received item to a plain {@link Consumer}.
     *
     * @param <T> item type of the stream
     */
    static final class OfConsumerImpl<T> extends AbstractStreamConsumer<T> {
        private final Consumer<T> consumer;

        /**
         * @param consumer callback invoked for each item delivered by the producer
         */
        OfConsumerImpl(Consumer<T> consumer) {
            this.consumer = consumer;
        }

        @Override
        public Set<StreamCapability> getCapabilities() {
            return EnumSet.of(StreamCapability.LATE_BINDING);
        }

        /** Asks the producer to deliver items straight into the wrapped callback. */
        @Override
        protected void onStarted() {
            getProducer().produce(consumer::accept);
        }

        /** Does nothing when the stream ends. */
        @Override
        protected void onEndOfStream() {
        }

        /** Does nothing when the stream fails. */
        @Override
        protected void onError(Throwable t) {
        }
    }

    /**
     * Consumer that never reads data: once a producer is attached, it posts a task to the
     * current event loop that completes its end-of-stream stage with a fixed exception.
     *
     * @param <T> item type of the stream
     */
    static final class ClosingWithErrorImpl<T> implements StreamConsumer<T> {
        private final SettableStage<Void> endOfStream = SettableStage.create();
        private final Throwable exception;

        /**
         * @param exception the error this consumer's end-of-stream stage will be completed with
         */
        ClosingWithErrorImpl(Throwable exception) {
            this.exception = exception;
        }

        @Override
        public Set<StreamCapability> getCapabilities() {
            return EnumSet.of(StreamCapability.LATE_BINDING);
        }

        @Override
        public Stage<Void> getEndOfStream() {
            return endOfStream;
        }

        /**
         * Schedules the failure on the current event loop; the {@code producer} argument
         * itself is not used.
         */
        @Override
        public void setProducer(StreamProducer<T> producer) {
            Eventloop currentLoop = Eventloop.getCurrentEventloop();
            currentLoop.post(() -> endOfStream.setException(exception));
        }
    }
}

