package org.frozenarc.datapipe;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import org.frozenarc.datapipe.joiner.JoinerComponents;
import org.frozenarc.datapipe.joiner.JoinerFuture;
import org.frozenarc.datapipe.joiner.StreamJoiner;
import org.frozenarc.datapipe.reader.ReaderComponents;
import org.frozenarc.datapipe.reader.ReaderFuture;
import org.frozenarc.datapipe.reader.StreamReader;
import org.frozenarc.datapipe.writer.StreamWriter;
import org.frozenarc.datapipe.writer.WriterComponents;
import org.frozenarc.datapipe.writer.WriterFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/frozenarc/datapipe/DataPipe.class */
public class DataPipe {
    private static final Logger log = LoggerFactory.getLogger(DataPipe.class);
    private final WriterComponents writerComps;
    private final JoinerComponents[] joinersComps;
    private final ReaderComponents readerComps;

    /* loaded from: input_file:org/frozenarc/datapipe/DataPipe$Builder.class */
    public static class Builder {
        private WriterComponents writerComps;
        private final List<JoinerComponents> joinerComps = new ArrayList();
        private ReaderComponents readerComps;

        public Builder streamWriter(StreamWriter streamWriter) {
            this.writerComps = new WriterComponents(streamWriter, null);
            return this;
        }

        public Builder streamWriter(StreamWriter streamWriter, Consumer<Exception> consumer) {
            this.writerComps = new WriterComponents(streamWriter, consumer);
            return this;
        }

        public Builder addStreamJoiner(StreamJoiner streamJoiner) {
            this.joinerComps.add(new JoinerComponents(streamJoiner, null));
            return this;
        }

        public Builder addStreamJoiner(StreamJoiner streamJoiner, Consumer<Exception> consumer) {
            this.joinerComps.add(new JoinerComponents(streamJoiner, consumer));
            return this;
        }

        public Builder streamReader(StreamReader streamReader) {
            this.readerComps = new ReaderComponents(streamReader, null);
            return this;
        }

        public Builder streamReader(StreamReader streamReader, Consumer<Exception> consumer) {
            this.readerComps = new ReaderComponents(streamReader, consumer);
            return this;
        }

        public DataPipe build() {
            return new DataPipe(this.writerComps, (JoinerComponents[]) this.joinerComps.toArray(new JoinerComponents[0]), this.readerComps);
        }
    }

    private DataPipe(WriterComponents writerComponents, JoinerComponents[] joinerComponentsArr, ReaderComponents readerComponents) {
        this.writerComps = writerComponents;
        this.joinersComps = joinerComponentsArr;
        this.readerComps = readerComponents;
    }

    public void doStream() throws DataPipeException {
        ExecutorService executorService = null;
        try {
            executorService = Executors.newFixedThreadPool(this.joinersComps.length + 2);
            doStream(executorService);
            if (executorService != null) {
                executorService.shutdown();
                log.debug("Executor has been shutdown");
            }
        } catch (Throwable th) {
            if (executorService != null) {
                executorService.shutdown();
                log.debug("Executor has been shutdown");
            }
            throw th;
        }
    }

    public void doStream(ExecutorService executorService) throws DataPipeException {
        ArrayList arrayList = new ArrayList();
        List synchronizedList = Collections.synchronizedList(new ArrayList());
        for (int i = 0; i < this.joinersComps.length + 1; i++) {
            try {
                arrayList.add(new PipedStream());
            } catch (Throwable th) {
                Iterator it = arrayList.iterator();
                while (it.hasNext()) {
                    ((PipedStream) it.next()).close();
                    log.debug("all piped streams are closed");
                }
                throw th;
            }
        }
        ArrayList arrayList2 = new ArrayList();
        int i2 = 0;
        WriterFuture writerFuture = writerFuture((PipedStream) arrayList.get(0), executorService);
        StreamWriter writer = this.writerComps.getWriter();
        Optional map = Optional.ofNullable(this.writerComps.getExpConsumer()).map(consumer -> {
            return getCombinedConsumer(synchronizedList, consumer);
        });
        Objects.requireNonNull(synchronizedList);
        arrayList2.add(writerFuture.getFuture(writer, (Consumer) map.orElse((v1) -> {
            r4.add(v1);
        })));
        while (i2 < this.joinersComps.length) {
            JoinerFuture joinerFuture = joinerFuture((PipedStream) arrayList.get(i2), (PipedStream) arrayList.get(i2 + 1), executorService);
            StreamJoiner joiner = this.joinersComps[i2].getJoiner();
            Optional map2 = Optional.ofNullable(this.joinersComps[i2].getExpConsumer()).map(consumer2 -> {
                return getCombinedConsumer(synchronizedList, consumer2);
            });
            Objects.requireNonNull(synchronizedList);
            arrayList2.add(joinerFuture.getFuture(joiner, (Consumer) map2.orElse((v1) -> {
                r4.add(v1);
            })));
            i2++;
        }
        ReaderFuture readerFuture = readerFuture((PipedStream) arrayList.get(i2), executorService);
        StreamReader reader = this.readerComps.getReader();
        Optional map3 = Optional.ofNullable(this.readerComps.getExpConsumer()).map(consumer3 -> {
            return getCombinedConsumer(synchronizedList, consumer3);
        });
        Objects.requireNonNull(synchronizedList);
        arrayList2.add(readerFuture.getFuture(reader, (Consumer) map3.orElse((v1) -> {
            r4.add(v1);
        })));
        log.debug("all components are set.. streaming is started");
        CompletableFuture.allOf((CompletableFuture[]) arrayList2.toArray(new CompletableFuture[0])).join();
        log.debug("streaming is done");
        if (synchronizedList.isEmpty()) {
            log.debug("doStream end");
            Iterator it2 = arrayList.iterator();
            while (it2.hasNext()) {
                ((PipedStream) it2.next()).close();
                log.debug("all piped streams are closed");
            }
            return;
        }
        DataPipeException dataPipeException = new DataPipeException((Throwable) synchronizedList.get(0));
        for (int i3 = 1; i3 < synchronizedList.size(); i3++) {
            dataPipeException.addSuppressed((Throwable) synchronizedList.get(i3));
        }
        throw dataPipeException;
    }

    private CompletableFuture<Void> writerFuture(PipedStream pipedStream, Executor executor, StreamWriter streamWriter, Consumer<Exception> consumer) {
        return CompletableFuture.runAsync(() -> {
            boolean z = false;
            try {
                try {
                    log.debug("StreamWriter: start writing to output stream");
                    streamWriter.writeTo(pipedStream.getOutputStream());
                    log.debug("StreamWriter: written to output stream");
                    pipedStream.closeAfterWrite(false);
                } catch (Exception e) {
                    z = true;
                    consumer.accept(e);
                    log.error("StreamWriter: Error during writing to output stream", e);
                    pipedStream.closeAfterWrite(true);
                }
            } catch (Throwable th) {
                pipedStream.closeAfterWrite(z);
                throw th;
            }
        }, executor);
    }

    private WriterFuture writerFuture(PipedStream pipedStream, Executor executor) {
        return (streamWriter, consumer) -> {
            return writerFuture(pipedStream, executor, streamWriter, consumer);
        };
    }

    private CompletableFuture<Void> readerFuture(PipedStream pipedStream, Executor executor, StreamReader streamReader, Consumer<Exception> consumer) {
        return CompletableFuture.runAsync(() -> {
            boolean z = false;
            try {
                try {
                    log.debug("StreamReader: start reading from input stream");
                    streamReader.readFrom(pipedStream.getInputStream());
                    log.debug("StreamReader: read from input stream");
                    pipedStream.closeAfterRead(false);
                } catch (Exception e) {
                    z = true;
                    consumer.accept(e);
                    log.error("StreamReader: Error during reading from input stream", e);
                    pipedStream.closeAfterRead(true);
                }
            } catch (Throwable th) {
                pipedStream.closeAfterRead(z);
                throw th;
            }
        }, executor);
    }

    private ReaderFuture readerFuture(PipedStream pipedStream, Executor executor) {
        return (streamReader, consumer) -> {
            return readerFuture(pipedStream, executor, streamReader, consumer);
        };
    }

    private CompletableFuture<Void> joinerFuture(PipedStream pipedStream, PipedStream pipedStream2, Executor executor, StreamJoiner streamJoiner, Consumer<Exception> consumer) {
        return CompletableFuture.runAsync(() -> {
            boolean z = false;
            try {
                try {
                    log.debug("StreamJoiner: start reading from input stream and writing to output stream");
                    streamJoiner.join(pipedStream.getInputStream(), pipedStream2.getOutputStream());
                    log.debug("StreamJoiner: read from input stream and written to output stream");
                    pipedStream.closeAfterRead(false);
                    pipedStream2.closeAfterWrite(false);
                } catch (Exception e) {
                    z = true;
                    consumer.accept(e);
                    log.error("StreamJoiner: Error during joining of input stream and output stream", e);
                    pipedStream.closeAfterRead(true);
                    pipedStream2.closeAfterWrite(true);
                }
            } catch (Throwable th) {
                pipedStream.closeAfterRead(z);
                pipedStream2.closeAfterWrite(z);
                throw th;
            }
        }, executor);
    }

    private JoinerFuture joinerFuture(PipedStream pipedStream, PipedStream pipedStream2, Executor executor) {
        return (streamJoiner, consumer) -> {
            return joinerFuture(pipedStream, pipedStream2, executor, streamJoiner, consumer);
        };
    }

    private Consumer<Exception> getCombinedConsumer(List<Exception> list, Consumer<Exception> consumer) {
        return exc -> {
            list.add(exc);
            consumer.accept(exc);
        };
    }

    public static Builder builder() {
        return new Builder();
    }
}
