package org.apache.beam.sdk.extensions.smb;

import java.io.IOException;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.util.NoSuchElementException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.extensions.smb.AvroFileOperations;
import org.apache.beam.sdk.extensions.smb.FileOperations;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.hadoop.SerializableConfiguration;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

/* loaded from: input_file:org/apache/beam/sdk/extensions/smb/ParquetAvroFileOperations.class */
/**
 * {@link FileOperations} implementation that stores Avro records in Parquet files.
 *
 * <p>Instances are created through the {@code of(...)} factories, which take the Avro
 * {@link Schema} and optionally a Parquet compression codec, a Hadoop {@link Configuration} and a
 * read-side {@link FilterPredicate}. The superclass is always constructed with
 * {@link Compression#UNCOMPRESSED} and the {@code application/octet-stream} MIME type; the Parquet
 * codec is applied separately by the writer.
 *
 * @param <ValueT> record type read from and written to the files
 */
public class ParquetAvroFileOperations<ValueT> extends FileOperations<ValueT> {
    /** Codec used by the factories that do not take an explicit codec. */
    static final CompressionCodecName DEFAULT_COMPRESSION = CompressionCodecName.GZIP;

    /** Configuration used by the factories that do not take an explicit configuration. */
    static final Configuration DEFAULT_CONFIGURATION = new Configuration();

    private final AvroFileOperations.SerializableSchemaSupplier schemaSupplier;
    private final CompressionCodecName compression;
    private final SerializableConfiguration conf;
    /** Optional read filter; {@code null} means no filtering. */
    private final FilterPredicate predicate;

    /**
     * Reader that keeps one record of look-ahead so {@link #hasNextElement()} can be answered
     * without touching the underlying Parquet reader.
     */
    private static class ParquetAvroReader<ValueT> extends FileOperations.Reader<ValueT> {
        private final AvroFileOperations.SerializableSchemaSupplier schemaSupplier;
        private final SerializableConfiguration conf;
        private final FilterPredicate predicate;
        private transient ParquetReader<ValueT> reader;
        /** Look-ahead record; {@code null} once the file is exhausted or before a read is prepared. */
        private transient ValueT current;

        private ParquetAvroReader(
                AvroFileOperations.SerializableSchemaSupplier schemaSupplier,
                SerializableConfiguration conf,
                FilterPredicate predicate) {
            this.schemaSupplier = schemaSupplier;
            this.conf = conf;
            this.predicate = predicate;
        }

        /**
         * Opens a Parquet reader over {@code readableByteChannel} and buffers the first record.
         *
         * @param readableByteChannel channel positioned on the Parquet file to read
         * @throws IOException if the reader cannot be built or the first record cannot be read
         */
        @Override
        public void prepareRead(ReadableByteChannel readableByteChannel) throws IOException {
            final Schema schema = this.schemaSupplier.m2get();
            // AvroReadSupport writes the schema into the configuration. Work on a private copy so
            // the wrapped instance (which may be the static DEFAULT_CONFIGURATION shared by other
            // file operations) is never mutated by a read.
            final Configuration configuration = new Configuration(this.conf.get());
            AvroReadSupport.setAvroReadSchema(configuration, schema);
            AvroReadSupport.setRequestedProjection(configuration, schema);

            ParquetReader.Builder<ValueT> builder =
                    AvroParquetReader.<ValueT>builder(new ParquetInputFile(readableByteChannel))
                            .withConf(configuration);
            if (this.predicate != null) {
                builder = builder.withFilter(FilterCompat.get(this.predicate));
            }
            this.reader = builder.build();
            this.current = this.reader.read();
        }

        /**
         * Returns the buffered record and advances the look-ahead by one.
         *
         * @return the next record of the file
         * @throws IOException if reading the following record fails
         * @throws NoSuchElementException if there is no buffered record left
         */
        @Override
        public ValueT readNext() throws IOException, NoSuchElementException {
            final ValueT next = this.current;
            if (next == null) {
                // Honour the declared contract instead of handing null to the caller and
                // reading past the end of the file.
                throw new NoSuchElementException();
            }
            this.current = this.reader.read();
            return next;
        }

        /** Returns {@code true} while a look-ahead record is buffered. */
        @Override
        public boolean hasNextElement() throws IOException {
            return this.current != null;
        }

        /**
         * Closes the underlying Parquet reader, if one was opened.
         *
         * @throws IOException if closing the reader fails
         */
        @Override
        public void finishRead() throws IOException {
            // prepareRead may have failed before the reader was assigned.
            if (this.reader != null) {
                this.reader.close();
            }
        }
    }

    /** {@link FileIO.Sink} that writes records through an {@link AvroParquetWriter}. */
    private static class ParquetAvroSink<ValueT> implements FileIO.Sink<ValueT> {
        /** Configuration key read for the row group size. */
        private static final String ROW_GROUP_SIZE_KEY = "parquet.block.size";
        /** Row group size used when the key is absent: 128 MiB. */
        private static final int DEFAULT_ROW_GROUP_SIZE = 134217728;

        private final AvroFileOperations.SerializableSchemaSupplier schemaSupplier;
        private final CompressionCodecName compression;
        private final SerializableConfiguration conf;
        private transient ParquetWriter<ValueT> writer;

        private ParquetAvroSink(
                AvroFileOperations.SerializableSchemaSupplier schemaSupplier,
                CompressionCodecName compression,
                SerializableConfiguration conf) {
            this.schemaSupplier = schemaSupplier;
            this.compression = compression;
            this.conf = conf;
        }

        /**
         * Builds the Parquet writer over {@code writableByteChannel}.
         *
         * @param writableByteChannel destination channel
         * @throws IOException if the writer cannot be built
         */
        @Override
        public void open(WritableByteChannel writableByteChannel) throws IOException {
            final Configuration configuration = this.conf.get();
            final int rowGroupSize = configuration.getInt(ROW_GROUP_SIZE_KEY, DEFAULT_ROW_GROUP_SIZE);
            this.writer =
                    AvroParquetWriter.<ValueT>builder(new ParquetOutputFile(writableByteChannel))
                            .withSchema(this.schemaSupplier.m2get())
                            .withCompressionCodec(this.compression)
                            .withConf(configuration)
                            .withRowGroupSize(rowGroupSize)
                            .build();
        }

        /**
         * Appends one record to the file.
         *
         * @param valuet record to write
         * @throws IOException if the write fails
         */
        @Override
        public void write(ValueT valuet) throws IOException {
            this.writer.write(valuet);
        }

        /**
         * Closes the Parquet writer.
         *
         * @throws IOException if closing the writer fails
         */
        @Override
        public void flush() throws IOException {
            this.writer.close();
        }
    }

    private ParquetAvroFileOperations(
            Schema schema,
            CompressionCodecName compressionCodecName,
            Configuration configuration,
            FilterPredicate filterPredicate) {
        super(Compression.UNCOMPRESSED, "application/octet-stream");
        this.schemaSupplier = new AvroFileOperations.SerializableSchemaSupplier(schema);
        this.compression = compressionCodecName;
        this.conf = new SerializableConfiguration(configuration);
        this.predicate = filterPredicate;
    }

    /** Creates file operations for {@code schema} with the default codec and configuration. */
    public static <V extends GenericRecord> ParquetAvroFileOperations<V> of(Schema schema) {
        return of(schema, DEFAULT_COMPRESSION);
    }

    /** Creates file operations for {@code schema} with the given codec and the default configuration. */
    public static <V extends GenericRecord> ParquetAvroFileOperations<V> of(
            Schema schema, CompressionCodecName compressionCodecName) {
        return of(schema, compressionCodecName, DEFAULT_CONFIGURATION);
    }

    /** Creates file operations for {@code schema} with the given codec and configuration, unfiltered. */
    public static <V extends GenericRecord> ParquetAvroFileOperations<V> of(
            Schema schema, CompressionCodecName compressionCodecName, Configuration configuration) {
        return new ParquetAvroFileOperations<>(schema, compressionCodecName, configuration, null);
    }

    /** Creates file operations for {@code schema} whose reads are filtered by {@code filterPredicate}. */
    public static <V extends GenericRecord> ParquetAvroFileOperations<V> of(
            Schema schema, FilterPredicate filterPredicate) {
        return of(schema, filterPredicate, DEFAULT_CONFIGURATION);
    }

    /**
     * Creates file operations for {@code schema} with the default codec, the given configuration
     * and reads filtered by {@code filterPredicate}.
     */
    public static <V extends GenericRecord> ParquetAvroFileOperations<V> of(
            Schema schema, FilterPredicate filterPredicate, Configuration configuration) {
        return new ParquetAvroFileOperations<>(
                schema, DEFAULT_COMPRESSION, configuration, filterPredicate);
    }

    /** Adds the codec name and the schema's full name to the superclass display data. */
    @Override
    public void populateDisplayData(DisplayData.Builder builder) {
        super.populateDisplayData(builder);
        builder.add(DisplayData.item("compressionCodecName", this.compression.name()));
        builder.add(DisplayData.item("schema", this.schemaSupplier.m2get().getFullName()));
    }

    @Override
    protected FileOperations.Reader<ValueT> createReader() {
        return new ParquetAvroReader<>(this.schemaSupplier, this.conf, this.predicate);
    }

    @Override
    protected FileIO.Sink<ValueT> createSink() {
        return new ParquetAvroSink<>(this.schemaSupplier, this.compression, this.conf);
    }

    /** Returns an Avro coder built from this instance's schema. */
    @Override
    @SuppressWarnings("unchecked")
    public Coder<ValueT> getCoder() {
        // The schema-based AvroCoder factory is not parameterised on ValueT; every public factory
        // of this class binds ValueT to a GenericRecord subtype, so the cast is safe in practice.
        return (Coder<ValueT>) AvroCoder.of(getSchema());
    }

    /** Returns the Avro schema supplied at construction. */
    Schema getSchema() {
        return this.schemaSupplier.m2get();
    }
}
