package io.streamthoughts.kafka.connect.filepulse.reader;

import io.streamthoughts.kafka.connect.filepulse.annotation.VisibleForTesting;
import io.streamthoughts.kafka.connect.filepulse.data.Type;
import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
import io.streamthoughts.kafka.connect.filepulse.data.TypedValue;
import io.streamthoughts.kafka.connect.filepulse.internal.Silent;
import io.streamthoughts.kafka.connect.filepulse.source.FileContext;
import io.streamthoughts.kafka.connect.filepulse.source.FileRecord;
import io.streamthoughts.kafka.connect.filepulse.source.SourceOffset;
import io.streamthoughts.kafka.connect.filepulse.source.TypedFileRecord;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiFunction;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.utils.Time;

/* loaded from: input_file:io/streamthoughts/kafka/connect/filepulse/reader/AvroFileInputReader.class */
public class AvroFileInputReader extends AbstractFileInputReader {

    /* loaded from: input_file:io/streamthoughts/kafka/connect/filepulse/reader/AvroFileInputReader$AvroFileIterator.class */
    public static class AvroFileIterator extends AbstractFileInputIterator<TypedStruct> {
        private long recordsReadSinceLastSync;
        private long lastSync;
        private final GenericDatumReader reader;
        private DataFileReader<GenericRecord> dataFileReader;

        AvroFileIterator(IteratorManager iteratorManager, FileContext fileContext) {
            super(iteratorManager, fileContext);
            this.recordsReadSinceLastSync = 0L;
            this.lastSync = -1L;
            this.reader = new GenericDatumReader();
            this.dataFileReader = (DataFileReader) Silent.unchecked(() -> {
                return new DataFileReader(fileContext.file(), this.reader);
            }, (v1) -> {
                return new ReaderException(v1);
            });
        }

        public void seekTo(SourceOffset sourceOffset) {
            Objects.requireNonNull(sourceOffset, "offset can't be null");
            if (sourceOffset.position() != -1) {
                Silent.unchecked(() -> {
                    this.dataFileReader.seek(sourceOffset.position());
                }, (v1) -> {
                    return new ReaderException(v1);
                });
                this.recordsReadSinceLastSync = 0L;
                this.lastSync = this.dataFileReader.previousSync();
                skipRecordsUntil(sourceOffset.rows());
            }
        }

        private void skipRecordsUntil(long j) {
            while (this.recordsReadSinceLastSync < j) {
                nextRecord();
            }
        }

        private void updateContext() {
            this.context = this.context.withOffset(new SourceOffset(this.lastSync, this.recordsReadSinceLastSync, Time.SYSTEM.milliseconds()));
        }

        /* renamed from: next, reason: merged with bridge method [inline-methods] */
        public RecordsIterable<FileRecord<TypedStruct>> m5next() {
            try {
                RecordsIterable<FileRecord<TypedStruct>> of = RecordsIterable.of(new FileRecord[]{new TypedFileRecord(new AvroRecordOffset(this.lastSync, position(), this.recordsReadSinceLastSync), TypedStructConverter.fromGenericRecord(nextRecord()))});
                updateContext();
                return of;
            } catch (Throwable th) {
                updateContext();
                throw th;
            }
        }

        private GenericRecord nextRecord() {
            if (this.dataFileReader.previousSync() != this.lastSync) {
                this.lastSync = this.dataFileReader.previousSync();
                this.recordsReadSinceLastSync = 0L;
            }
            GenericRecord genericRecord = (GenericRecord) this.dataFileReader.next();
            this.recordsReadSinceLastSync++;
            return genericRecord;
        }

        private long position() {
            return ((Long) Silent.unchecked(() -> {
                return Long.valueOf(this.dataFileReader.tell());
            }, (v1) -> {
                return new ReaderException(v1);
            })).longValue();
        }

        public boolean hasNext() {
            return this.dataFileReader.hasNext();
        }

        @Override // io.streamthoughts.kafka.connect.filepulse.reader.AbstractFileInputIterator
        public void close() {
            if (isClose()) {
                return;
            }
            Silent.unchecked(() -> {
                this.dataFileReader.close();
            }, (v1) -> {
                return new ReaderException(v1);
            });
            super.close();
        }
    }

    @VisibleForTesting
    /* loaded from: input_file:io/streamthoughts/kafka/connect/filepulse/reader/AvroFileInputReader$TypedStructConverter.class */
    public static class TypedStructConverter {
        private static final Map<Schema.Type, BiFunction<Schema, Object, TypedValue>> AVRO_TYPES_TO_CONVERTER = new HashMap();

        static TypedStruct fromGenericRecord(GenericRecord genericRecord) {
            TypedStruct create = TypedStruct.create();
            for (Schema.Field field : genericRecord.getSchema().getFields()) {
                String name = field.name();
                create = create.put(name, fromSchemaAndValue(field.schema(), genericRecord.get(name)));
            }
            return create;
        }

        private static TypedValue fromSchemaAndValue(Schema schema, Object obj) {
            Schema.Type type = schema.getType();
            BiFunction<Schema, Object, TypedValue> biFunction = AVRO_TYPES_TO_CONVERTER.get(type);
            if (biFunction == null) {
                throw new ReaderException("Unsupported avro type : " + type);
            }
            return biFunction.apply(schema, obj);
        }

        private static TypedValue convertEnum(Schema schema, Object obj) {
            return TypedValue.string(obj != null ? ((Enum) obj).name() : null);
        }

        private static TypedValue convertUnion(Schema schema, Object obj) {
            return fromSchemaAndValue((Schema) schema.getTypes().stream().filter(schema2 -> {
                return schema2.getType() != Schema.Type.NULL;
            }).findFirst().get(), obj);
        }

        private static TypedValue convertString(Schema schema, Object obj) {
            return TypedValue.string(obj != null ? obj.toString() : null);
        }

        private static TypedValue convertBytes(Schema schema, Object obj) {
            return obj != null ? TypedValue.any(obj).as(Type.BYTES) : TypedValue.of((Object) null, Type.BYTES);
        }

        private static TypedValue convertMap(Schema schema, Object obj) {
            Schema valueType = schema.getValueType();
            Type type = null;
            HashMap hashMap = new HashMap();
            for (Map.Entry entry : ((Map) obj).entrySet()) {
                TypedValue fromSchemaAndValue = fromSchemaAndValue(valueType, entry.getValue());
                hashMap.put(entry.getKey().toString(), fromSchemaAndValue.value());
                type = fromSchemaAndValue.type();
            }
            return type != null ? TypedValue.map(hashMap, type) : TypedValue.of(hashMap, Type.MAP);
        }

        private static TypedValue convertCollection(Schema schema, Object obj) {
            Collection collection = (Collection) obj;
            Schema elementType = schema.getElementType();
            Type type = null;
            ArrayList arrayList = new ArrayList(collection.size());
            Iterator it = collection.iterator();
            while (it.hasNext()) {
                TypedValue fromSchemaAndValue = fromSchemaAndValue(elementType, it.next());
                arrayList.add(fromSchemaAndValue.value());
                type = fromSchemaAndValue.type();
            }
            return type != null ? TypedValue.array(arrayList, type) : TypedValue.of(arrayList, Type.ARRAY);
        }

        static {
            AVRO_TYPES_TO_CONVERTER.put(Schema.Type.BYTES, TypedStructConverter::convertBytes);
            AVRO_TYPES_TO_CONVERTER.put(Schema.Type.ENUM, TypedStructConverter::convertEnum);
            AVRO_TYPES_TO_CONVERTER.put(Schema.Type.STRING, TypedStructConverter::convertString);
            AVRO_TYPES_TO_CONVERTER.put(Schema.Type.UNION, TypedStructConverter::convertUnion);
            AVRO_TYPES_TO_CONVERTER.put(Schema.Type.ARRAY, TypedStructConverter::convertCollection);
            AVRO_TYPES_TO_CONVERTER.put(Schema.Type.MAP, TypedStructConverter::convertMap);
            AVRO_TYPES_TO_CONVERTER.put(Schema.Type.BOOLEAN, (schema, obj) -> {
                return TypedValue.bool((Boolean) obj);
            });
            AVRO_TYPES_TO_CONVERTER.put(Schema.Type.INT, (schema2, obj2) -> {
                return TypedValue.int32((Integer) obj2);
            });
            AVRO_TYPES_TO_CONVERTER.put(Schema.Type.LONG, (schema3, obj3) -> {
                return TypedValue.int64((Long) obj3);
            });
            AVRO_TYPES_TO_CONVERTER.put(Schema.Type.FLOAT, (schema4, obj4) -> {
                return TypedValue.float32((Float) obj4);
            });
            AVRO_TYPES_TO_CONVERTER.put(Schema.Type.DOUBLE, (schema5, obj5) -> {
                return TypedValue.float64((Double) obj5);
            });
            AVRO_TYPES_TO_CONVERTER.put(Schema.Type.RECORD, (schema6, obj6) -> {
                return TypedValue.struct(fromGenericRecord((GenericRecord) obj6));
            });
        }
    }

    @Override // io.streamthoughts.kafka.connect.filepulse.reader.AbstractFileInputReader
    protected FileInputIterator<FileRecord<TypedStruct>> newIterator(FileContext fileContext, IteratorManager iteratorManager) {
        return new AvroFileIterator(iteratorManager, fileContext);
    }
}
