package org.apache.skywalking.apm.collector.analysis.worker.model.impl;

import java.util.LinkedList;
import java.util.List;
import org.apache.skywalking.apm.collector.analysis.worker.model.base.AbstractLocalAsyncWorker;
import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
import org.apache.skywalking.apm.collector.core.cache.Collection;
import org.apache.skywalking.apm.collector.core.cache.Window;
import org.apache.skywalking.apm.collector.core.data.StreamData;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.collector.core.util.StringUtils;
import org.apache.skywalking.apm.collector.storage.base.dao.IBatchDAO;
import org.apache.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/skywalking/apm/collector/analysis/worker/model/impl/PersistenceWorker.class */
public abstract class PersistenceWorker<INPUT_AND_OUTPUT extends StreamData, COLLECTION extends Collection> extends AbstractLocalAsyncWorker<INPUT_AND_OUTPUT, INPUT_AND_OUTPUT> {
    private final Logger logger;
    private final IBatchDAO batchDAO;
    private final int blockBatchPersistenceSize;

    /* JADX INFO: Access modifiers changed from: package-private */
    public PersistenceWorker(ModuleManager moduleManager) {
        super(moduleManager);
        this.logger = LoggerFactory.getLogger(PersistenceWorker.class);
        this.batchDAO = moduleManager.find("storage").getService(IBatchDAO.class);
        if (StringUtils.isNotEmpty(System.getProperty("batchSize"))) {
            this.blockBatchPersistenceSize = Integer.valueOf(System.getProperty("batchSize")).intValue();
        } else {
            this.blockBatchPersistenceSize = 500000;
        }
    }

    public boolean flushAndSwitch() {
        try {
            boolean trySwitchPointer = getCache().trySwitchPointer();
            if (trySwitchPointer) {
                getCache().switchPointer();
            }
            return trySwitchPointer;
        } finally {
            getCache().trySwitchPointerFinally();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.skywalking.apm.collector.analysis.worker.model.base.AbstractWorker
    public void onWork(INPUT_AND_OUTPUT input_and_output) {
        if (getCache().currentCollectionSize() >= this.blockBatchPersistenceSize) {
            try {
                if (getCache().trySwitchPointer()) {
                    getCache().switchPointer();
                    this.batchDAO.batchPersistence(buildBatchCollection());
                }
            } finally {
                getCache().trySwitchPointerFinally();
            }
        }
        cacheData(input_and_output);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r3v0, types: [org.apache.skywalking.apm.collector.analysis.worker.model.impl.PersistenceWorker<INPUT_AND_OUTPUT extends org.apache.skywalking.apm.collector.core.data.StreamData, COLLECTION extends org.apache.skywalking.apm.collector.core.cache.Collection>, org.apache.skywalking.apm.collector.analysis.worker.model.impl.PersistenceWorker] */
    @GraphComputingMetric(name = "/persistence/buildBatchCollection/")
    public final List<?> buildBatchCollection() {
        List linkedList = new LinkedList();
        while (getCache().getLast().isWriting()) {
            try {
                try {
                    Thread.sleep(10L);
                } catch (InterruptedException e) {
                    this.logger.warn("thread wake up");
                }
            } finally {
                getCache().finishReadingLast();
            }
        }
        if (getCache().getLast().collection() != null) {
            linkedList = prepareBatch(getCache().getLast());
        }
        return linkedList;
    }

    protected abstract List<Object> prepareBatch(COLLECTION collection);

    protected abstract Window<COLLECTION> getCache();

    /* JADX INFO: Access modifiers changed from: protected */
    public abstract IPersistenceDAO<?, ?, INPUT_AND_OUTPUT> persistenceDAO();

    /* JADX INFO: Access modifiers changed from: protected */
    public abstract boolean needMergeDBData();

    protected abstract void cacheData(INPUT_AND_OUTPUT input_and_output);
}
