package ideal.sylph.plugins.hdfs.parquet;

import com.google.common.collect.ImmutableList;
import ideal.sylph.etl.Row;
import ideal.sylph.plugins.hdfs.factory.HDFSFactorys;
import ideal.sylph.plugins.hdfs.factory.TimeParser;
import ideal.sylph.plugins.hdfs.utils.CommonUtil;
import ideal.sylph.plugins.hdfs.utils.MemoryUtil;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.schema.MessageType;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:ideal/sylph/plugins/hdfs/parquet/ParquetFactory.class */
public class ParquetFactory implements HDFSFactory {
    // Not referenced anywhere in the visible part of this class.
    // NOTE(review): presumably a time granularity in minutes — confirm or remove.
    private static final short TIME_Granularity = 5;
    // Output directory of this table; the constructor guarantees it ends with "/".
    private final String writeTableDir;
    // Table name; used when building the row keys that identify writers.
    private final String table;
    // Parquet schema passed to every writer this factory creates.
    private final MessageType schema;
    // Parquet writer version passed to every writer this factory creates.
    private final ParquetProperties.WriterVersion parquetVersion;
    private static final Logger logger = LoggerFactory.getLogger(ParquetFactory.class);
    /**
     * Rules deciding whether an open writer should be closed. A writer is closed as soon
     * as any one rule returns {@code true} (see {@code checkflushRule}).
     *
     * <p>The builder is given an explicit {@code <CheckHandler>} element type; without it
     * the element type is inferred as {@code Object} and the lambdas have no functional
     * interface to target.
     */
    private static final List<CheckHandler> filterDefaultFuncs = ImmutableList.<CheckHandler>builder()
            // Rule 1: the writer's key is exactly the threshold key, and the key derived
            // from the writer's creation time matches that threshold key as well.
            .add((writerKey, thresholdKey, writer) ->
                    writerKey.equals(thresholdKey)
                            && new TimeParser(Long.valueOf(writer.getCreatedTime())).getWriterKey().equals(thresholdKey))
            // Rule 2: the key sorts at or before the threshold key and the cooldown time,
            // divided by 1000 and then by 60, is at least 2.
            // NOTE(review): presumably getCooldownTime() is in milliseconds, i.e. "idle >= 2 min" — confirm.
            .add((writerKey, thresholdKey, writer) ->
                    writerKey.compareTo(thresholdKey) <= 0 && (writer.getCooldownTime() / 1000) / 60 >= 2)
            // Rule 3: at least 6 whole minutes have elapsed since the writer's creation
            // timestamp (compared against System.currentTimeMillis()), regardless of keys.
            .add((writerKey, thresholdKey, writer) ->
                    ((System.currentTimeMillis() - writer.getCreatedTime()) / 1000) / 60 >= 6)
            .build();
    // Pending write tasks produced by writeLine(...) and drained by the consumer loop; bounded to 1000.
    private final BlockingQueue<Runnable> streamData = new LinkedBlockingQueue<>(1000);
    // Pending maintenance tasks (flush / closeHalf) drained by the same consumer loop; bounded to 1000.
    private final BlockingQueue<Runnable> monitorEvent = new ArrayBlockingQueue<>(1000);
    // Runs the consumer loop, the monitor loop and the asynchronous writer-close tasks.
    private final ExecutorService executorPool = Executors.newFixedThreadPool(300);
    // Open writers by row key. The consumer thread inserts and removes entries while the
    // monitor thread and the shutdown hook iterate over them, so a plain HashMap is not
    // safe here; ConcurrentHashMap tolerates concurrent iteration and mutation.
    private final Map<String, ApacheParquet> parquetManager = new ConcurrentHashMap<>();
    private final String hostName = CommonUtil.getHostNameOrPid();
    // Set once on close/shutdown; volatile so the consumer and monitor loops observe it.
    private volatile boolean closed = false;
    /**
     * Marks the factory as closed and closes every writer that is still registered.
     *
     * <p>This runnable is executed both by {@link #close()} and, because the constructor
     * registers it with the runtime, again when the JVM exits. The map is therefore cleared
     * once the writers have been closed so that a second run does not close them again.
     */
    private final Runnable shutdownHook = () -> {
        this.closed = true;
        synchronized (this.parquetManager) {
            // Writers are closed in parallel; a failure on one does not stop the others.
            this.parquetManager.entrySet().parallelStream().forEach(entry -> {
                try {
                    entry.getValue().close();
                } catch (IOException e) {
                    logger.error("addShutdownHook close textFile Writer failed {}", entry.getKey(), e);
                }
            });
            this.parquetManager.clear();
        }
    };
    /**
     * Closes the third of the registered writers that report the smallest data size
     * (despite the name, {@code size() / 3} writers are closed, not half). The monitor
     * queues this runnable when {@code MemoryUtil.checkMemory()} returns {@code true}.
     *
     * <p>Victims are chosen sequentially so that "the first N in sorted order" is
     * deterministic, and are unregistered under the map's lock before being closed.
     * Only the closing itself, which does not touch the map, runs in parallel.
     */
    public Runnable closeHalf = () -> {
        List<ApacheParquet> victims;
        synchronized (this.parquetManager) {
            int limit = this.parquetManager.size() / 3;
            List<String> victimKeys = this.parquetManager.entrySet().stream()
                    // Long.compare avoids the truncation of casting a long difference to int.
                    .sorted((left, right) -> Long.compare(left.getValue().getDataSize(), right.getValue().getDataSize()))
                    .limit(limit)
                    .map(Map.Entry::getKey)
                    .collect(Collectors.toList());
            victims = victimKeys.stream()
                    .map(this.parquetManager::remove)
                    .filter(Objects::nonNull)
                    .collect(Collectors.toList());
        }
        victims.parallelStream().forEach(writer -> {
            try {
                writer.close();
            } catch (IOException e) {
                logger.error("parquet关闭失败 path:{}", writer.getWritePath(), e);
            }
        });
    };
    /**
     * Background loop: every 5 seconds it runs {@code checkflushRule()} and, when
     * {@code MemoryUtil.checkMemory()} returns {@code true}, queues {@link #closeHalf}
     * for the consumer thread. The loop ends once the factory is closed or the thread
     * is interrupted.
     */
    private final Callable<Void> monitor = () -> {
        Thread.currentThread().setName("Parquet_Factory_Monitor");
        while (!this.closed) {
            try {
                TimeUnit.SECONDS.sleep(5L);
                checkflushRule();
                if (MemoryUtil.checkMemory()) {
                    this.monitorEvent.put(this.closeHalf);
                }
            } catch (InterruptedException e) {
                // Restore the flag and stop: continuing would make every following
                // sleep() throw immediately and turn this loop into a busy spin.
                Thread.currentThread().interrupt();
                break;
            } catch (RuntimeException e) {
                // A failing check must not silently terminate the monitor; the exception
                // would otherwise only be stored in a Future that nobody reads.
                logger.error("Parquet_Factory_Monitor check failed", e);
            }
        }
        return null;
    };

    /* loaded from: input_file:ideal/sylph/plugins/hdfs/parquet/ParquetFactory$CheckHandler.class */
    /**
     * A single rule deciding whether an open writer should be closed.
     */
    @FunctionalInterface
    public interface CheckHandler {
        /**
         * @param writerKey    key under which the writer is registered in the factory
         * @param thresholdKey key computed by the factory for the current check
         * @param fileWriter   the writer being examined
         * @return {@code true} if the writer should be closed
         */
        boolean apply(String writerKey, String thresholdKey, FileWriter fileWriter);
    }

    /**
     * Creates the factory, starts the consumer and monitor tasks on the internal pool
     * and registers the shutdown hook with the runtime.
     *
     * @param str           directory to write the table into; a trailing "/" is appended if missing
     * @param str2          table name
     * @param writerVersion parquet writer version used for every writer
     * @param messageType   parquet schema used for every writer
     * @throws NullPointerException if any argument is {@code null}
     */
    public ParquetFactory(String str, String str2, ParquetProperties.WriterVersion writerVersion, MessageType messageType) {
        String dir = Objects.requireNonNull(str, "writeTableDir is null");
        if (!dir.endsWith("/")) {
            dir = dir + "/";
        }
        this.writeTableDir = dir;
        this.table = Objects.requireNonNull(str2, "table is null");
        this.schema = Objects.requireNonNull(messageType, "schema is null");
        this.parquetVersion = Objects.requireNonNull(writerVersion, "parquetVersion is null");

        this.executorPool.submit(() -> {
            runConsumerLoop();
            return null;
        });
        this.executorPool.submit(this.monitor);
        Runtime.getRuntime().addShutdownHook(new Thread(this.shutdownHook));
    }

    /**
     * Consumer loop: alternately takes one write task and one maintenance task, runs
     * whichever is available, and sleeps 1 ms when both queues are empty. Any exception
     * is logged and terminates the JVM with exit code -1.
     */
    private void runConsumerLoop() {
        Thread.currentThread().setName("Parquet_Factory_Consumer");
        try {
            while (!this.closed) {
                Runnable dataTask = this.streamData.poll();
                if (dataTask != null) {
                    dataTask.run();
                }
                Runnable maintenanceTask = this.monitorEvent.poll();
                if (maintenanceTask != null) {
                    maintenanceTask.run();
                }
                boolean idle = dataTask == null && maintenanceTask == null;
                if (idle) {
                    TimeUnit.MILLISECONDS.sleep(1L);
                }
            }
        } catch (Exception e) {
            logger.error("Parquet_Factory_Consumer error", e);
            System.exit(-1);
        }
    }

    /**
     * Finds every registered writer matched by at least one rule in
     * {@code filterDefaultFuncs} and queues a maintenance task that unregisters those
     * writers and closes each of them asynchronously on the executor pool.
     */
    private void checkflushRule() {
        // NOTE(review): this key is table + getRowKey(table, ...), whereas the keys stored in
        // parquetManager (see getParquetWriter(long)) are getRowKey(table, ...) without the
        // extra prefix — confirm the asymmetry is intended.
        String thresholdKey = this.table + HDFSFactorys.getRowKey(this.table, new TimeParser(new DateTime().minusMinutes(6)));

        List<Map.Entry<String, ApacheParquet>> expired;
        // Snapshot under the same lock that insertions take, so the map is not iterated
        // while another thread is adding a writer.
        synchronized (this.parquetManager) {
            expired = this.parquetManager.entrySet().stream()
                    .filter(entry -> filterDefaultFuncs.stream()
                            .anyMatch(rule -> rule.apply(entry.getKey(), thresholdKey, entry.getValue())))
                    .collect(Collectors.toList());
        }
        if (expired.isEmpty()) {
            return;
        }

        boolean queued = this.monitorEvent.offer(() -> expired.forEach(entry -> {
            ApacheParquet writer = entry.getValue();
            boolean removed;
            synchronized (this.parquetManager) {
                // Conditional remove: if the key has meanwhile been re-registered with a
                // different writer, that newer writer must stay registered and open.
                removed = this.parquetManager.remove(entry.getKey(), writer);
            }
            if (!removed) {
                return;
            }
            this.executorPool.submit(() -> {
                logger.info("正在关闭流个数:{} 添加关闭流:{}",
                        ((ThreadPoolExecutor) this.executorPool).getActiveCount(), writer.getWritePath());
                try {
                    writer.close();
                } catch (IOException e) {
                    // Log here: an exception thrown from this task would only be stored in
                    // a Future that nobody reads.
                    logger.error("流关闭出错: {}", writer.getWritePath(), e);
                }
            });
        }));
        if (!queued) {
            // The writers are still registered, so the next check will pick them up again.
            logger.warn("monitorEvent queue is full, flush of {} writers postponed", expired.size());
        }
        logger.info(MemoryUtil.getMemoryInfo(this.hostName));
    }

    /**
     * Queues a map-shaped record to be written by the writer selected for timestamp {@code j}.
     * Blocks while the write queue is full.
     */
    @Override // ideal.sylph.plugins.hdfs.parquet.HDFSFactory
    public void writeLine(long j, Map<String, Object> map) {
        enqueueWrite(() -> getParquetWriter(j).writeLine(map));
    }

    /**
     * Queues a collection-shaped record to be written by the writer selected for timestamp {@code j}.
     * Blocks while the write queue is full.
     */
    @Override // ideal.sylph.plugins.hdfs.parquet.HDFSFactory
    public void writeLine(long j, Collection<Object> collection) {
        enqueueWrite(() -> getParquetWriter(j).writeLine(collection));
    }

    /**
     * Queues a {@link Row} to be written by the writer selected for timestamp {@code j}.
     * Blocks while the write queue is full.
     */
    @Override // ideal.sylph.plugins.hdfs.parquet.HDFSFactory
    public void writeLine(long j, Row row) {
        enqueueWrite(() -> getParquetWriter(j).writeLine(row));
    }

    /**
     * Puts a write task on the stream queue. If the calling thread is interrupted while
     * waiting, the interrupt flag is restored and the task is not queued.
     */
    private void enqueueWrite(Runnable writeTask) {
        try {
            this.streamData.put(writeTask);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Marks the factory as closed and synchronously runs the shutdown routine, which
     * closes the writers still registered in the factory.
     */
    @Override // ideal.sylph.plugins.hdfs.parquet.HDFSFactory
    public void close() {
        closed = true;
        shutdownHook.run();
    }

    /**
     * @return the table's write directory, always ending with "/"
     */
    @Override // ideal.sylph.plugins.hdfs.parquet.HDFSFactory
    public String getWriteDir() {
        return writeTableDir;
    }

    /**
     * Returns the writer registered under {@code str}, creating and registering one via
     * {@code supplier} when none exists yet.
     *
     * <p>The first lookup is done without locking; only the create-if-absent step is
     * performed while holding the map's monitor.
     *
     * @param str      row key identifying the writer
     * @param supplier creates the writer when the key is not registered
     * @return the existing or newly created writer
     */
    private ApacheParquet getParquetWriter(String str, Supplier<ApacheParquet> supplier) {
        ApacheParquet existing = parquetManager.get(str);
        if (existing == null) {
            synchronized (parquetManager) {
                return parquetManager.computeIfAbsent(str, missingKey -> supplier.get());
            }
        }
        return existing;
    }

    /**
     * Returns the writer responsible for timestamp {@code j}, creating it on first use.
     * The writer's path is the table directory followed by the partition path derived
     * from the timestamp; its registry key comes from {@code HDFSFactorys.getRowKey}.
     *
     * @param j timestamp used to select the partition and the writer
     * @return the writer for that timestamp
     * @throws RuntimeException if a new writer cannot be created
     */
    private ApacheParquet getParquetWriter(long j) {
        final TimeParser parsedTime = new TimeParser(Long.valueOf(j));
        final String writePath = writeTableDir + parsedTime.getPartitionPath();
        final String rowKey = HDFSFactorys.getRowKey(table, parsedTime);

        Supplier<ApacheParquet> writerFactory = () -> {
            try {
                return ApacheParquet.create()
                        .parquetVersion(parquetVersion)
                        .schema(schema)
                        .writePath(writePath)
                        .get();
            } catch (IOException e) {
                throw new RuntimeException("parquet writer create failed", e);
            }
        };
        return getParquetWriter(rowKey, writerFactory);
    }
}
