package ideal.sylph.plugins.hdfs.txt;

import com.github.harbby.gadtry.base.MoreObjects;
import com.github.harbby.gadtry.base.Throwables;
import com.hadoop.compression.lzo.LzopCodec;
import ideal.sylph.etl.Row;
import ideal.sylph.etl.Schema;
import ideal.sylph.plugins.hdfs.HdfsSink;
import ideal.sylph.plugins.hdfs.factory.HDFSFactorys;
import ideal.sylph.plugins.hdfs.parquet.HDFSFactory;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:ideal/sylph/plugins/hdfs/txt/TextFileFactory.class */
public class TextFileFactory implements HDFSFactory {
    private static final Logger logger = LoggerFactory.getLogger(TextFileFactory.class);
    private final Map<String, FileChannel> writerManager = new HashCache();
    private final Set<String> needClose = new HashSet();
    private final String writeTableDir;
    private final String table;
    private final long partition;
    private final int batchSize;
    private final long fileSplitSize;
    private final int maxCloseMinute;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:ideal/sylph/plugins/hdfs/txt/TextFileFactory$FileChannel.class */
    public class FileChannel {
        private final FileSystem hdfs;
        private final String filePath;
        private final OutputStream outputStream;
        private long bufferSize;
        private final long split;
        private final long createTime = System.currentTimeMillis();
        private long writeSize = 0;

        public FileChannel(String str, long j, CompressionCodec compressionCodec, Configuration configuration) throws IOException {
            Path path = new Path(str);
            this.hdfs = path.getFileSystem(configuration);
            FSDataOutputStream append = this.hdfs.exists(path) ? this.hdfs.append(path) : this.hdfs.create(path, false);
            this.filePath = str;
            this.split = j;
            this.outputStream = compressionCodec.createOutputStream(append);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void write(byte[] bArr) throws IOException {
            this.outputStream.write(bArr);
            this.bufferSize += bArr.length;
            this.writeSize += bArr.length;
            if (this.bufferSize > TextFileFactory.this.batchSize) {
                this.outputStream.flush();
                this.bufferSize = 0L;
            }
        }

        public String getFilePath() {
            return this.filePath;
        }

        public long getCreateTime() {
            return this.createTime;
        }

        public long getWriteSize() {
            return this.writeSize;
        }

        public long getSplit() {
            return this.split;
        }

        public void close() throws IOException {
            this.outputStream.close();
            this.hdfs.rename(new Path(this.filePath), new Path(this.filePath.replace("_tmp_", "text_")));
        }
    }

    /* loaded from: input_file:ideal/sylph/plugins/hdfs/txt/TextFileFactory$HashCache.class */
    private static class HashCache extends LinkedHashMap<String, FileChannel> {
        private static final int CACHE_SIZE = 1024;
        private static final int INIT_SIZE = 64;
        private static final float LOAD_FACTOR = 0.6f;
        private static final long serialVersionUID = 1;

        HashCache() {
            super(INIT_SIZE, LOAD_FACTOR);
        }

        @Override // java.util.LinkedHashMap
        protected boolean removeEldestEntry(Map.Entry<String, FileChannel> entry) {
            if (size() <= CACHE_SIZE) {
                return false;
            }
            try {
                entry.getValue().close();
                TextFileFactory.logger.info("close textFile: {}", entry.getKey());
                return true;
            } catch (IOException e) {
                throw Throwables.throwsException(e);
            }
        }
    }

    public TextFileFactory(String str, Schema schema, HdfsSink.HdfsSinkConfig hdfsSinkConfig, long j) {
        this.partition = j;
        this.writeTableDir = hdfsSinkConfig.getWriteDir().endsWith("/") ? hdfsSinkConfig.getWriteDir() : hdfsSinkConfig.getWriteDir() + "/";
        this.table = (String) Objects.requireNonNull(str, "table is null");
        this.batchSize = ((int) hdfsSinkConfig.getBatchBufferSize()) * 1024 * 1024;
        this.fileSplitSize = hdfsSinkConfig.getFileSplitSize() * 1024 * 1024 * 8;
        MoreObjects.checkState(hdfsSinkConfig.getMaxCloseMinute() >= 5, "maxCloseMinute must > 5Minute");
        this.maxCloseMinute = ((int) hdfsSinkConfig.getMaxCloseMinute()) * 60000;
        new Thread(() -> {
            Thread.currentThread().setName("TextFileFactory_TimeChecker");
            while (true) {
                try {
                    synchronized (this.needClose) {
                        long currentTimeMillis = System.currentTimeMillis();
                        this.writerManager.forEach((str2, fileChannel) -> {
                            if (currentTimeMillis - fileChannel.getCreateTime() > ((long) this.maxCloseMinute)) {
                                this.needClose.add(str2);
                            }
                        });
                    }
                    TimeUnit.SECONDS.sleep(1L);
                } catch (InterruptedException e) {
                    return;
                } catch (Exception e2) {
                    logger.error("check Thread error:", e2);
                }
            }
        }).start();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            ((Stream) this.writerManager.entrySet().stream().parallel()).forEach(entry -> {
                String str2 = (String) entry.getKey();
                try {
                    ((FileChannel) entry.getValue()).close();
                } catch (IOException e) {
                    logger.error("addShutdownHook close textFile Writer failed {}", str2, e);
                }
            });
        }));
    }

    private FileChannel getTxtFileWriter(long j) throws IOException {
        FileChannel createOutputStream;
        FileChannel createOutputStream2;
        if (!this.needClose.isEmpty()) {
            synchronized (this.needClose) {
                Iterator<String> it = this.needClose.iterator();
                while (it.hasNext()) {
                    FileChannel remove = this.writerManager.remove(it.next());
                    if (remove != null) {
                        remove.close();
                        logger.info("close >MaxFileMinute[{}] textFile: {}, size:{}, createTime {}", new Object[]{Integer.valueOf(this.maxCloseMinute), remove.getFilePath(), Long.valueOf(remove.getWriteSize()), Long.valueOf(remove.getCreateTime())});
                    }
                }
                this.needClose.clear();
            }
        }
        TextTimeParser textTimeParser = new TextTimeParser(Long.valueOf(j));
        String str = HDFSFactorys.getRowKey(this.table, textTimeParser) + "\u0001" + this.partition;
        FileChannel fileChannel = this.writerManager.get(str);
        if (fileChannel == null) {
            synchronized (this.needClose) {
                createOutputStream2 = createOutputStream(str, textTimeParser, 0L);
                this.writerManager.put(str, createOutputStream2);
            }
            return createOutputStream2;
        }
        if (fileChannel.getWriteSize() <= this.fileSplitSize) {
            return fileChannel;
        }
        synchronized (this.needClose) {
            this.writerManager.remove(str);
            fileChannel.close();
            logger.info("close >MaxSplitSize[{}] textFile: {}, size:{}, createTime {}", new Object[]{Long.valueOf(this.fileSplitSize), fileChannel.getFilePath(), Long.valueOf(fileChannel.getWriteSize()), Long.valueOf(fileChannel.getCreateTime())});
            createOutputStream = createOutputStream(str, textTimeParser, fileChannel.getSplit() + 1);
            this.writerManager.put(str, createOutputStream);
        }
        return createOutputStream;
    }

    private FileChannel createOutputStream(String str, TextTimeParser textTimeParser, long j) {
        Configuration configuration = new Configuration();
        CompressionCodec compressionCodec = (CompressionCodec) ReflectionUtils.newInstance(LzopCodec.class, configuration);
        String str2 = this.writeTableDir + textTimeParser.getPartitionPath() + "_partition_" + this.partition + "_split" + j + compressionCodec.getDefaultExtension();
        logger.info("create {} text file {}", str, str2);
        try {
            return new FileChannel(str2, j, compressionCodec, configuration);
        } catch (IOException e) {
            throw new RuntimeException("textFile " + str2 + " writer create failed", e);
        }
    }

    @Override // ideal.sylph.plugins.hdfs.parquet.HDFSFactory
    public String getWriteDir() {
        return this.writeTableDir;
    }

    @Override // ideal.sylph.plugins.hdfs.parquet.HDFSFactory
    public void writeLine(long j, Map<String, Object> map) throws IOException {
        writeLine(j, map.values());
    }

    @Override // ideal.sylph.plugins.hdfs.parquet.HDFSFactory
    public void writeLine(long j, Collection<Object> collection) throws IOException {
        StringBuilder sb = new StringBuilder();
        int i = 0;
        for (Object obj : collection) {
            if (i != 0) {
                sb.append("\u0001");
            }
            if (obj != null) {
                sb.append(obj.toString());
            }
            i++;
        }
        writeLine(j, sb.toString());
    }

    @Override // ideal.sylph.plugins.hdfs.parquet.HDFSFactory
    public void writeLine(long j, Row row) throws IOException {
        writeLine(j, row.mkString("\u0001"));
    }

    private void writeLine(long j, String str) throws IOException {
        getTxtFileWriter(j).write((str + "\n").getBytes(StandardCharsets.UTF_8));
    }

    @Override // ideal.sylph.plugins.hdfs.parquet.HDFSFactory
    public void close() throws IOException {
        this.writerManager.forEach((str, fileChannel) -> {
            try {
                fileChannel.close();
            } catch (IOException e) {
                logger.error("close {}", str, e);
            }
        });
    }
}
