package org.apache.wayang.api.sql.sources.fs;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.commons.io.IOUtils;
import org.apache.wayang.basic.channels.FileChannel;
import org.apache.wayang.basic.data.Record;
import org.apache.wayang.core.api.exception.WayangException;
import org.apache.wayang.core.optimizer.OptimizationContext;
import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
import org.apache.wayang.core.plan.wayangplan.UnarySource;
import org.apache.wayang.core.platform.ChannelDescriptor;
import org.apache.wayang.core.platform.ChannelInstance;
import org.apache.wayang.core.platform.lineage.ExecutionLineageNode;
import org.apache.wayang.core.types.DataSetType;
import org.apache.wayang.core.util.Tuple;
import org.apache.wayang.core.util.fs.FileSystem;
import org.apache.wayang.core.util.fs.FileSystems;
import org.apache.wayang.core.util.fs.FileUtils;
import org.apache.wayang.java.channels.StreamChannel;
import org.apache.wayang.java.execution.JavaExecutor;
import org.apache.wayang.java.operators.JavaExecutionOperator;

/* loaded from: input_file:org/apache/wayang/api/sql/sources/fs/JavaCSVTableSource.class */
public class JavaCSVTableSource<T> extends UnarySource<T> implements JavaExecutionOperator {
    private final String sourcePath;
    private final List<RelDataType> fieldTypes;
    private char separator;
    static final /* synthetic */ boolean $assertionsDisabled;

    public JavaCSVTableSource(String str, DataSetType dataSetType, List<RelDataType> list) {
        super(dataSetType);
        this.separator = ';';
        this.sourcePath = str;
        this.fieldTypes = list;
    }

    public JavaCSVTableSource(String str, DataSetType dataSetType, List<RelDataType> list, char c) {
        super(dataSetType);
        this.separator = ';';
        this.sourcePath = str;
        this.fieldTypes = list;
        this.separator = c;
    }

    public Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> evaluate(ChannelInstance[] channelInstanceArr, ChannelInstance[] channelInstanceArr2, JavaExecutor javaExecutor, OptimizationContext.OperatorContext operatorContext) {
        String str;
        if (!$assertionsDisabled && channelInstanceArr2.length != getNumOutputs()) {
            throw new AssertionError();
        }
        if (this.sourcePath == null) {
            str = ((FileChannel.Instance) channelInstanceArr[0]).getSinglePath();
        } else {
            if (!$assertionsDisabled && channelInstanceArr.length != 0) {
                throw new AssertionError();
            }
            str = this.sourcePath;
        }
        ((StreamChannel.Instance) channelInstanceArr2[0]).accept(createStream(FileSystems.findActualSingleInputPath(str)));
        return ExecutionOperator.modelLazyExecution(channelInstanceArr, channelInstanceArr2, operatorContext);
    }

    private Stream<T> createStream(String str) {
        return (Stream<T>) streamLines(str).map(this::parseLine);
    }

    private T parseLine(String str) {
        Class typeClass = getType().getDataUnitType().getTypeClass();
        if (!$assertionsDisabled && typeClass != Record.class) {
            throw new AssertionError();
        }
        try {
            String[] parseLine = CsvRowConverter.parseLine(str, this.separator);
            if (parseLine.length != this.fieldTypes.size()) {
                throw new IllegalStateException(String.format("Error while parsing CSV file %s at line %s", this.sourcePath, str));
            }
            Object[] objArr = new Object[parseLine.length];
            for (int i = 0; i < parseLine.length; i++) {
                objArr[i] = CsvRowConverter.convert(this.fieldTypes.get(i), parseLine[i]);
            }
            return (T) new Record(objArr);
        } catch (IOException e) {
            System.out.println(e.getMessage());
            throw new IllegalStateException(String.format("Error while parsing CSV file %s at line %s", this.sourcePath, str));
        }
    }

    public List<ChannelDescriptor> getSupportedInputChannels(int i) {
        return Collections.singletonList(FileChannel.HDFS_TSV_DESCRIPTOR);
    }

    public List<ChannelDescriptor> getSupportedOutputChannels(int i) {
        if ($assertionsDisabled || i <= getNumOutputs() || (i == 0 && getNumOutputs() == 0)) {
            return Collections.singletonList(StreamChannel.DESCRIPTOR);
        }
        throw new AssertionError();
    }

    private static Stream<String> streamLines(String str) {
        try {
            Iterator<String> createLineIterator = createLineIterator((FileSystem) FileSystems.getFileSystem(str).orElseThrow(() -> {
                return new IllegalStateException(String.format("No file system found for %s", str));
            }), str);
            createLineIterator.next();
            return StreamSupport.stream(Spliterators.spliteratorUnknownSize(createLineIterator, 0), false);
        } catch (IOException e) {
            throw new WayangException(String.format("%s failed to read %s.", FileUtils.class, str), e);
        }
    }

    private static Iterator<String> createLineIterator(FileSystem fileSystem, String str) throws IOException {
        final BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(fileSystem.open(str), "UTF-8"));
        return new Iterator<String>() { // from class: org.apache.wayang.api.sql.sources.fs.JavaCSVTableSource.1
            String next;
            static final /* synthetic */ boolean $assertionsDisabled;

            {
                advance();
            }

            private void advance() {
                try {
                    try {
                        this.next = bufferedReader.readLine();
                        if (this.next == null) {
                            IOUtils.closeQuietly(bufferedReader);
                        }
                    } catch (IOException e) {
                        this.next = null;
                        throw new UncheckedIOException(e);
                    }
                } catch (Throwable th) {
                    if (this.next == null) {
                        IOUtils.closeQuietly(bufferedReader);
                    }
                    throw th;
                }
            }

            @Override // java.util.Iterator
            public boolean hasNext() {
                return this.next != null;
            }

            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.Iterator
            public String next() {
                if (!$assertionsDisabled && !hasNext()) {
                    throw new AssertionError();
                }
                String str2 = this.next;
                advance();
                return str2;
            }

            static {
                $assertionsDisabled = !JavaCSVTableSource.class.desiredAssertionStatus();
            }
        };
    }

    static {
        $assertionsDisabled = !JavaCSVTableSource.class.desiredAssertionStatus();
    }
}
