/*
 * Decompiled with CFR 0.152.
 */
package io.activej.csp.consumer;

import io.activej.async.process.AsyncCloseable;
import io.activej.async.process.AsyncExecutor;
import io.activej.common.exception.FatalErrorHandler;
import io.activej.common.function.FunctionEx;
import io.activej.common.recycle.Recyclers;
import io.activej.csp.consumer.AbstractChannelConsumer;
import io.activej.csp.process.transformer.ChannelConsumerTransformer;
import io.activej.promise.Promise;
import io.activej.promise.SettableCallback;
import io.activej.promise.SettablePromise;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;
import org.jetbrains.annotations.Nullable;

public interface ChannelConsumer<T>
extends AsyncCloseable {
    public Promise<Void> accept(@Nullable T var1);

    default public Promise<Void> acceptEndOfStream() {
        return this.accept(null);
    }

    default public Promise<Void> acceptAll(T ... items) {
        return this.acceptAll(Arrays.asList(items));
    }

    default public Promise<Void> acceptAll(Iterator<? extends T> it) {
        if (!it.hasNext()) {
            return Promise.complete();
        }
        return Promise.ofCallback(cb -> ChannelConsumer.acceptAllImpl(this, it, false, (SettableCallback<Void>)cb));
    }

    default public Promise<Void> acceptAll(List<T> list) {
        if (list.isEmpty()) {
            return Promise.complete();
        }
        return Promise.ofCallback(cb -> ChannelConsumer.acceptAllImpl(this, list.iterator(), true, (SettableCallback<Void>)cb));
    }

    default public <R> R transformWith(ChannelConsumerTransformer<T, R> fn) {
        return fn.transform(this);
    }

    default public ChannelConsumer<T> async() {
        return new AbstractChannelConsumer<T>(this){

            @Override
            protected Promise<Void> doAccept(T value) {
                return ChannelConsumer.this.accept(value).async();
            }
        };
    }

    default public ChannelConsumer<T> withExecutor(final AsyncExecutor executor) {
        return new AbstractChannelConsumer<T>(this){

            @Override
            protected Promise<Void> doAccept(T value) {
                return executor.execute(() -> ChannelConsumer.this.accept(value));
            }
        };
    }

    default public ChannelConsumer<T> peek(final Consumer<? super T> fn) {
        return new AbstractChannelConsumer<T>(this){

            @Override
            protected Promise<Void> doAccept(T value) {
                if (value != null) {
                    fn.accept(value);
                }
                return ChannelConsumer.this.accept(value);
            }
        };
    }

    default public <V> ChannelConsumer<V> map(final FunctionEx<? super V, ? extends T> fn) {
        return new AbstractChannelConsumer<V>(this){

            @Override
            protected Promise<Void> doAccept(V value) {
                if (value != null) {
                    Object newValue;
                    try {
                        newValue = fn.apply(value);
                    }
                    catch (Exception ex) {
                        FatalErrorHandler.handleError((Throwable)ex, (Object)fn);
                        ChannelConsumer.this.closeEx(ex);
                        return Promise.ofException((Exception)ex);
                    }
                    return ChannelConsumer.this.accept(newValue);
                }
                return ChannelConsumer.this.acceptEndOfStream();
            }
        };
    }

    default public <V> ChannelConsumer<V> mapAsync(final Function<? super V, Promise<T>> fn) {
        return new AbstractChannelConsumer<V>(this){

            @Override
            protected Promise<Void> doAccept(V value) {
                return value != null ? ((Promise)fn.apply(value)).then(ChannelConsumer.this::accept) : ChannelConsumer.this.acceptEndOfStream();
            }
        };
    }

    default public ChannelConsumer<T> filter(final Predicate<? super T> predicate) {
        return new AbstractChannelConsumer<T>(this){

            @Override
            protected Promise<Void> doAccept(T value) {
                if (value != null && predicate.test(value)) {
                    return ChannelConsumer.this.accept(value);
                }
                Recyclers.recycle(value);
                return Promise.complete();
            }
        };
    }

    default public ChannelConsumer<T> withAcknowledgement(UnaryOperator<Promise<Void>> fn) {
        final SettablePromise acknowledgement = new SettablePromise();
        final Promise newAcknowledgement = (Promise)fn.apply((Promise<Void>)acknowledgement);
        return new AbstractChannelConsumer<T>(this){

            @Override
            protected Promise<Void> doAccept(@Nullable T value) {
                if (value != null) {
                    return ChannelConsumer.this.accept(value).then(Promise::of, e -> {
                        acknowledgement.trySetException(e);
                        return newAcknowledgement;
                    });
                }
                ChannelConsumer.this.accept(null).subscribe((arg_0, arg_1) -> ((SettablePromise)acknowledgement).trySet(arg_0, arg_1));
                return newAcknowledgement;
            }

            protected void onClosed(Exception e) {
                acknowledgement.trySetException(e);
            }
        };
    }

    private static <T> void acceptAllImpl(ChannelConsumer<T> output, Iterator<? extends T> it, boolean ownership, SettableCallback<Void> cb) {
        while (it.hasNext()) {
            Promise<Void> accept = output.accept(it.next());
            if (accept.isResult()) continue;
            accept.subscribe(($, e) -> {
                if (e == null) {
                    ChannelConsumer.acceptAllImpl(output, it, ownership, cb);
                } else {
                    if (ownership) {
                        it.forEachRemaining(Recyclers::recycle);
                    } else {
                        Recyclers.recycle((Object)it);
                    }
                    cb.setException(e);
                }
            });
            return;
        }
        cb.set(null);
    }
}

