package io.datakernel.etl;

import io.datakernel.async.process.AsyncCollector;
import io.datakernel.common.Preconditions;
import io.datakernel.datastream.AbstractStreamConsumer;
import io.datakernel.datastream.AbstractStreamSupplier;
import io.datakernel.datastream.StreamConsumer;
import io.datakernel.datastream.StreamConsumerWithResult;
import io.datakernel.datastream.StreamDataAcceptor;
import io.datakernel.datastream.StreamInput;
import io.datakernel.datastream.StreamOutputs;
import io.datakernel.datastream.StreamSupplier;
import io.datakernel.promise.Promise;
import io.datakernel.promise.Promises;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

/* loaded from: input_file:io/datakernel/etl/LogDataConsumerSplitter.class */
public abstract class LogDataConsumerSplitter<T, D> implements LogDataConsumer<T, D> {
    private final List<LogDataConsumer<?, D>> logDataConsumers = new ArrayList();

    @Nullable
    private Iterator<? extends StreamDataAcceptor<?>> receivers;

    /* loaded from: input_file:io/datakernel/etl/LogDataConsumerSplitter$Splitter.class */
    final class Splitter implements StreamInput<T>, StreamOutputs {
        private StreamDataAcceptor<T> inputAcceptor;
        private final List<LogDataConsumerSplitter<T, D>.Splitter.Output<?>> outputs = new ArrayList();
        private int ready = 0;
        private final LogDataConsumerSplitter<T, D>.Splitter.Input input = new Input();

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:io/datakernel/etl/LogDataConsumerSplitter$Splitter$Input.class */
        public final class Input extends AbstractStreamConsumer<T> {
            Input() {
            }

            protected Promise<Void> onEndOfStream() {
                return Promises.all(Splitter.this.outputs.stream().map((v0) -> {
                    return v0.sendEndOfStream();
                }));
            }

            protected void onError(Throwable th) {
                Splitter.this.outputs.forEach(output -> {
                    output.close(th);
                });
            }
        }

        /* loaded from: input_file:io/datakernel/etl/LogDataConsumerSplitter$Splitter$Output.class */
        final class Output<X> extends AbstractStreamSupplier<X> {
            private StreamDataAcceptor<?> dataAcceptor;

            Output() {
            }

            /* JADX WARN: Multi-variable type inference failed */
            protected void onProduce(@NotNull StreamDataAcceptor<X> streamDataAcceptor) {
                this.dataAcceptor = streamDataAcceptor;
                if (Splitter.access$104(Splitter.this) == Splitter.this.outputs.size()) {
                    if (Splitter.this.inputAcceptor == null) {
                        LogDataConsumerSplitter.this.receivers = Splitter.this.outputs.stream().map(output -> {
                            return output.dataAcceptor;
                        }).iterator();
                        Splitter.this.inputAcceptor = LogDataConsumerSplitter.this.createSplitter();
                        Preconditions.checkState(!LogDataConsumerSplitter.this.receivers.hasNext(), "Receivers must correspond to outputs");
                        LogDataConsumerSplitter.this.receivers = null;
                    }
                    Splitter.this.input.getSupplier().resume(Splitter.this.inputAcceptor);
                }
            }

            protected void onSuspended() {
                Splitter.access$106(Splitter.this);
                Splitter.this.input.getSupplier().suspend();
            }

            protected void onError(Throwable th) {
                Splitter.this.input.close(th);
            }
        }

        Splitter() {
        }

        public StreamConsumer<T> getInput() {
            return this.input;
        }

        public List<? extends StreamSupplier<?>> getOutputs() {
            return this.outputs;
        }

        static /* synthetic */ int access$104(Splitter splitter) {
            int i = splitter.ready + 1;
            splitter.ready = i;
            return i;
        }

        static /* synthetic */ int access$106(Splitter splitter) {
            int i = splitter.ready - 1;
            splitter.ready = i;
            return i;
        }
    }

    @Override // io.datakernel.etl.LogDataConsumer
    public StreamConsumerWithResult<T, List<D>> consume() {
        if (this.logDataConsumers.isEmpty()) {
            createSplitter();
            Preconditions.checkState(!this.logDataConsumers.isEmpty(), "addOutput() should be called at least once");
        }
        AsyncCollector create = AsyncCollector.create(new ArrayList());
        Splitter splitter = new Splitter();
        Iterator<LogDataConsumer<?, D>> it = this.logDataConsumers.iterator();
        while (it.hasNext()) {
            StreamConsumerWithResult<?, List<D>> consume = it.next().consume();
            create.addPromise(consume.getResult(), (v0, v1) -> {
                v0.addAll(v1);
            });
            splitter.getClass();
            Splitter.Output output = new Splitter.Output();
            splitter.outputs.add(output);
            output.streamTo(consume.getConsumer());
        }
        return StreamConsumerWithResult.of(splitter.getInput(), create.run().get());
    }

    protected abstract StreamDataAcceptor<T> createSplitter();

    /* JADX WARN: Multi-variable type inference failed */
    @Nullable
    protected final <X> StreamDataAcceptor<X> addOutput(LogDataConsumer<X, D> logDataConsumer) {
        if (this.receivers != null) {
            return this.receivers.next();
        }
        this.logDataConsumers.add(logDataConsumer);
        return null;
    }
}
