/*
 * Decompiled with CFR 0.152.
 */
package net.lecousin.framework.concurrent.util;

import java.util.function.Function;
import net.lecousin.framework.concurrent.Task;
import net.lecousin.framework.concurrent.async.Async;
import net.lecousin.framework.concurrent.async.AsyncSupplier;
import net.lecousin.framework.concurrent.async.IAsync;
import net.lecousin.framework.concurrent.util.AsyncConsumer;
import net.lecousin.framework.util.Runnables;

public interface AsyncProducer<T, TError extends Exception> {
    public AsyncSupplier<T, TError> produce();

    default public AsyncSupplier<T, TError> produce(String description, byte priority) {
        AsyncSupplier result = new AsyncSupplier();
        new Task.Cpu.FromRunnable(description, priority, () -> this.produce().forward(result)).start();
        return result;
    }

    default public Async<TError> toConsumer(final AsyncConsumer<T, TError> consumer, final String description, final byte priority) {
        final Async<Exception> result = new Async<Exception>();
        Runnables.ConsumerThrows consume = new Runnables.ConsumerThrows<T, TError>(){

            @Override
            public void accept(T data) {
                1 that = this;
                if (data == null) {
                    consumer.end().onDone(result);
                    return;
                }
                AsyncSupplier nextProduction = AsyncProducer.this.produce(description, priority);
                IAsync consumption = consumer.consume(data);
                consumption.onDone(() -> nextProduction.thenStart(new Task.Cpu.Parameter.FromConsumerThrows(description, priority, that), result), result);
            }
        };
        this.produce().thenStart(new Task.Cpu.Parameter.FromConsumerThrows(description, priority, consume), result);
        result.onError(consumer::error);
        return result;
    }

    default public <T2> Async<TError> toConsumer(final Function<T, AsyncSupplier<T2, TError>> converter, final AsyncConsumer<T2, TError> consumer, final String description, final byte priority) {
        final Async<Exception> result = new Async<Exception>();
        Runnables.ConsumerThrows consume = new Runnables.ConsumerThrows<T, TError>(){

            @Override
            public void accept(T data) {
                2 that = this;
                if (data == null) {
                    consumer.end().onDone(result);
                    return;
                }
                AsyncSupplier nextProduction = AsyncProducer.this.produce(description, priority);
                AsyncSupplier conversion = (AsyncSupplier)converter.apply(data);
                conversion.thenStart(new Task.Cpu.Parameter.FromConsumerThrows(description, priority, converted -> {
                    IAsync consumption = consumer.consume(converted);
                    consumption.onDone(() -> nextProduction.thenStart(new Task.Cpu.Parameter.FromConsumerThrows(description, priority, that), result), result);
                }), result);
            }
        };
        this.produce().thenStart(new Task.Cpu.Parameter.FromConsumerThrows(description, priority, consume), result);
        result.onError(consumer::error);
        return result;
    }

    public static class Empty<T, TError extends Exception>
    implements AsyncProducer<T, TError> {
        @Override
        public AsyncSupplier<T, TError> produce() {
            return new AsyncSupplier<Object, Object>(null, null);
        }
    }

    public static class SingleData<T, TError extends Exception>
    implements AsyncProducer<T, TError> {
        private T data;

        public SingleData(T data) {
            this.data = data;
        }

        @Override
        public AsyncSupplier<T, TError> produce() {
            T d = this.data;
            this.data = null;
            return new AsyncSupplier<T, Object>(d, null);
        }
    }
}

