package org.apache.skywalking.apm.collector.analysis.worker.model.impl;

import org.apache.skywalking.apm.collector.analysis.worker.model.base.AbstractLocalAsyncWorker;
import org.apache.skywalking.apm.collector.analysis.worker.model.base.WorkerException;
import org.apache.skywalking.apm.collector.analysis.worker.model.impl.data.MergeDataCache;
import org.apache.skywalking.apm.collector.analysis.worker.model.impl.data.MergeDataCollection;
import org.apache.skywalking.apm.collector.core.data.StreamData;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Local asynchronous worker that aggregates incoming stream data by id in a
 * {@link MergeDataCache} and forwards the aggregated entries to the next worker.
 *
 * <p>Each incoming message is converted with {@link #transform(StreamData)} and merged into
 * the cache. The cache content is forwarded once {@value #FLUSH_THRESHOLD} messages have been
 * counted, and again whenever the incoming message reports the end of a batch.
 *
 * @param <INPUT> type of the messages received by this worker
 * @param <OUTPUT> type of the aggregated messages passed to the next worker
 */
public abstract class AggregationWorker<INPUT extends StreamData, OUTPUT extends StreamData> extends AbstractLocalAsyncWorker<INPUT, OUTPUT> {
    private static final Logger LOGGER = LoggerFactory.getLogger(AggregationWorker.class);

    /** Number of counted messages after which the cache content is forwarded. */
    private static final int FLUSH_THRESHOLD = 100;

    /** Interval, in milliseconds, between checks of the writing flag of the collection to read. */
    private static final long WRITING_POLL_INTERVAL_MILLIS = 10L;

    private final MergeDataCache<OUTPUT> mergeDataCache;

    /** Messages counted since the last threshold-triggered flush. */
    private int messageNum;

    /**
     * Creates the worker with an empty merge cache.
     *
     * @param moduleManager module manager handed to the parent worker
     */
    public AggregationWorker(ModuleManager moduleManager) {
        super(moduleManager);
        this.mergeDataCache = new MergeDataCache<>();
    }

    /**
     * Converts an incoming message into the aggregated type.
     *
     * <p>The default implementation returns the same instance through an unchecked cast, so it
     * is only valid when the runtime type of {@code input} is also an {@code OUTPUT}. Subclasses
     * with different input and output types must override this method; otherwise a
     * {@link ClassCastException} may surface later, where the value is used as {@code OUTPUT}.
     *
     * @param input the received message
     * @return the message to aggregate
     */
    @SuppressWarnings("unchecked") // identity conversion; see the contract described above
    protected OUTPUT transform(INPUT input) {
        return (OUTPUT) input;
    }

    /**
     * Aggregates one message and forwards the cached data when the message threshold is
     * reached or when the message marks the end of a batch.
     *
     * @param input the received message
     * @throws WorkerException if forwarding the aggregated data is interrupted
     */
    @Override // org.apache.skywalking.apm.collector.analysis.worker.model.base.AbstractWorker
    public void onWork(INPUT input) throws WorkerException {
        OUTPUT output = transform(input);
        this.messageNum++;
        aggregate(output);
        if (this.messageNum >= FLUSH_THRESHOLD) {
            sendToNext();
            this.messageNum = 0;
        }
        // NOTE(review): this flush does not reset messageNum, so the next threshold flush can
        // fire after fewer than FLUSH_THRESHOLD newly cached messages - confirm this is intended.
        if (input.getEndOfBatchContext().isEndOfBatch()) {
            sendToNext();
        }
    }

    /**
     * Switches the cache pointer, waits until the collection that is now "last" is no longer
     * flagged as being written, passes each of its entries to {@code onNext}, and finally
     * marks the reading of that collection as finished.
     *
     * @throws WorkerException if the thread is interrupted while waiting; the interrupt
     *     status of the thread is restored before the exception is thrown
     */
    private void sendToNext() throws WorkerException {
        this.mergeDataCache.switchPointer();
        while (((MergeDataCollection) this.mergeDataCache.getLast()).isWriting()) {
            try {
                Thread.sleep(WRITING_POLL_INTERVAL_MILLIS);
            } catch (InterruptedException e) {
                // Restore the interrupt status so code further up the stack can observe it.
                Thread.currentThread().interrupt();
                throw new WorkerException(e.getMessage(), e);
            }
        }
        ((MergeDataCollection) this.mergeDataCache.getLast()).m1collection().forEach((id, data) -> {
            // Only build the string representation when it is actually going to be logged.
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug(data.toString());
            }
            onNext(data);
        });
        this.mergeDataCache.finishReadingLast();
    }

    /**
     * Merges the message into the cache entry with the same id, or stores it as a new entry
     * (after calculating its formula) when no such entry exists.
     *
     * @param output the message to aggregate
     */
    private void aggregate(OUTPUT output) {
        this.mergeDataCache.writing();
        try {
            if (this.mergeDataCache.containsKey(output.getId())) {
                this.mergeDataCache.get(output.getId()).mergeAndFormulaCalculateData(output);
            } else {
                output.calculateFormula();
                this.mergeDataCache.put(output.getId(), output);
            }
        } finally {
            // Always clear the writing flag; sendToNext() keeps sleeping while it is set.
            this.mergeDataCache.finishWriting();
        }
    }
}
