package org.bboxdb.storage.tuplestore;

import io.prometheus.client.Gauge;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import org.bboxdb.commons.ServiceState;
import org.bboxdb.commons.concurrent.ThreadHelper;
import org.bboxdb.misc.BBoxDBConfiguration;
import org.bboxdb.misc.BBoxDBService;
import org.bboxdb.storage.entity.MemtableAndTupleStoreManagerPair;
import org.bboxdb.storage.memtable.MemtableWriterThread;
import org.bboxdb.storage.sstable.SSTableCheckpointThread;
import org.bboxdb.storage.sstable.compact.SSTableCompactorThread;
import org.bboxdb.storage.tuplestore.manager.TupleStoreManagerRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/bboxdb/storage/tuplestore/DiskStorage.class */
/**
 * Storage service bound to a single base directory.
 *
 * <p>On {@link #init()} the service starts its background threads (memtable
 * writers, one compactor and, if configured, a checkpoint thread) and keeps
 * track of them in {@link #runningThreads} so that {@link #shutdown()} can
 * stop them again. Memtables waiting to be written to disk are handed over
 * through a bounded blocking queue; two Prometheus gauges, labelled with the
 * base directory, mirror the number and the size of the queued memtables.
 */
public class DiskStorage implements BBoxDBService {

    /** Capacity of the queue of memtables waiting to be flushed. */
    private static final int MAX_UNFLUSHED_MEMTABLES = 20;

    /** Gauge counting the queued memtables, labelled per storage. */
    protected static final Gauge unflushedMemtablesTotal = Gauge.build()
            .name("bboxdb_unflushed_memtables_total")
            .help("Total unflushed memtables")
            .labelNames("storage")
            .register();

    /** Gauge summing the sizes of the queued memtables, labelled per storage. */
    protected static final Gauge unflushedMemtablesBytes = Gauge.build()
            .name("bboxdb_unflushed_memtables_bytes")
            .help("Unflushed memtable bytes")
            .labelNames("storage")
            .register();

    private static final Logger logger = LoggerFactory.getLogger(DiskStorage.class);

    /** Threads started by {@link #init()}; emptied again by {@link #shutdown()}. */
    public final List<Thread> runningThreads = new ArrayList<>();

    /** Life-cycle state of this service. */
    protected final ServiceState serviceState = new ServiceState();

    /** Bounded hand-over queue between flush schedulers and the writer threads. */
    protected final BlockingQueue<MemtableAndTupleStoreManagerPair> memtablesToFlush =
            new ArrayBlockingQueue<>(MAX_UNFLUSHED_MEMTABLES);

    /** Base directory of this storage. */
    protected final File basedir;

    /** Number of memtable writer threads started by {@link #startFlushThreads()}. */
    protected int flushThreadsPerStorage;

    /** Registry this storage belongs to; also the source of the configuration. */
    protected final TupleStoreManagerRegistry tupleStoreManagerRegistry;

    /** Label value used for both gauges; the string form of the base directory. */
    protected final String performanceCounterLabel;

    /**
     * Creates a storage for the given base directory. No threads are started
     * until {@link #init()} is called.
     *
     * @param tupleStoreManagerRegistry the registry this storage belongs to
     * @param basedir the base directory of the storage; must not be {@code null}
     * @param flushThreadsPerStorage the number of memtable writer threads to start
     */
    public DiskStorage(TupleStoreManagerRegistry tupleStoreManagerRegistry, File basedir,
            int flushThreadsPerStorage) {
        this.tupleStoreManagerRegistry = tupleStoreManagerRegistry;
        this.basedir = basedir;
        this.flushThreadsPerStorage = flushThreadsPerStorage;
        this.performanceCounterLabel = basedir.toString();
    }

    /**
     * Starts the service: clears the flush queue and launches the writer,
     * compactor and checkpoint threads. If the service is already in the
     * running state, a warning is logged and nothing else happens.
     */
    @Override // org.bboxdb.misc.BBoxDBService
    public void init() {
        if (this.serviceState.isInRunningState()) {
            logger.warn("Unable to init service, is already in {} state", this.serviceState);
            return;
        }
        this.serviceState.dipatchToStarting();
        this.memtablesToFlush.clear();
        startFlushThreads();
        startCompactThread();
        startCheckpointThread();
        this.serviceState.dispatchToRunning();
    }

    /**
     * Starts {@link #flushThreadsPerStorage} memtable writer threads and
     * registers each of them in {@link #runningThreads}.
     */
    protected void startFlushThreads() {
        for (int i = 0; i < this.flushThreadsPerStorage; i++) {
            Runnable writer = new MemtableWriterThread(this, this.basedir);
            Thread thread = new Thread(writer);
            thread.setName(i + ". Memtable write thread for storage: " + this.basedir);
            thread.start();
            this.runningThreads.add(thread);
        }
    }

    /**
     * Starts the compactor thread and registers it in {@link #runningThreads}.
     */
    protected void startCompactThread() {
        Runnable compactor = new SSTableCompactorThread(this);
        Thread thread = new Thread(compactor);
        thread.setName("Compact thread for: " + this.basedir);
        thread.start();
        this.runningThreads.add(thread);
    }

    /**
     * Starts the checkpoint thread and registers it in {@link #runningThreads}.
     * The thread is only started when the configured storage checkpoint
     * interval is greater than zero.
     */
    protected void startCheckpointThread() {
        BBoxDBConfiguration configuration = this.tupleStoreManagerRegistry.getConfiguration();
        if (configuration.getStorageCheckpointInterval() <= 0) {
            return;
        }
        Runnable checkpointer =
                new SSTableCheckpointThread(this, configuration.getStorageCheckpointInterval());
        Thread thread = new Thread(checkpointer);
        thread.setName("Checkpoint thread for: " + this.basedir);
        thread.start();
        this.runningThreads.add(thread);
    }

    /**
     * Stops the service: stops all registered threads and forgets them. If the
     * service is not in the running state, a warning is logged and nothing
     * else happens.
     */
    @Override // org.bboxdb.misc.BBoxDBService
    public void shutdown() {
        if (!this.serviceState.isInRunningState()) {
            logger.warn("Unable to stop service, is already in {} state", this.serviceState);
            return;
        }
        this.serviceState.dispatchToStopping();
        logger.info("Stop running threads");
        ThreadHelper.stopThreads(this.runningThreads);
        this.runningThreads.clear();
        this.serviceState.dispatchToTerminated();
    }

    /**
     * @return a human-readable service name containing the absolute path of
     *         the base directory
     */
    @Override // org.bboxdb.misc.BBoxDBService
    public String getServicename() {
        return "Storage instance for: " + this.basedir.getAbsolutePath();
    }

    /**
     * Queues a memtable for flushing and updates both gauges. Blocks while the
     * queue is full. If the calling thread is interrupted while waiting, the
     * memtable is NOT queued, the gauges are left untouched, a warning is
     * logged and the interrupt flag is restored.
     *
     * @param memtableAndTupleStoreManagerPair the memtable (with its manager) to flush
     */
    public void scheduleMemtableFlush(MemtableAndTupleStoreManagerPair memtableAndTupleStoreManagerPair) {
        try {
            this.memtablesToFlush.put(memtableAndTupleStoreManagerPair);
            unflushedMemtablesTotal.labels(this.performanceCounterLabel).inc();
            unflushedMemtablesBytes.labels(this.performanceCounterLabel)
                    .inc(memtableAndTupleStoreManagerPair.getMemtable().getSize());
        } catch (InterruptedException e) {
            // put() did not insert the element; make the dropped request visible
            logger.warn("Interrupted while scheduling memtable flush for storage: {}", this.basedir);
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Removes the next queued memtable, blocking until one is available, and
     * decrements both gauges accordingly.
     *
     * @return the next memtable (with its manager) to flush
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public MemtableAndTupleStoreManagerPair takeNextUnflushedMemtable() throws InterruptedException {
        MemtableAndTupleStoreManagerPair next = this.memtablesToFlush.take();
        unflushedMemtablesTotal.labels(this.performanceCounterLabel).dec();
        unflushedMemtablesBytes.labels(this.performanceCounterLabel).dec(next.getMemtable().getSize());
        return next;
    }

    /**
     * @return the base directory of this storage
     */
    public File getBasedir() {
        return this.basedir;
    }

    /**
     * @return the registry this storage belongs to
     */
    public TupleStoreManagerRegistry getTupleStoreManagerRegistry() {
        return this.tupleStoreManagerRegistry;
    }
}
