package io.activej.csp;

import io.activej.bytebuf.ByteBuf;
import io.activej.bytebuf.ByteBufPool;
import io.activej.common.MemSize;
import io.activej.common.Utils;
import io.activej.common.api.Recyclable;
import io.activej.common.collection.CollectionUtils;
import io.activej.common.collection.Try;
import io.activej.common.exception.StacklessException;
import io.activej.common.exception.UncheckedException;
import io.activej.csp.queue.ChannelBuffer;
import io.activej.csp.queue.ChannelZeroBuffer;
import io.activej.eventloop.Eventloop;
import io.activej.eventloop.util.RunnableWithContext;
import io.activej.promise.Promise;
import io.activej.promise.Promises;
import io.activej.promise.SettablePromise;
import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

/* loaded from: input_file:io/activej/csp/ChannelSuppliers.class */
public final class ChannelSuppliers {

    /* loaded from: input_file:io/activej/csp/ChannelSuppliers$ChannelSupplierEmpty.class */
    public static class ChannelSupplierEmpty<T> extends AbstractChannelSupplier<T> {
        @Override // io.activej.csp.AbstractChannelSupplier
        protected Promise<T> doGet() {
            return Promise.of((Object) null);
        }
    }

    /* loaded from: input_file:io/activej/csp/ChannelSuppliers$ChannelSupplierOfException.class */
    public static final class ChannelSupplierOfException<T> extends AbstractChannelSupplier<T> {
        private final Throwable e;

        public ChannelSupplierOfException(Throwable th) {
            this.e = th;
        }

        @Override // io.activej.csp.AbstractChannelSupplier
        protected Promise<T> doGet() {
            return Promise.ofException(this.e);
        }
    }

    /* loaded from: input_file:io/activej/csp/ChannelSuppliers$ChannelSupplierOfIterator.class */
    public static final class ChannelSupplierOfIterator<T> extends AbstractChannelSupplier<T> {
        private final Iterator<? extends T> iterator;

        public ChannelSupplierOfIterator(Iterator<? extends T> it) {
            this.iterator = it;
        }

        @Override // io.activej.csp.AbstractChannelSupplier
        protected Promise<T> doGet() {
            return Promise.of(this.iterator.hasNext() ? this.iterator.next() : null);
        }

        protected void onCleanup() {
            Recyclable.deepRecycle(this.iterator);
        }
    }

    /* loaded from: input_file:io/activej/csp/ChannelSuppliers$ChannelSupplierOfValue.class */
    public static final class ChannelSupplierOfValue<T> extends AbstractChannelSupplier<T> {
        private T item;

        public T getValue() {
            return this.item;
        }

        public T takeValue() {
            T t = this.item;
            this.item = null;
            return t;
        }

        public ChannelSupplierOfValue(@NotNull T t) {
            this.item = t;
        }

        @Override // io.activej.csp.AbstractChannelSupplier
        protected Promise<T> doGet() {
            return Promise.of(takeValue());
        }

        protected void onCleanup() {
            this.item = (T) Utils.nullify(this.item, Recyclable::tryRecycle);
        }
    }

    public static <T> ChannelSupplier<T> concat(ChannelSupplier<? extends T> channelSupplier, ChannelSupplier<? extends T> channelSupplier2) {
        return concat(CollectionUtils.asIterator(channelSupplier, channelSupplier2));
    }

    @SafeVarargs
    public static <T> ChannelSupplier<T> concat(ChannelSupplier<? extends T>... channelSupplierArr) {
        return concat(CollectionUtils.asIterator(channelSupplierArr));
    }

    public static <T> ChannelSupplier<T> concat(final Iterator<? extends ChannelSupplier<? extends T>> it) {
        return new AbstractChannelSupplier<T>() { // from class: io.activej.csp.ChannelSuppliers.1
            ChannelSupplier<? extends T> current = ChannelSupplier.of();

            @Override // io.activej.csp.AbstractChannelSupplier
            protected Promise<T> doGet() {
                Promise<? extends T> promise = this.current.get();
                Iterator it2 = it;
                return promise.thenEx((obj, th) -> {
                    if (th != null) {
                        while (it2.hasNext()) {
                            ((ChannelSupplier) it2.next()).closeEx(th);
                        }
                        return Promise.ofException(th);
                    }
                    if (obj != null) {
                        return Promise.of(obj);
                    }
                    if (!it2.hasNext()) {
                        return Promise.of((Object) null);
                    }
                    this.current = (ChannelSupplier) it2.next();
                    return get();
                });
            }

            protected void onClosed(@NotNull Throwable th) {
                this.current.closeEx(th);
                while (it.hasNext()) {
                    ((ChannelSupplier) it.next()).closeEx(th);
                }
            }
        };
    }

    public static <T, A, R> Promise<R> collect(ChannelSupplier<T> channelSupplier, A a, BiConsumer<A, T> biConsumer, Function<A, R> function) {
        return Promise.ofCallback(settablePromise -> {
            toCollectorImpl(channelSupplier, a, biConsumer, function, settablePromise);
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Multi-variable type inference failed */
    public static <T, A, R> void toCollectorImpl(ChannelSupplier<T> channelSupplier, A a, BiConsumer<A, T> biConsumer, Function<A, R> function, SettablePromise<R> settablePromise) {
        Promise<T> promise;
        Object result;
        while (true) {
            promise = channelSupplier.get();
            if (promise.isResult() && (result = promise.getResult()) != null) {
                try {
                    biConsumer.accept(a, result);
                } catch (UncheckedException e) {
                    Throwable cause = e.getCause();
                    channelSupplier.closeEx(cause);
                    settablePromise.setException(cause);
                    return;
                }
            }
        }
        promise.whenComplete((obj, th) -> {
            if (th != null) {
                Recyclable.deepRecycle(function.apply(a));
                settablePromise.setException(th);
            } else {
                if (obj == null) {
                    settablePromise.set(function.apply(a));
                    return;
                }
                try {
                    biConsumer.accept(a, obj);
                    toCollectorImpl(channelSupplier, a, biConsumer, function, settablePromise);
                } catch (UncheckedException e2) {
                    Throwable cause2 = e2.getCause();
                    channelSupplier.closeEx(cause2);
                    settablePromise.setException(cause2);
                }
            }
        });
    }

    public static <T> Promise<Void> streamTo(Promise<ChannelSupplier<T>> promise, Promise<ChannelConsumer<T>> promise2) {
        return Promises.toTuple(promise.toTry(), promise2.toTry()).then(tuple2 -> {
            return streamTo((Try) tuple2.getValue1(), (Try) tuple2.getValue2());
        });
    }

    public static <T> Promise<Void> streamTo(Try<ChannelSupplier<T>> r5, Try<ChannelConsumer<T>> r6) {
        if (r5.isSuccess() && r6.isSuccess()) {
            return streamTo((ChannelSupplier) r5.get(), (ChannelConsumer) r6.get());
        }
        StacklessException stacklessException = new StacklessException("Channel stream failed");
        Consumer consumer = (v0) -> {
            v0.close();
        };
        Objects.requireNonNull(stacklessException);
        r5.consume(consumer, stacklessException::addSuppressed);
        Consumer consumer2 = (v0) -> {
            v0.close();
        };
        Objects.requireNonNull(stacklessException);
        r6.consume(consumer2, stacklessException::addSuppressed);
        return Promise.ofException(stacklessException);
    }

    public static <T> Promise<Void> streamTo(ChannelSupplier<T> channelSupplier, ChannelConsumer<T> channelConsumer) {
        return Promise.ofCallback(settablePromise -> {
            streamToImpl(channelSupplier, channelConsumer, settablePromise);
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Multi-variable type inference failed */
    public static <T> void streamToImpl(ChannelSupplier<T> channelSupplier, ChannelConsumer<T> channelConsumer, SettablePromise<Void> settablePromise) {
        Object result;
        Promise<Void> accept;
        do {
            Promise<T> promise = channelSupplier.get();
            if (!promise.isResult() || (result = promise.getResult()) == null) {
                promise.whenComplete((obj, th) -> {
                    if (th == null) {
                        channelConsumer.accept(obj).whenComplete((r8, th) -> {
                            if (th != null) {
                                channelSupplier.closeEx(th);
                                settablePromise.trySetException(th);
                            } else if (obj != null) {
                                streamToImpl(channelSupplier, channelConsumer, settablePromise);
                            } else {
                                settablePromise.trySet((Object) null);
                            }
                        });
                    } else {
                        channelConsumer.closeEx(th);
                        settablePromise.trySetException(th);
                    }
                });
                return;
            }
            accept = channelConsumer.accept(result);
        } while (accept.isResult());
        accept.whenComplete((r7, th2) -> {
            if (th2 == null) {
                streamToImpl(channelSupplier, channelConsumer, settablePromise);
            } else {
                channelSupplier.closeEx(th2);
                settablePromise.trySetException(th2);
            }
        });
    }

    public static <T> ChannelSupplier<T> prefetch(int i, ChannelSupplier<T> channelSupplier) {
        ChannelBuffer channelBuffer = new ChannelBuffer(i);
        channelSupplier.streamTo(channelBuffer.getConsumer());
        return channelBuffer.getSupplier();
    }

    public static <T> ChannelSupplier<T> prefetch(ChannelSupplier<T> channelSupplier) {
        ChannelZeroBuffer channelZeroBuffer = new ChannelZeroBuffer();
        channelSupplier.streamTo(channelZeroBuffer.getConsumer());
        return channelZeroBuffer.getSupplier();
    }

    public static <T, V> ChannelSupplier<V> remap(final ChannelSupplier<T> channelSupplier, final Function<? super T, ? extends Iterator<? extends V>> function) {
        return new AbstractChannelSupplier<V>(channelSupplier) { // from class: io.activej.csp.ChannelSuppliers.2
            Iterator<? extends V> iterator = CollectionUtils.emptyIterator();
            boolean endOfStream;

            @Override // io.activej.csp.AbstractChannelSupplier
            protected Promise<V> doGet() {
                return this.iterator.hasNext() ? Promise.of(this.iterator.next()) : Promise.ofCallback(this::next);
            }

            private void next(SettablePromise<V> settablePromise) {
                if (this.endOfStream) {
                    settablePromise.set((Object) null);
                    return;
                }
                Promise promise = channelSupplier.get();
                Function function2 = function;
                promise.whenComplete((obj, th) -> {
                    if (th != null) {
                        settablePromise.setException(th);
                        return;
                    }
                    if (obj == null) {
                        this.endOfStream = true;
                    }
                    this.iterator = (Iterator) function2.apply(obj);
                    if (this.iterator.hasNext()) {
                        settablePromise.set(this.iterator.next());
                    } else {
                        next(settablePromise);
                    }
                });
            }
        };
    }

    public static ChannelSupplier<ByteBuf> inputStreamAsChannelSupplier(Executor executor, MemSize memSize, InputStream inputStream) {
        return inputStreamAsChannelSupplier(executor, memSize.toInt(), inputStream);
    }

    public static ChannelSupplier<ByteBuf> inputStreamAsChannelSupplier(final Executor executor, final int i, final InputStream inputStream) {
        return new AbstractChannelSupplier<ByteBuf>() { // from class: io.activej.csp.ChannelSuppliers.3
            @Override // io.activej.csp.AbstractChannelSupplier
            protected Promise<ByteBuf> doGet() {
                Executor executor2 = executor;
                int i2 = i;
                InputStream inputStream2 = inputStream;
                return Promise.ofBlockingCallable(executor2, () -> {
                    ByteBuf allocate = ByteBufPool.allocate(i2);
                    try {
                        int read = inputStream2.read(allocate.array(), 0, i2);
                        if (read != -1) {
                            allocate.moveTail(read);
                            return allocate;
                        }
                        allocate.recycle();
                        return null;
                    } catch (IOException e) {
                        throw new UncheckedException(e);
                    }
                });
            }

            protected void onClosed(@NotNull Throwable th) {
                Executor executor2 = executor;
                InputStream inputStream2 = inputStream;
                executor2.execute(() -> {
                    try {
                        inputStream2.close();
                    } catch (IOException e) {
                    }
                });
            }
        };
    }

    public static InputStream channelSupplierAsInputStream(final Eventloop eventloop, final ChannelSupplier<ByteBuf> channelSupplier) {
        return new InputStream() { // from class: io.activej.csp.ChannelSuppliers.4

            @Nullable
            ByteBuf current = null;

            @Override // java.io.InputStream
            public int read() throws IOException {
                return doRead((v0) -> {
                    return v0.readByte();
                });
            }

            @Override // java.io.InputStream
            public int read(@NotNull byte[] bArr, int i, int i2) throws IOException {
                return doRead(byteBuf -> {
                    return byteBuf.read(bArr, i, Math.min(byteBuf.readRemaining(), i2));
                });
            }

            /* JADX WARN: Code restructure failed: missing block: B:11:0x009e, code lost:
            
                r7 = r0;
             */
            /* JADX WARN: Code restructure failed: missing block: B:14:0x0095, code lost:
            
                return -1;
             */
            /* JADX WARN: Code restructure failed: missing block: B:17:0x002c, code lost:
            
                r10 = move-exception;
             */
            /* JADX WARN: Code restructure failed: missing block: B:18:0x002e, code lost:
            
                r0 = r4;
                r1 = r5;
                r2 = r5;
                java.util.Objects.requireNonNull(r2);
                r0.execute(io.activej.eventloop.util.RunnableWithContext.wrapContext(r1, r2::close));
             */
            /* JADX WARN: Code restructure failed: missing block: B:19:0x0053, code lost:
            
                throw new java.io.IOException(r10);
             */
            /* JADX WARN: Code restructure failed: missing block: B:20:0x0054, code lost:
            
                r10 = move-exception;
             */
            /* JADX WARN: Code restructure failed: missing block: B:21:0x0056, code lost:
            
                r0 = r10.getCause();
             */
            /* JADX WARN: Code restructure failed: missing block: B:22:0x0062, code lost:
            
                if ((r0 instanceof java.io.IOException) != false) goto L12;
             */
            /* JADX WARN: Code restructure failed: missing block: B:24:0x006a, code lost:
            
                throw ((java.io.IOException) r0);
             */
            /* JADX WARN: Code restructure failed: missing block: B:26:0x0070, code lost:
            
                if ((r0 instanceof java.lang.RuntimeException) != false) goto L16;
             */
            /* JADX WARN: Code restructure failed: missing block: B:28:0x0078, code lost:
            
                throw ((java.lang.RuntimeException) r0);
             */
            /* JADX WARN: Code restructure failed: missing block: B:2:0x0006, code lost:
            
                if (r7 == null) goto L4;
             */
            /* JADX WARN: Code restructure failed: missing block: B:30:0x007e, code lost:
            
                if ((r0 instanceof java.lang.Exception) != false) goto L20;
             */
            /* JADX WARN: Code restructure failed: missing block: B:32:0x008a, code lost:
            
                throw new java.io.IOException(r0);
             */
            /* JADX WARN: Code restructure failed: missing block: B:34:0x0090, code lost:
            
                throw ((java.lang.Error) r0);
             */
            /* JADX WARN: Code restructure failed: missing block: B:35:0x00a0, code lost:
            
                r0 = r6.applyAsInt(r7);
             */
            /* JADX WARN: Code restructure failed: missing block: B:36:0x00ac, code lost:
            
                if (r7.canRead() == false) goto L34;
             */
            /* JADX WARN: Code restructure failed: missing block: B:37:0x00af, code lost:
            
                r5.current = r7;
             */
            /* JADX WARN: Code restructure failed: missing block: B:39:0x00c1, code lost:
            
                return r0;
             */
            /* JADX WARN: Code restructure failed: missing block: B:3:0x0009, code lost:
            
                r0 = r4;
                r1 = r5;
                java.util.Objects.requireNonNull(r1);
             */
            /* JADX WARN: Code restructure failed: missing block: B:40:0x00b7, code lost:
            
                r5.current = null;
                r7.recycle();
             */
            /* JADX WARN: Code restructure failed: missing block: B:5:0x0020, code lost:
            
                r0 = (io.activej.bytebuf.ByteBuf) r0.submit(r1::get).get();
             */
            /* JADX WARN: Code restructure failed: missing block: B:7:0x0092, code lost:
            
                if (r0 != null) goto L28;
             */
            /* JADX WARN: Code restructure failed: missing block: B:9:0x009b, code lost:
            
                if (r0.canRead() == false) goto L41;
             */
            /*
                Code decompiled incorrectly, please refer to instructions dump.
                To view partially-correct add '--show-bad-code' argument
            */
            private int doRead(java.util.function.ToIntFunction<io.activej.bytebuf.ByteBuf> r6) throws java.io.IOException {
                /*
                    r5 = this;
                    r0 = r5
                    io.activej.bytebuf.ByteBuf r0 = r0.current
                    r7 = r0
                    r0 = r7
                    if (r0 != 0) goto La0
                L9:
                    r0 = r5
                    io.activej.eventloop.Eventloop r0 = r4
                    r1 = r5
                    io.activej.csp.ChannelSupplier r1 = r5
                    r2 = r1
                    java.lang.Object r2 = java.util.Objects.requireNonNull(r2)
                    int r1 = r1::get
                    java.util.concurrent.CompletableFuture r0 = r0.submit(r1)
                    r9 = r0
                    r0 = r9
                    java.lang.Object r0 = r0.get()     // Catch: java.lang.InterruptedException -> L2c java.util.concurrent.ExecutionException -> L54
                    io.activej.bytebuf.ByteBuf r0 = (io.activej.bytebuf.ByteBuf) r0     // Catch: java.lang.InterruptedException -> L2c java.util.concurrent.ExecutionException -> L54
                    r8 = r0
                    goto L91
                L2c:
                    r10 = move-exception
                    r0 = r5
                    io.activej.eventloop.Eventloop r0 = r4
                    r1 = r5
                    io.activej.csp.ChannelSupplier r1 = r5
                    r2 = r5
                    io.activej.csp.ChannelSupplier r2 = r5
                    r3 = r2
                    java.lang.Object r3 = java.util.Objects.requireNonNull(r3)
                    int r2 = r2::close
                    java.lang.Runnable r1 = io.activej.eventloop.util.RunnableWithContext.wrapContext(r1, r2)
                    r0.execute(r1)
                    java.io.IOException r0 = new java.io.IOException
                    r1 = r0
                    r2 = r10
                    r1.<init>(r2)
                    throw r0
                L54:
                    r10 = move-exception
                    r0 = r10
                    java.lang.Throwable r0 = r0.getCause()
                    r11 = r0
                    r0 = r11
                    boolean r0 = r0 instanceof java.io.IOException
                    if (r0 == 0) goto L6b
                    r0 = r11
                    java.io.IOException r0 = (java.io.IOException) r0
                    throw r0
                L6b:
                    r0 = r11
                    boolean r0 = r0 instanceof java.lang.RuntimeException
                    if (r0 == 0) goto L79
                    r0 = r11
                    java.lang.RuntimeException r0 = (java.lang.RuntimeException) r0
                    throw r0
                L79:
                    r0 = r11
                    boolean r0 = r0 instanceof java.lang.Exception
                    if (r0 == 0) goto L8b
                    java.io.IOException r0 = new java.io.IOException
                    r1 = r0
                    r2 = r11
                    r1.<init>(r2)
                    throw r0
                L8b:
                    r0 = r11
                    java.lang.Error r0 = (java.lang.Error) r0
                    throw r0
                L91:
                    r0 = r8
                    if (r0 != 0) goto L97
                    r0 = -1
                    return r0
                L97:
                    r0 = r8
                    boolean r0 = r0.canRead()
                    if (r0 == 0) goto L9
                    r0 = r8
                    r7 = r0
                La0:
                    r0 = r6
                    r1 = r7
                    int r0 = r0.applyAsInt(r1)
                    r8 = r0
                    r0 = r7
                    boolean r0 = r0.canRead()
                    if (r0 == 0) goto Lb7
                    r0 = r5
                    r1 = r7
                    r0.current = r1
                    goto Lc0
                Lb7:
                    r0 = r5
                    r1 = 0
                    r0.current = r1
                    r0 = r7
                    r0.recycle()
                Lc0:
                    r0 = r8
                    return r0
                */
                throw new UnsupportedOperationException("Method not decompiled: io.activej.csp.ChannelSuppliers.AnonymousClass4.doRead(java.util.function.ToIntFunction):int");
            }

            @Override // java.io.InputStream, java.io.Closeable, java.lang.AutoCloseable
            public void close() {
                this.current = (ByteBuf) Utils.nullify(this.current, (v0) -> {
                    v0.recycle();
                });
                Eventloop eventloop2 = eventloop;
                ChannelSupplier channelSupplier2 = channelSupplier;
                ChannelSupplier channelSupplier3 = channelSupplier;
                Objects.requireNonNull(channelSupplier3);
                eventloop2.execute(RunnableWithContext.wrapContext(channelSupplier2, channelSupplier3::close));
            }
        };
    }
}
