package ideal.sylph.plugins.hdfs.parquet;

import ideal.sylph.etl.Row;
import java.io.IOException;
import java.net.URI;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.commons.collections.map.CaseInsensitiveMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroup;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.example.GroupWriteSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.Type;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link FileWriter} that appends rows to a single Parquet file using the parquet "example"
 * {@link Group} object model.
 *
 * <p>The underlying writer is configured with 1 MiB pages, 1 MiB dictionary pages, dictionary
 * encoding enabled, record validation disabled, 128 MiB row groups and no compression. On
 * {@link #close()} the file is renamed: every {@code _tmp_} in the path is replaced by
 * {@code file_} and {@code .parquet} is appended. If closing fails, the file is renamed to
 * {@code <path>.err} instead.
 *
 * <p>Instances are obtained through {@link #create()}.
 */
public class ApacheParquet implements FileWriter {
    private final ParquetWriter<Group> writer;
    private final SimpleGroupFactory groupFactory;
    private final MessageType schema;
    private final String outputPath;
    private final long createTime;
    /** Time of the last successful hand-off of a row to the Parquet writer. */
    private long lastTime;
    /** Names of fields whose conversion already failed, so each is warned about only once. */
    private final Queue<String> errField = new ConcurrentLinkedQueue<>();
    private static final Logger logger = LoggerFactory.getLogger(ApacheParquet.class);
    /** Value written into a MAP-annotated field: a repeated {@code key_value} group. */
    private static final MessageType mapTopSchema = MessageTypeParser.parseMessageType("message row {\n  repeated group key_value {\n    required binary key (UTF8);\n    optional binary value (UTF8);\n  }\n}\n");
    /** One {@code key_value} entry of {@link #mapTopSchema}. */
    private static final MessageType kvSchema = MessageTypeParser.parseMessageType("message row {\n  required binary key (UTF8);\n  optional binary value (UTF8);\n}\n");

    /** Fluent builder for {@link ApacheParquet}; the writer version defaults to PARQUET_2_0. */
    public static class Builder {
        private ParquetProperties.WriterVersion parquetVersion = ParquetProperties.WriterVersion.PARQUET_2_0;
        private String writePath;
        private MessageType schema;

        /** Sets the Parquet schema of the rows to be written. */
        public Builder schema(MessageType messageType) {
            this.schema = messageType;
            return this;
        }

        /** Overrides the Parquet writer (format) version. */
        public Builder parquetVersion(ParquetProperties.WriterVersion writerVersion) {
            this.parquetVersion = writerVersion;
            return this;
        }

        /** Sets the path of the file to create; it is passed to Hadoop's {@link Path}. */
        public Builder writePath(String str) {
            this.writePath = str;
            return this;
        }

        /**
         * Opens the Parquet writer for the configured path and schema.
         *
         * @throws IOException if the underlying writer cannot be created
         */
        public ApacheParquet get() throws IOException {
            return new ApacheParquet(this.writePath, this.schema, this.parquetVersion);
        }
    }

    private ApacheParquet(String str, MessageType messageType, ParquetProperties.WriterVersion writerVersion) throws IOException {
        this.createTime = System.currentTimeMillis();
        this.lastTime = this.createTime;
        this.schema = messageType;
        this.outputPath = str;
        Configuration configuration = new Configuration();
        GroupWriteSupport.setSchema(messageType, configuration);
        this.writer = ExampleParquetWriter.builder(new Path(str))
                .withType(messageType)
                .withConf(configuration)
                .withPageSize(1024 * 1024)
                .withDictionaryPageSize(1024 * 1024)
                .withDictionaryEncoding(true)
                .withValidation(false)
                .withWriterVersion(writerVersion)
                .withRowGroupSize(128 * 1024 * 1024)
                .withCompressionCodec(CompressionCodecName.UNCOMPRESSED)
                .build();
        this.groupFactory = new SimpleGroupFactory(this.schema);
    }

    /** Returns the milliseconds elapsed since a row was last handed to the writer (or since creation). */
    @Override
    public long getCooldownTime() {
        return System.currentTimeMillis() - this.lastTime;
    }

    /** Returns the creation timestamp in epoch milliseconds. */
    @Override
    public long getCreatedTime() {
        return this.createTime;
    }

    /** Returns the temporary path this writer is writing to (before the rename in {@link #close()}). */
    @Override
    public String getWritePath() {
        return this.outputPath;
    }

    /** Returns the data size reported by the underlying {@link ParquetWriter}. */
    @Override
    public long getDataSize() {
        return this.writer.getDataSize();
    }

    /**
     * Writes one row whose values are given positionally, in schema field order.
     * Conversion errors propagate to the caller; write I/O errors are logged.
     */
    @Override
    public void writeLine(Collection<Object> collection) {
        Group newGroup = this.groupFactory.newGroup();
        int i = 0;
        for (Object value : collection) {
            // Index by top-level field (not leaf column) so MAP/group fields stay aligned.
            addValueToGroup(javaTypeOf(this.schema.getType(i)), newGroup, i, value);
            i++;
        }
        try {
            writeGroup(newGroup);
        } catch (IOException e) {
            logger.error("", e);
        }
    }

    /**
     * Writes one row whose values are given positionally, in schema field order.
     * Conversion errors propagate to the caller; write I/O errors are logged.
     */
    @Override
    public void writeLine(Row row) {
        Group newGroup = this.groupFactory.newGroup();
        for (int i = 0; i < row.size(); i++) {
            addValueToGroup(javaTypeOf(this.schema.getType(i)), newGroup, i, row.getAs(i));
        }
        try {
            writeGroup(newGroup);
        } catch (IOException e) {
            logger.error("", e);
        }
    }

    /**
     * Writes one row whose values are looked up by schema field name (case-insensitively).
     * Best effort: a field that cannot be converted is left empty and warned about once per
     * field name; a failure while writing the row is logged.
     */
    @Override
    public void writeLine(Map<String, Object> map) {
        CaseInsensitiveMap caseInsensitiveMap = new CaseInsensitiveMap(map);
        Group newGroup = this.groupFactory.newGroup();
        List<Type> fields = this.schema.getFields();
        for (int i = 0; i < fields.size(); i++) {
            Type type = fields.get(i);
            Object obj = caseInsensitiveMap.get(type.getName());
            try {
                // Type resolution is inside the try so an unsupported field is skipped, not fatal.
                addValueToGroup(javaTypeOf(type), newGroup, i, obj);
            } catch (Exception e) {
                if (!this.errField.contains(type.getName())) {
                    this.errField.offer(type.getName());
                    logger.warn("错误字段:{}:{} 原因:{} file={}", new Object[]{type.getName(), obj, e.getMessage(), this.outputPath});
                }
            }
        }
        try {
            writeGroup(newGroup);
        } catch (Exception e2) {
            logger.warn("错误行:{} err:", map, e2);
        }
    }

    /** Hands a non-null group to the writer and records the time for {@link #getCooldownTime()}. */
    private void writeGroup(Group group) throws IOException {
        if (group == null) {
            return;
        }
        this.lastTime = System.currentTimeMillis();
        this.writer.write(group);
    }

    /**
     * Closes the writer and renames the finished file ({@code _tmp_} to {@code file_}, plus a
     * {@code .parquet} suffix). If closing or renaming throws, the file is renamed to
     * {@code <path>.err} instead.
     *
     * @throws IOException if the fallback rename itself fails
     */
    @Override
    public void close() throws IOException {
        Path source = new Path(this.outputPath);
        try {
            this.writer.close();
            Path target = new Path(this.outputPath.replace("_tmp_", "file_") + ".parquet");
            // FileSystem.rename reports most failures through its return value, not an exception.
            if (!fileSystem().rename(source, target)) {
                logger.warn("rename {} to {} failed", source, target);
            }
        } catch (IOException e) {
            logger.error("关闭Parquet输出流异常", e);
            Path errPath = new Path(this.outputPath + ".err");
            if (!fileSystem().rename(source, errPath)) {
                logger.warn("rename {} to {} failed", source, errPath);
            }
        }
    }

    /** Returns the Hadoop file system that owns {@link #outputPath}. */
    private FileSystem fileSystem() throws IOException {
        return FileSystem.get(URI.create(this.outputPath), new Configuration());
    }

    /**
     * Returns the Java class used to convert values for a top-level field: {@code Map.class} for
     * MAP-annotated fields, otherwise the primitive type's {@code javaType}.
     *
     * @throws ClassCastException if the field is a group type that is not MAP-annotated
     */
    private static Class<?> javaTypeOf(Type type) {
        if (type.getOriginalType() == OriginalType.MAP) {
            return Map.class;
        }
        return type.asPrimitiveType().getPrimitiveTypeName().javaType;
    }

    /**
     * Converts {@code obj} according to {@code cls} and adds it to field {@code i} of {@code group}.
     * {@code null} and {@code ""} leave the field empty.
     *
     * @throws NumberFormatException if a numeric value cannot be parsed
     * @throws IllegalArgumentException if a boolean value is not "true"/"false"
     * @throws ClassCastException if a MAP field receives something other than a {@link Map}
     */
    private void addValueToGroup(Class<?> cls, Group group, int i, Object obj) {
        if (obj == null || "".equals(obj)) {
            return;
        }
        String text = obj.toString();
        if (cls == Binary.class) {
            if (obj instanceof byte[]) {
                // Raw bytes must be stored as-is, not as the array's identity string.
                group.add(i, Binary.fromConstantByteArray((byte[]) obj));
            } else {
                group.add(i, text);
            }
        } else if (cls == Boolean.TYPE) {
            group.add(i, parseBoolean(text));
        } else if (cls == Byte.TYPE) {
            group.add(i, Byte.parseByte(text));
        } else if (cls == Short.TYPE) {
            group.add(i, Short.parseShort(text));
        } else if (cls == Integer.TYPE) {
            group.add(i, Integer.parseInt(text));
        } else if (cls == Long.TYPE) {
            group.add(i, Long.parseLong(text));
        } else if (cls == Double.TYPE) {
            group.add(i, Double.parseDouble(text));
        } else if (cls == Float.TYPE) {
            group.add(i, Float.parseFloat(text));
        } else if (cls == Map.class) {
            group.add(i, toKeyValueGroup((Map<?, ?>) obj));
        } else {
            group.add(i, text);
        }
    }

    /** Strictly parses "true"/"false" (case-insensitive), rejecting anything else. */
    private static boolean parseBoolean(String text) {
        if ("true".equalsIgnoreCase(text)) {
            return true;
        }
        if ("false".equalsIgnoreCase(text)) {
            return false;
        }
        throw new IllegalArgumentException("not a boolean: " + text);
    }

    /**
     * Builds the repeated {@code key_value} group for a MAP field. Entries with a null key
     * (the key column is required) or a null value are skipped; keys and values are stored as strings.
     */
    private static Group toKeyValueGroup(Map<?, ?> map) {
        SimpleGroup keyValues = new SimpleGroup(mapTopSchema);
        for (Map.Entry<?, ?> entry : map.entrySet()) {
            Object key = entry.getKey();
            Object value = entry.getValue();
            if (key == null || value == null) {
                continue;
            }
            SimpleGroup keyValue = new SimpleGroup(kvSchema);
            keyValue.add("key", key.toString());
            keyValue.add("value", value.toString());
            keyValues.add("key_value", keyValue);
        }
        return keyValues;
    }

    /** Returns a new {@link Builder}. */
    public static Builder create() {
        return new Builder();
    }
}
