package org.apache.gravitino.iceberg.service.metrics;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.time.Instant;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.gravitino.iceberg.common.IcebergConfig;
import org.apache.gravitino.iceberg.common.ops.IcebergCatalogWrapper;
import org.apache.gravitino.iceberg.service.IcebergRestUtils;
import org.apache.iceberg.metrics.MetricsReport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/gravitino/iceberg/service/metrics/IcebergMetricsManager.class */
public class IcebergMetricsManager {
    private static final Logger LOG = LoggerFactory.getLogger(IcebergCatalogWrapper.class);
    private static final ImmutableMap<String, String> ICEBERG_METRICS_STORE_NAMES = ImmutableMap.of(DummyMetricsStore.ICEBERG_METRICS_STORE_DUMMY_NAME, DummyMetricsStore.class.getCanonicalName());
    private final IcebergMetricsStore icebergMetricsStore;
    private final int retainDays;
    private BlockingQueue<MetricsReport> queue;
    private Thread metricsWriterThread;
    private Optional<ScheduledExecutorService> metricsCleanerExecutor;
    private volatile boolean isClosed = false;
    private final IcebergMetricsFormatter icebergMetricsFormatter = new IcebergMetricsFormatter();

    public IcebergMetricsManager(IcebergConfig icebergConfig) {
        this.metricsCleanerExecutor = Optional.empty();
        this.icebergMetricsStore = loadIcebergMetricsStore((String) icebergConfig.get(IcebergConfig.ICEBERG_METRICS_STORE));
        try {
            this.icebergMetricsStore.init(icebergConfig.getAllConfig());
            this.retainDays = ((Integer) icebergConfig.get(IcebergConfig.ICEBERG_METRICS_STORE_RETAIN_DAYS)).intValue();
            if (this.retainDays > 0) {
                this.metricsCleanerExecutor = Optional.of(new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Iceberg-metrics-cleaner").setUncaughtExceptionHandler((thread, th) -> {
                    LOG.error("Uncaught exception in thread {}.", thread, th);
                }).build()));
            }
            this.queue = new LinkedBlockingQueue(((Integer) icebergConfig.get(IcebergConfig.ICEBERG_METRICS_QUEUE_CAPACITY)).intValue());
            this.metricsWriterThread = new Thread(() -> {
                writeMetrics();
            });
            this.metricsWriterThread.setName("Iceberg-metrics-writer");
            this.metricsWriterThread.setDaemon(true);
        } catch (IOException e) {
            LOG.warn("Iceberg metrics store init failed.", e);
            throw new RuntimeException(e);
        }
    }

    public void start() {
        this.metricsWriterThread.start();
        this.metricsCleanerExecutor.ifPresent(scheduledExecutorService -> {
            scheduledExecutorService.scheduleAtFixedRate(() -> {
                Instant calculateNewTimestamp = IcebergRestUtils.calculateNewTimestamp(Instant.now(), (-24) * this.retainDays);
                LOG.info("Try clean Iceberg expired metrics, {}.", calculateNewTimestamp);
                try {
                    this.icebergMetricsStore.clean(calculateNewTimestamp);
                } catch (Exception e) {
                    LOG.warn("Clean Iceberg metrics failed.", e);
                }
            }, 0L, 1L, TimeUnit.HOURS);
        });
    }

    public void recordMetric(MetricsReport metricsReport) {
        if (this.isClosed) {
            logMetrics("Drop Iceberg metrics because Iceberg Metrics Manager is closed.", metricsReport);
        } else {
            if (this.queue.offer(metricsReport)) {
                return;
            }
            logMetrics("Drop Iceberg metrics because metrics queue is full.", metricsReport);
        }
    }

    public void close() {
        this.isClosed = true;
        this.metricsCleanerExecutor.ifPresent(scheduledExecutorService -> {
            scheduledExecutorService.shutdownNow();
        });
        if (this.icebergMetricsStore != null) {
            try {
                this.icebergMetricsStore.close();
            } catch (IOException e) {
                LOG.warn("Close Iceberg metrics store failed.", e);
            }
        }
        if (this.metricsWriterThread != null) {
            this.metricsWriterThread.interrupt();
            try {
                this.metricsWriterThread.join();
            } catch (InterruptedException e2) {
                LOG.warn("Iceberg metrics manager is interrupted while join metrics writer thread.");
            }
        }
    }

    @VisibleForTesting
    IcebergMetricsStore getIcebergMetricsStore() {
        return this.icebergMetricsStore;
    }

    private void writeMetrics() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                MetricsReport take = this.queue.take();
                if (take != null) {
                    doRecordMetric(take);
                }
            } catch (InterruptedException e) {
                LOG.warn("Iceberg Metrics writer thread is interrupted.");
            }
        }
        MetricsReport poll = this.queue.poll();
        while (true) {
            MetricsReport metricsReport = poll;
            if (metricsReport == null) {
                return;
            }
            logMetrics("Drop Iceberg metrics because it's time to close metrics store.", metricsReport);
            poll = this.queue.poll();
        }
    }

    private IcebergMetricsStore loadIcebergMetricsStore(String str) {
        if (str == null) {
            str = DummyMetricsStore.ICEBERG_METRICS_STORE_DUMMY_NAME;
        }
        String str2 = (String) ICEBERG_METRICS_STORE_NAMES.getOrDefault(str, str);
        LOG.info("Load Iceberg metrics store: {}.", str2);
        try {
            return (IcebergMetricsStore) Class.forName(str2).getDeclaredConstructor(new Class[0]).newInstance(new Object[0]);
        } catch (Exception e) {
            LOG.error("Failed to create and initialize Iceberg metrics store by name {}.", str, e);
            throw new RuntimeException(e);
        }
    }

    private void logMetrics(String str, MetricsReport metricsReport) {
        LOG.info("{} {}.", str, this.icebergMetricsFormatter.toPrintableString(metricsReport));
    }

    private void doRecordMetric(MetricsReport metricsReport) {
        try {
            this.icebergMetricsStore.recordMetric(metricsReport);
        } catch (Exception e) {
            LOG.warn("Write Iceberg metrics failed.", e);
        }
    }
}
