package ideal.sylph.plugins.hdfs;

import com.google.common.base.Preconditions;
import ideal.sylph.annotation.Description;
import ideal.sylph.annotation.Name;
import ideal.sylph.annotation.Version;
import ideal.sylph.etl.PluginConfig;
import ideal.sylph.etl.Row;
import ideal.sylph.etl.Schema;
import ideal.sylph.etl.SinkContext;
import ideal.sylph.etl.api.RealTimeSink;
import ideal.sylph.plugins.hdfs.factory.HDFSFactorys;
import ideal.sylph.plugins.hdfs.parquet.HDFSFactory;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.parquet.column.ParquetProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Name("hdfs")
@Description("this is hdfs RealTimeSink")
@Version("1.0.0")
/* loaded from: input_file:ideal/sylph/plugins/hdfs/HdfsSink.class */
public class HdfsSink implements RealTimeSink {
    private static final Logger logger = LoggerFactory.getLogger(HdfsSink.class);
    private final HdfsSinkConfig config;
    private final String sinkTable;
    private final Schema schema;
    private int eventTimeIndex;
    private HDFSFactory hdfsFactory;

    /* loaded from: input_file:ideal/sylph/plugins/hdfs/HdfsSink$HdfsSinkConfig.class */
    public static class HdfsSinkConfig extends PluginConfig {

        @Name("hdfs_write_dir")
        @Description("this is write dir")
        private String writeDir;

        @Name("eventTime_field")
        @Description("this is your data eventTime_field, 必须是13位时间戳")
        private String eventTimeName;

        @Name("format")
        @Description("this is write file type, text or parquet")
        private String format = "parquet";

        @Name("file.split.size")
        @Description("default:128MB")
        private long fileSplitSize = 128;

        @Name("batchBufferSize")
        @Description("default:5MB")
        private long batchBufferSize = 5;

        @Name("maxCloseMinute")
        @Description("default:30 Minute")
        private long maxCloseMinute = 30;

        public long getBatchBufferSize() {
            return this.batchBufferSize;
        }

        public long getFileSplitSize() {
            return this.fileSplitSize;
        }

        public String getEventTimeName() {
            return this.eventTimeName;
        }

        public String getFormat() {
            return this.format;
        }

        public String getWriteDir() {
            return this.writeDir;
        }

        public long getMaxCloseMinute() {
            return this.maxCloseMinute;
        }
    }

    public HdfsSink(HdfsSinkConfig hdfsSinkConfig, SinkContext sinkContext) {
        this.eventTimeIndex = -1;
        this.config = hdfsSinkConfig;
        this.sinkTable = sinkContext.getSinkTable();
        this.schema = sinkContext.getSchema();
        Preconditions.checkState(this.sinkTable.length() > 0, "sinkTable is " + this.sinkTable);
        int i = 0;
        while (true) {
            if (i >= this.schema.getFieldNames().size()) {
                break;
            }
            if (((String) this.schema.getFieldNames().get(i)).equalsIgnoreCase(hdfsSinkConfig.eventTimeName)) {
                this.eventTimeIndex = i;
                break;
            }
            i++;
        }
        Preconditions.checkState(this.eventTimeIndex != -1, "eventTime_field " + hdfsSinkConfig.eventTimeName + " does not exist,but only " + this.schema.getFieldNames());
        Preconditions.checkState("text".equals(hdfsSinkConfig.format.toLowerCase()) || "parquet".equals(hdfsSinkConfig.format.toLowerCase()), "Hdfs sink format only supports text and parquet");
    }

    public void process(Row row) {
        try {
            this.hdfsFactory.writeLine(((Long) row.getAs(this.eventTimeIndex)).longValue(), row);
        } catch (IOException e) {
            logger.error("", e);
        } catch (ClassCastException e2) {
            logger.error("eventTimeField {}, index [{}], but value is {}", new Object[]{this.config.eventTimeName, Integer.valueOf(this.eventTimeIndex), row.getAs(this.eventTimeIndex)});
            try {
                TimeUnit.MILLISECONDS.sleep(1L);
            } catch (InterruptedException e3) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public boolean open(long j, long j2) throws Exception {
        String lowerCase = this.config.format.toLowerCase();
        boolean z = -1;
        switch (lowerCase.hashCode()) {
            case -793011724:
                if (lowerCase.equals("parquet")) {
                    z = true;
                    break;
                }
                break;
            case 3556653:
                if (lowerCase.equals("text")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                this.hdfsFactory = HDFSFactorys.getTextFileWriter().tableName(this.sinkTable).schema(this.schema).partition(j).config(this.config).getOrCreate();
                return true;
            case true:
                this.hdfsFactory = HDFSFactorys.getParquetWriter().parquetVersion(ParquetProperties.WriterVersion.PARQUET_2_0).tableName(this.sinkTable).schema(this.schema).partition(j).config(this.config).getOrCreate();
                return true;
            default:
                throw new UnsupportedOperationException("Hdfs sink format only supports text and parquet");
        }
    }

    public void close(Throwable th) {
        try {
            this.hdfsFactory.close();
        } catch (IOException e) {
            logger.error("", e);
        }
    }
}
