package org.apache.iotdb.db.pipe.agent.task.subtask.processor;

import com.google.common.util.concurrent.ListeningExecutorService;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException;
import org.apache.iotdb.commons.pipe.agent.task.connection.EventSupplier;
import org.apache.iotdb.commons.pipe.agent.task.execution.PipeSubtaskScheduler;
import org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
import org.apache.iotdb.commons.pipe.agent.task.subtask.PipeReportableSubtask;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.pipe.agent.task.connection.PipeEventCollector;
import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.metric.PipeDataNodeRemainingEventAndTimeMetrics;
import org.apache.iotdb.db.pipe.metric.PipeProcessorMetrics;
import org.apache.iotdb.db.pipe.processor.pipeconsensus.PipeConsensusProcessor;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.utils.ErrorHandlingUtils;
import org.apache.iotdb.pipe.api.PipeProcessor;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.class */
public class PipeProcessorSubtask extends PipeReportableSubtask {
    private static final Logger LOGGER = LoggerFactory.getLogger(PipeProcessorSubtask.class);
    private static final AtomicReference<PipeProcessorSubtaskWorkerManager> subtaskWorkerManager = new AtomicReference<>();
    private final String pipeName;
    private final String pipeNameWithCreationTime;
    private final int regionId;
    private final EventSupplier inputEventSupplier;
    private final PipeProcessor pipeProcessor;
    private final PipeEventCollector outputEventCollector;
    private final long subtaskCreationTime;

    public PipeProcessorSubtask(String str, String str2, long j, int i, EventSupplier eventSupplier, PipeProcessor pipeProcessor, PipeEventCollector pipeEventCollector) {
        super(str, j);
        this.pipeName = str2;
        this.pipeNameWithCreationTime = str2 + "_" + j;
        this.regionId = i;
        this.inputEventSupplier = eventSupplier;
        this.pipeProcessor = pipeProcessor;
        this.outputEventCollector = pipeEventCollector;
        this.subtaskCreationTime = System.currentTimeMillis();
        if (StorageEngine.getInstance().getAllDataRegionIds().contains(new DataRegionId(i))) {
            PipeProcessorMetrics.getInstance().register(this);
        }
    }

    public void bindExecutors(ListeningExecutorService listeningExecutorService, ExecutorService executorService, PipeSubtaskScheduler pipeSubtaskScheduler) {
        this.subtaskWorkerThreadPoolExecutor = listeningExecutorService;
        this.subtaskScheduler = pipeSubtaskScheduler;
        if (subtaskWorkerManager.get() == null) {
            synchronized (PipeProcessorSubtaskWorkerManager.class) {
                if (subtaskWorkerManager.get() == null) {
                    subtaskWorkerManager.set(new PipeProcessorSubtaskWorkerManager(listeningExecutorService));
                }
            }
        }
        subtaskWorkerManager.get().schedule(this);
    }

    protected boolean executeOnce() throws Exception {
        if (this.isClosed.get()) {
            return false;
        }
        Event maybeOf = this.lastEvent != null ? this.lastEvent : UserDefinedEnrichedEvent.maybeOf(this.inputEventSupplier.supply());
        setLastEvent(maybeOf);
        if (Objects.isNull(maybeOf)) {
            return false;
        }
        this.outputEventCollector.resetFlags();
        try {
            if (!this.isClosed.get()) {
                if (maybeOf instanceof TabletInsertionEvent) {
                    this.pipeProcessor.process((TabletInsertionEvent) maybeOf, this.outputEventCollector);
                    PipeProcessorMetrics.getInstance().markTabletEvent(this.taskID);
                } else if (maybeOf instanceof TsFileInsertionEvent) {
                    this.pipeProcessor.process((TsFileInsertionEvent) maybeOf, this.outputEventCollector);
                    PipeProcessorMetrics.getInstance().markTsFileEvent(this.taskID);
                    PipeDataNodeRemainingEventAndTimeMetrics.getInstance().markTsFileCollectInvocationCount(this.pipeNameWithCreationTime, this.outputEventCollector.getCollectInvocationCount());
                } else if (maybeOf instanceof PipeHeartbeatEvent) {
                    this.pipeProcessor.process(maybeOf, this.outputEventCollector);
                    ((PipeHeartbeatEvent) maybeOf).onProcessed();
                    PipeProcessorMetrics.getInstance().markPipeHeartbeatEvent(this.taskID);
                } else {
                    this.pipeProcessor.process(maybeOf instanceof UserDefinedEnrichedEvent ? ((UserDefinedEnrichedEvent) maybeOf).getUserDefinedEvent() : maybeOf, this.outputEventCollector);
                }
            }
            boolean z = (this.isClosed.get() || !this.outputEventCollector.hasNoGeneratedEvent() || this.outputEventCollector.isFailedToIncreaseReferenceCount() || (this.pipeProcessor instanceof PipeConsensusProcessor)) ? false : true;
            if (z && (maybeOf instanceof EnrichedEvent) && this.outputEventCollector.hasNoCollectInvocationAfterReset()) {
                PipeEventCommitManager.getInstance().enrichWithCommitterKeyAndCommitId((EnrichedEvent) maybeOf, this.creationTime, this.regionId);
            }
            decreaseReferenceCountAndReleaseLastEvent(maybeOf, z);
            return true;
        } catch (PipeRuntimeOutOfMemoryCriticalException e) {
            LOGGER.info("Temporarily out of memory in pipe event processing, will wait for the memory to release.", e);
            return false;
        } catch (Exception e2) {
            if (this.isClosed.get()) {
                LOGGER.info("Exception in pipe event processing, ignored because pipe is dropped.", e2);
                clearReferenceCountAndReleaseLastEvent(maybeOf);
                return true;
            }
            Object[] objArr = new Object[3];
            objArr[0] = this.taskID;
            objArr[1] = this.lastEvent instanceof EnrichedEvent ? this.lastEvent.coreReportMessage() : this.lastEvent;
            objArr[2] = ErrorHandlingUtils.getRootCause(e2).getMessage();
            throw new PipeException(String.format("Exception in pipe process, subtask: %s, last event: %s, root cause: %s", objArr), e2);
        }
    }

    public void submitSelf() {
    }

    public boolean isStoppedByException() {
        return (this.lastEvent instanceof EnrichedEvent) && this.retryCount.get() > 5;
    }

    public void close() {
        PipeProcessorMetrics.getInstance().deregister(this.taskID);
        try {
            this.isClosed.set(true);
            this.pipeProcessor.close();
        } catch (Exception e) {
            LOGGER.info("Exception occurred when closing pipe processor subtask {}, root cause: {}", new Object[]{this.taskID, ErrorHandlingUtils.getRootCause(e).getMessage(), e});
        } finally {
            super.close();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean isClosed() {
        return this.isClosed.get();
    }

    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        PipeProcessorSubtask pipeProcessorSubtask = (PipeProcessorSubtask) obj;
        return Objects.equals(this.taskID, pipeProcessorSubtask.taskID) && Objects.equals(Long.valueOf(this.subtaskCreationTime), Long.valueOf(pipeProcessorSubtask.subtaskCreationTime));
    }

    public int hashCode() {
        return Objects.hash(this.taskID, Long.valueOf(this.subtaskCreationTime));
    }

    public String getPipeName() {
        return this.pipeName;
    }

    public int getRegionId() {
        return this.regionId;
    }

    public int getEventCount(boolean z) {
        EnrichedEvent enrichedEvent = this.lastEvent instanceof EnrichedEvent ? (EnrichedEvent) this.lastEvent : null;
        return (!Objects.nonNull(enrichedEvent) || (z && (enrichedEvent instanceof PipeHeartbeatEvent))) ? 0 : 1;
    }

    protected String getRootCause(Throwable th) {
        return ErrorHandlingUtils.getRootCause(th).getMessage();
    }

    protected void report(EnrichedEvent enrichedEvent, PipeRuntimeException pipeRuntimeException) {
        PipeDataNodeAgent.runtime().report(enrichedEvent, pipeRuntimeException);
    }
}
