/*
 * Decompiled with CFR 0.152.
 */
package org.finos.tracdap.common.concurrent;

import io.netty.util.concurrent.OrderedEventExecutor;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Flow;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Stream;
import org.finos.tracdap.common.concurrent.IExecutionContext;
import org.finos.tracdap.common.concurrent.flow.ConcatProcessor;
import org.finos.tracdap.common.concurrent.flow.DelayedSubscriber;
import org.finos.tracdap.common.concurrent.flow.EventLoopProcessor;
import org.finos.tracdap.common.concurrent.flow.FutureFirstItemSubscriber;
import org.finos.tracdap.common.concurrent.flow.FutureResultPublisher;
import org.finos.tracdap.common.concurrent.flow.HubProcessor;
import org.finos.tracdap.common.concurrent.flow.InterceptProcessor;
import org.finos.tracdap.common.concurrent.flow.MapProcessor;
import org.finos.tracdap.common.concurrent.flow.ReduceProcessor;
import org.finos.tracdap.common.concurrent.flow.SourcePublisher;
import org.finos.tracdap.common.exception.EUnexpected;

/**
 * Static factory and helper methods for wiring together {@link Flow} publishers,
 * subscribers and processors.
 *
 * <p>Each helper constructs one of the processor / subscriber implementations from the
 * {@code org.finos.tracdap.common.concurrent.flow} package and, where a source publisher
 * is supplied, subscribes the new stage to that source before returning it. The precise
 * signal-handling semantics live in those implementation classes and are not visible
 * here.
 */
public class Flows {

    /**
     * Wraps a list in a {@link SourcePublisher}.
     *
     * @param source the items to be published
     * @param <T> item type
     * @return a publisher backed by {@code source}
     */
    public static <T> Flow.Publisher<T> publish(List<T> source) {
        return new SourcePublisher<T>(source);
    }

    /**
     * Wraps a stream in a {@link SourcePublisher}.
     *
     * @param source the items to be published
     * @param <T> item type
     * @return a publisher backed by {@code source}
     */
    public static <T> Flow.Publisher<T> publish(Stream<T> source) {
        return new SourcePublisher<T>(source);
    }

    /**
     * Wraps a completion stage in a {@link FutureResultPublisher}.
     *
     * @param source the stage whose outcome is to be published
     * @param <T> item type
     * @return a publisher backed by {@code source}
     */
    public static <T> Flow.Publisher<T> publish(CompletionStage<T> source) {
        return new FutureResultPublisher<T>(source);
    }

    /**
     * Creates an {@link InterceptProcessor} with no result handler ({@code null} is passed).
     *
     * <p>NOTE(review): presumably the processor forwards signals unchanged when the
     * handler is null - confirm against {@code InterceptProcessor}.
     *
     * @param <T> item type
     * @return a new, unsubscribed processor
     */
    public static <T> Flow.Processor<T, T> passThrough() {
        return new InterceptProcessor<T>(null);
    }

    /**
     * Creates an {@link InterceptProcessor} with the given handler and subscribes it
     * to {@code source}.
     *
     * @param source the upstream publisher
     * @param resultHandler handler passed to the interceptor (item / error callback)
     * @param <T> item type
     * @return the interceptor, already subscribed to {@code source}
     */
    public static <T> Flow.Processor<T, T> interceptResult(Flow.Publisher<T> source, BiConsumer<T, Throwable> resultHandler) {
        InterceptProcessor<T> interceptor = new InterceptProcessor<T>(resultHandler);
        source.subscribe(interceptor);
        return interceptor;
    }

    /**
     * Wraps {@code target} in a {@link DelayedSubscriber} associated with {@code signal}.
     *
     * <p>NOTE(review): by its name the wrapper presumably holds back the subscription
     * until {@code signal} completes - confirm against {@code DelayedSubscriber}.
     *
     * @param target the subscriber to wrap
     * @param signal the stage handed to the wrapper
     * @param <T> item type
     * @return the wrapping subscriber
     */
    public static <T> Flow.Subscriber<T> waitForSignal(Flow.Subscriber<T> target, CompletionStage<?> signal) {
        return new DelayedSubscriber<T>(target, signal);
    }

    /**
     * Creates a {@link MapProcessor} for {@code mapping} and subscribes it to {@code source}.
     *
     * @param source the upstream publisher
     * @param mapping function handed to the map processor
     * @param <T> input item type
     * @param <U> output item type
     * @return the map processor, already subscribed to {@code source}
     */
    public static <T, U> Flow.Publisher<U> map(Flow.Publisher<T> source, Function<T, U> mapping) {
        MapProcessor<T, U> map = new MapProcessor<T, U>(mapping);
        source.subscribe(map);
        return map;
    }

    /**
     * Subscribes a {@link ReduceProcessor} to {@code source} and returns the future it
     * was given to report its result.
     *
     * <p>The processor is built with {@code func}, the result future and
     * {@link Function#identity()}; presumably the identity function seeds the
     * accumulator from the first item - confirm against {@code ReduceProcessor}.
     *
     * @param source the upstream publisher
     * @param func the combining function
     * @param <T> item and result type
     * @return the stage passed to the reduce processor as its result holder
     */
    public static <T> CompletionStage<T> reduce(Flow.Publisher<T> source, BiFunction<T, T, T> func) {
        CompletableFuture<T> result = new CompletableFuture<T>();
        ReduceProcessor<T, T> reduce = new ReduceProcessor<T, T>(func, result, Function.identity());
        source.subscribe(reduce);
        return result;
    }

    /**
     * Subscribes a {@link ReduceProcessor} to {@code source}, seeded with an explicit
     * accumulator value, and returns the future it was given to report its result.
     *
     * @param source the upstream publisher
     * @param func the folding function (accumulator, item) -> accumulator
     * @param acc the accumulator value handed to the processor
     * @param <T> item type
     * @param <U> accumulator / result type
     * @return the stage passed to the reduce processor as its result holder
     */
    public static <T, U> CompletionStage<U> fold(Flow.Publisher<T> source, BiFunction<U, T, U> func, U acc) {
        CompletableFuture<U> result = new CompletableFuture<U>();
        ReduceProcessor<T, U> fold = new ReduceProcessor<T, U>(func, result, acc);
        source.subscribe(fold);
        return result;
    }

    /**
     * Creates a {@link HubProcessor} using the event loop executor of {@code execCtx}.
     *
     * @param execCtx execution context supplying the event loop executor
     * @param <T> item type
     * @return a new, unsubscribed hub processor
     */
    public static <T> Flow.Processor<T, T> hub(IExecutionContext execCtx) {
        return new HubProcessor<T>(execCtx.eventLoopExecutor());
    }

    /**
     * Subscribes a {@link FutureFirstItemSubscriber} to {@code publisher} and returns
     * the future that subscriber was constructed with.
     *
     * @param publisher the upstream publisher
     * @param <T> item type
     * @return the stage handed to the first-item subscriber
     */
    public static <T> CompletionStage<T> first(Flow.Publisher<T> publisher) {
        CompletableFuture<T> firstFuture = new CompletableFuture<T>();
        FutureFirstItemSubscriber<T> subscriber = new FutureFirstItemSubscriber<T>(firstFuture);
        publisher.subscribe(subscriber);
        return firstFuture;
    }

    /**
     * Creates a {@link ConcatProcessor} over {@code publishers} and subscribes it to
     * the first publisher in the list.
     *
     * @param publishers the publishers to concatenate; must not be empty
     * @param <T> item type
     * @return the concat processor, already subscribed to the first publisher
     * @throws EUnexpected if {@code publishers} is empty
     */
    public static <T> Flow.Publisher<T> concat(List<Flow.Publisher<T>> publishers) {
        if (publishers.isEmpty()) {
            throw new EUnexpected();
        }
        ConcatProcessor<T> concat = new ConcatProcessor<T>(publishers);
        publishers.get(0).subscribe(concat);
        return concat;
    }

    /**
     * Concatenates a single future result with a tail publisher.
     *
     * <p>The head stage is wrapped with {@link #publish(CompletionStage)} and placed in
     * front of {@code tail} in a two-element list, which is then handed to
     * {@link #concat(List)}.
     *
     * @param head the stage supplying the leading publisher
     * @param tail the publisher placed after the head
     * @param <T> item type
     * @return the concat processor, already subscribed to the head publisher
     */
    public static <T> Flow.Publisher<T> concat(CompletionStage<T> head, Flow.Publisher<T> tail) {
        Flow.Publisher<T> headStream = Flows.publish(head);
        // A mutable list is kept (as in the original) because it is not visible here
        // whether ConcatProcessor modifies the list it is given.
        List<Flow.Publisher<T>> publishers = new ArrayList<Flow.Publisher<T>>(2);
        publishers.add(headStream);
        publishers.add(tail);
        return Flows.concat(publishers);
    }

    /**
     * Folds {@code source} into a list by appending each item to a fresh
     * {@link ArrayList}, via {@link #fold(Flow.Publisher, BiFunction, Object)}.
     *
     * @param source the upstream publisher
     * @param <T> item type
     * @return the stage returned by the underlying fold
     */
    public static <T> CompletionStage<List<T>> toList(Flow.Publisher<T> source) {
        List<T> items = new ArrayList<T>();
        return Flows.<T, List<T>>fold(source, (xs, x) -> {
            xs.add(x);
            return xs;
        }, items);
    }

    /**
     * Creates an {@link EventLoopProcessor} for {@code executor} and subscribes it to
     * {@code publisher}.
     *
     * <p>NOTE(review): presumably the relay re-dispatches signals onto the given
     * executor - confirm against {@code EventLoopProcessor}.
     *
     * @param publisher the upstream publisher
     * @param executor the ordered executor handed to the relay
     * @param <T> item type
     * @return the relay processor, already subscribed to {@code publisher}
     */
    public static <T> Flow.Publisher<T> onEventLoop(Flow.Publisher<T> publisher, OrderedEventExecutor executor) {
        EventLoopProcessor<T> relay = new EventLoopProcessor<T>(executor);
        publisher.subscribe(relay);
        return relay;
    }
}

