package org.apache.iotdb.db.pipe.extractor.dataregion.realtime;

import com.google.common.base.MoreObjects;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
import org.apache.iotdb.commons.utils.TimePartitionUtils;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
import org.apache.iotdb.db.pipe.extractor.dataregion.DataRegionListeningFilter;
import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.listener.PipeInsertionDataNodeListener;
import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.listener.PipeTimePartitionListener;
import org.apache.iotdb.db.pipe.metric.PipeDataRegionEventCounter;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import org.apache.iotdb.db.storageengine.dataregion.wal.node.WALNode;
import org.apache.iotdb.db.utils.DateTimeUtils;
import org.apache.iotdb.pipe.api.PipeExtractor;
import org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.class */
/**
 * Base class of the realtime extractors that are attached to one data region.
 *
 * <p>The class parses the shared extractor parameters (time range, loose-range switches, mods
 * transfer, tree/table patterns), registers itself with the insertion and time-partition listeners
 * on {@link #start()}, filters incoming {@link PipeRealtimeEvent}s in {@link #extract}, and buffers
 * accepted events in {@link #pendingQueue}. Subclasses decide how a filtered event is actually
 * handled by implementing {@link #doExtract}.
 */
public abstract class PipeRealtimeDataRegionExtractor implements PipeExtractor {
    private static final Logger LOGGER = LoggerFactory.getLogger(PipeRealtimeDataRegionExtractor.class);

    // Identity of the pipe task; all populated in customize().
    protected String pipeName;
    protected long creationTime;
    protected String dataRegionId;
    protected PipeTaskMeta pipeTaskMeta;

    // Which kinds of data-region events this extractor listens to; populated in customize().
    protected boolean shouldExtractInsertion;
    protected boolean shouldExtractDeletion;

    // Path / table filters parsed from the source parameters in customize().
    protected TreePattern treePattern;
    protected TablePattern tablePattern;

    // Time partition id bounds derived from the extraction start/end time in customize().
    private long startTimePartitionIdLowerBound;
    private long endTimePartitionIdUpperBound;

    private boolean shouldTransferModFile;

    // Set from the 'realtime.loose-range' parameter in validate(); when true, extract() tells the
    // accepted event to skip time / pattern parsing respectively.
    private boolean sloppyTimeRange;
    private boolean sloppyPattern;

    // "<pipeName>_<dataRegionId>_<creationTime>", built in customize().
    private String taskID;

    // True when both the tree pattern and the table pattern cover the data region's database.
    private boolean isDbNameCoveredByPattern = false;

    protected long realtimeDataExtractionStartTime = Long.MIN_VALUE;
    protected long realtimeDataExtractionEndTime = WALNode.DEFAULT_SAFELY_DELETED_SEARCH_INDEX;

    // Latched to true by extract() the first time the data region's partitions are found NOT to be
    // covered by the time range; after that the covering check is never repeated.
    private boolean disableCheckingDataRegionTimePartitionCovering = false;
    private final AtomicReference<Pair<Long, Long>> dataRegionTimePartitionIdBound = new AtomicReference<>();

    protected boolean isForwardingPipeRequests = true;

    protected final UnboundedBlockingPendingQueue<Event> pendingQueue = new UnboundedBlockingPendingQueue<>(new PipeDataRegionEventCounter());

    // Also used as the monitor guarding "close + drain" against concurrent extract() calls.
    protected final AtomicBoolean isClosed = new AtomicBoolean(false);

    /**
     * Validates and records the time range and the {@code realtime.loose-range} option.
     *
     * <p>{@code loose-range} accepts {@code all}, or a comma separated list made of {@code time}
     * and/or {@code path} (case-insensitive, blanks ignored).
     *
     * @param pipeParameterValidator validator wrapping the user supplied parameters
     * @throws PipeParameterNotValidException if start time is after end time, if an unknown
     *     loose-range item is given, or if any other exception occurs while parsing (the original
     *     exception's message is reused)
     */
    public void validate(PipeParameterValidator pipeParameterValidator) throws Exception {
        final PipeParameters parameters = pipeParameterValidator.getParameters();
        try {
            this.realtimeDataExtractionStartTime =
                    parameters.hasAnyAttributes(new String[]{"source.start-time", "extractor.start-time"})
                            ? DateTimeUtils.convertTimestampOrDatetimeStrToLongWithDefaultZone(
                                    parameters.getStringByKeys(new String[]{"source.start-time", "extractor.start-time"}))
                            : Long.MIN_VALUE;
            this.realtimeDataExtractionEndTime =
                    parameters.hasAnyAttributes(new String[]{"source.end-time", "extractor.end-time"})
                            ? DateTimeUtils.convertTimestampOrDatetimeStrToLongWithDefaultZone(
                                    parameters.getStringByKeys(new String[]{"source.end-time", "extractor.end-time"}))
                            : WALNode.DEFAULT_SAFELY_DELETED_SEARCH_INDEX;
            if (this.realtimeDataExtractionStartTime > this.realtimeDataExtractionEndTime) {
                throw new PipeParameterNotValidException(
                        String.format(
                                "%s (%s) [%s] should be less than or equal to %s (%s) [%s].",
                                "source.start-time",
                                "extractor.start-time",
                                this.realtimeDataExtractionStartTime,
                                "source.end-time",
                                "extractor.end-time",
                                this.realtimeDataExtractionEndTime));
            }

            final String looseRange =
                    parameters
                            .getStringOrDefault(
                                    Arrays.asList("extractor.realtime.loose-range", "source.realtime.loose-range"), "")
                            .trim();
            if ("all".equalsIgnoreCase(looseRange)) {
                this.sloppyTimeRange = true;
                this.sloppyPattern = true;
                return;
            }

            final Set<String> looseRangeItems =
                    Arrays.stream(looseRange.split(","))
                            .map(String::trim)
                            .filter(item -> !item.isEmpty())
                            .map(String::toLowerCase)
                            .collect(Collectors.toSet());
            // remove() doubles as "was it present?"; whatever is left afterwards is unsupported.
            this.sloppyTimeRange = looseRangeItems.remove("time");
            this.sloppyPattern = looseRangeItems.remove("path");
            if (!looseRangeItems.isEmpty()) {
                throw new PipeParameterNotValidException(
                        String.format("Parameters in set %s are not allowed in 'realtime.loose-range'", looseRangeItems));
            }
        } catch (PipeParameterNotValidException e) {
            throw e;
        } catch (Exception e) {
            throw new PipeParameterNotValidException(e.getMessage());
        }
    }

    /**
     * Initializes the extractor from the pipe parameters and the runtime environment: listening
     * options, task identity, patterns, database coverage, time partition id bounds and the mods
     * transfer switch.
     *
     * @param pipeParameters the user supplied parameters
     * @param pipeExtractorRuntimeConfiguration configuration giving access to the runtime environment
     */
    public void customize(PipeParameters pipeParameters, PipeExtractorRuntimeConfiguration pipeExtractorRuntimeConfiguration) throws Exception {
        final PipeTaskExtractorRuntimeEnvironment runtimeEnvironment = pipeExtractorRuntimeConfiguration.getRuntimeEnvironment();

        final Pair<Boolean, Boolean> insertionDeletionListeningOptions =
                DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(pipeParameters);
        this.shouldExtractInsertion = insertionDeletionListeningOptions.getLeft();
        this.shouldExtractDeletion = insertionDeletionListeningOptions.getRight();

        this.pipeName = runtimeEnvironment.getPipeName();
        this.dataRegionId = String.valueOf(runtimeEnvironment.getRegionId());
        this.pipeTaskMeta = runtimeEnvironment.getPipeTaskMeta();
        this.creationTime = runtimeEnvironment.getCreationTime();
        this.taskID = this.pipeName + "_" + this.dataRegionId + "_" + this.creationTime;

        this.treePattern = TreePattern.parsePipePatternFromSourceParameters(pipeParameters);
        this.tablePattern = TablePattern.parsePipePatternFromSourceParameters(pipeParameters);

        // The flag keeps its default (false) when the region or its database name is unavailable.
        final DataRegion dataRegion = StorageEngine.getInstance().getDataRegion(new DataRegionId(runtimeEnvironment.getRegionId()));
        if (dataRegion != null) {
            final String databaseName = dataRegion.getDatabaseName();
            if (databaseName != null) {
                this.isDbNameCoveredByPattern = this.treePattern.coversDb(databaseName) && this.tablePattern.coversDb(databaseName);
            }
        }

        // A start/end time that is not a multiple of the partition interval moves the bound one
        // partition inwards (start: +1, end: -1).
        this.startTimePartitionIdLowerBound =
                this.realtimeDataExtractionStartTime % TimePartitionUtils.getTimePartitionInterval() == 0
                        ? TimePartitionUtils.getTimePartitionId(this.realtimeDataExtractionStartTime)
                        : TimePartitionUtils.getTimePartitionId(this.realtimeDataExtractionStartTime) + 1;
        this.endTimePartitionIdUpperBound =
                this.realtimeDataExtractionEndTime % TimePartitionUtils.getTimePartitionInterval() == 0
                        ? TimePartitionUtils.getTimePartitionId(this.realtimeDataExtractionEndTime)
                        : TimePartitionUtils.getTimePartitionId(this.realtimeDataExtractionEndTime) - 1;

        this.isForwardingPipeRequests = true;

        // "*.mods" takes precedence over "*.mods.enable"; both default to shouldExtractDeletion.
        if (pipeParameters.hasAnyAttributes(new String[]{"extractor.mods", "source.mods"})) {
            this.shouldTransferModFile = pipeParameters.getBooleanOrDefault(Arrays.asList("extractor.mods", "source.mods"), this.shouldExtractDeletion);
        } else {
            this.shouldTransferModFile = pipeParameters.getBooleanOrDefault(Arrays.asList("source.mods.enable", "extractor.mods.enable"), this.shouldExtractDeletion);
        }

        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Pipe {}@{}: realtime data region extractor is initialized with parameters: {}.", this.pipeName, this.dataRegionId, pipeParameters);
        }
    }

    /** Registers this extractor with the time-partition listener and the insertion listener. */
    public void start() throws Exception {
        PipeTimePartitionListener.getInstance().startListen(this.dataRegionId, this);
        PipeInsertionDataNodeListener.getInstance().startListenAndAssign(this.dataRegionId, this);
    }

    /**
     * Unregisters from both listeners (only if {@link #customize} has set the data region id), then
     * drains the pending queue and marks the extractor closed.
     */
    public void close() throws Exception {
        if (Objects.nonNull(this.dataRegionId)) {
            PipeInsertionDataNodeListener.getInstance().stopListenAndAssign(this.dataRegionId, this);
            PipeTimePartitionListener.getInstance().stopListen(this.dataRegionId, this);
        }
        synchronized (this.isClosed) {
            clearPendingQueue();
            this.isClosed.set(true);
        }
    }

    /**
     * Empties the pending queue and clears the reference count held in this class's name on every
     * drained {@link EnrichedEvent}.
     */
    private void clearPendingQueue() {
        // Snapshot first so the reference counts are cleared after the queue itself is emptied.
        final ArrayList<Event> drained = new ArrayList<>(this.pendingQueue.size());
        this.pendingQueue.forEach(drained::add);
        this.pendingQueue.clear();
        for (final Event event : drained) {
            if (event instanceof EnrichedEvent) {
                ((EnrichedEvent) event).clearReferenceCount(PipeRealtimeDataRegionExtractor.class.getName());
            }
        }
    }

    /**
     * Entry point for realtime events. Progress report events are enqueued directly; other events
     * are filtered by time range and pattern and, if they may match, handed to {@link #doExtract}.
     * Events that cannot match have their reference count decreased instead.
     *
     * @param pipeRealtimeEvent the event to filter and extract
     */
    public final void extract(PipeRealtimeEvent pipeRealtimeEvent) {
        if (pipeRealtimeEvent.getEvent() instanceof ProgressReportEvent) {
            extractDirectly(pipeRealtimeEvent);
            return;
        }

        if (this.isDbNameCoveredByPattern) {
            pipeRealtimeEvent.skipParsingPattern();
        }

        // Skip per-event time parsing while every known partition of the region lies inside the
        // configured range; once that stops being true the check is disabled for good.
        if (!this.disableCheckingDataRegionTimePartitionCovering && Objects.nonNull(this.dataRegionTimePartitionIdBound.get())) {
            if (isDataRegionTimePartitionCoveredByTimeRange()) {
                pipeRealtimeEvent.skipParsingTime();
            } else {
                this.disableCheckingDataRegionTimePartitionCovering = true;
            }
        }

        if ((!pipeRealtimeEvent.shouldParseTime() || pipeRealtimeEvent.getEvent().mayEventTimeOverlappedWithTimeRange())
                && (!pipeRealtimeEvent.shouldParsePattern() || pipeRealtimeEvent.getEvent().mayEventPathsOverlappedWithPattern())) {
            if (this.sloppyTimeRange) {
                pipeRealtimeEvent.skipParsingTime();
            }
            if (this.sloppyPattern) {
                pipeRealtimeEvent.skipParsingPattern();
            }
            doExtract(pipeRealtimeEvent);
        } else {
            pipeRealtimeEvent.decreaseReferenceCount(PipeRealtimeDataRegionExtractor.class.getName(), false);
        }

        // If close() ran concurrently, drain whatever this call may have just enqueued.
        synchronized (this.isClosed) {
            if (this.isClosed.get()) {
                clearPendingQueue();
            }
        }
    }

    /**
     * Handles an event that passed the filters in {@link #extract}.
     *
     * @param pipeRealtimeEvent the accepted event
     */
    protected abstract void doExtract(PipeRealtimeEvent pipeRealtimeEvent);

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Enqueues a heartbeat event unless the queue tail already holds a heartbeat that makes the new
     * one redundant, i.e. the tail heartbeat should print its message or the new one should not.
     * The event's reference count is decreased whenever it is not enqueued.
     *
     * @param pipeRealtimeEvent realtime event whose inner event is a {@link PipeHeartbeatEvent}
     */
    public void extractHeartbeat(PipeRealtimeEvent pipeRealtimeEvent) {
        final PipeHeartbeatEvent heartbeatEvent = (PipeHeartbeatEvent) pipeRealtimeEvent.getEvent();
        heartbeatEvent.recordExtractorQueueSize(this.pendingQueue);

        final Event lastEvent = this.pendingQueue.peekLast();
        if (lastEvent instanceof PipeRealtimeEvent
                && ((PipeRealtimeEvent) lastEvent).getEvent() instanceof PipeHeartbeatEvent
                && (((PipeHeartbeatEvent) ((PipeRealtimeEvent) lastEvent).getEvent()).isShouldPrintMessage()
                        || !heartbeatEvent.isShouldPrintMessage())) {
            pipeRealtimeEvent.decreaseReferenceCount(PipeRealtimeDataRegionExtractor.class.getName(), false);
            return;
        }

        if (this.pendingQueue.waitedOffer(pipeRealtimeEvent)) {
            return;
        }
        // Report the concrete subclass rather than a hard-coded extractor name.
        LOGGER.error(
                "extract: pending queue of {} {} has reached capacity, discard heartbeat event {}",
                getClass().getSimpleName(),
                this,
                pipeRealtimeEvent);
        pipeRealtimeEvent.decreaseReferenceCount(PipeRealtimeDataRegionExtractor.class.getName(), false);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Enqueues the event without any filtering. If the queue rejects it, the failure is logged,
     * reported as a non-critical runtime exception, and the event's reference count is decreased.
     *
     * @param pipeRealtimeEvent the event to enqueue
     */
    public void extractDirectly(PipeRealtimeEvent pipeRealtimeEvent) {
        if (this.pendingQueue.waitedOffer(pipeRealtimeEvent)) {
            return;
        }
        final String errorMessage =
                String.format(
                        "extract: pending queue of %s %s has reached capacity, discard event %s",
                        getClass().getSimpleName(), this, pipeRealtimeEvent);
        LOGGER.error(errorMessage);
        PipeDataNodeAgent.runtime().report(this.pipeTaskMeta, (PipeRuntimeException) new PipeRuntimeNonCriticalException(errorMessage));
        pipeRealtimeEvent.decreaseReferenceCount(PipeRealtimeDataRegionExtractor.class.getName(), false);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Returns the wrapped heartbeat event after taking a reference on it.
     *
     * @param pipeRealtimeEvent the realtime event wrapping the heartbeat
     * @return the inner event, or {@code null} if the reference count could not be increased
     */
    public Event supplyHeartbeat(PipeRealtimeEvent pipeRealtimeEvent) {
        if (pipeRealtimeEvent.increaseReferenceCount(PipeRealtimeDataRegionExtractor.class.getName())) {
            return pipeRealtimeEvent.getEvent();
        }
        LOGGER.error("Heartbeat Event {} can not be supplied because the reference count can not be increased", pipeRealtimeEvent.getEvent());
        return null;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Returns the wrapped event after taking a reference on it. On failure the problem is logged and
     * reported as a non-critical runtime exception.
     *
     * @param pipeRealtimeEvent the realtime event to supply
     * @return the inner event, or {@code null} if the reference count could not be increased
     */
    public Event supplyDirectly(PipeRealtimeEvent pipeRealtimeEvent) {
        if (pipeRealtimeEvent.increaseReferenceCount(PipeRealtimeDataRegionExtractor.class.getName())) {
            return pipeRealtimeEvent.getEvent();
        }
        final String errorMessage =
                String.format(
                        "Event %s can not be supplied because the reference count can not be increased, the data represented by this event is lost",
                        pipeRealtimeEvent.getEvent());
        LOGGER.error(errorMessage);
        PipeDataNodeAgent.runtime().report(this.pipeTaskMeta, (PipeRuntimeException) new PipeRuntimeNonCriticalException(errorMessage));
        return null;
    }

    /** @return the pipe name taken from the runtime environment */
    public final String getPipeName() {
        return this.pipeName;
    }

    /** @return the pipe creation time taken from the runtime environment */
    public final long getCreationTime() {
        return this.creationTime;
    }

    /** @return the pipe task meta taken from the runtime environment */
    public final PipeTaskMeta getPipeTaskMeta() {
        return this.pipeTaskMeta;
    }

    /** @return whether insertion events are listened to */
    public final boolean shouldExtractInsertion() {
        return this.shouldExtractInsertion;
    }

    /** @return whether deletion events are listened to */
    public final boolean shouldExtractDeletion() {
        return this.shouldExtractDeletion;
    }

    /** @return the tree pattern parsed from the source parameters */
    public final TreePattern getTreePattern() {
        return this.treePattern;
    }

    /** @return the table pattern parsed from the source parameters */
    public final TablePattern getTablePattern() {
        return this.tablePattern;
    }

    /** @return the data region id as a string */
    public final String getDataRegionId() {
        return this.dataRegionId;
    }

    /** @return the configured extraction start time, or {@link Long#MIN_VALUE} if none was given */
    public final long getRealtimeDataExtractionStartTime() {
        return this.realtimeDataExtractionStartTime;
    }

    /** @return the configured extraction end time, or the default upper bound if none was given */
    public final long getRealtimeDataExtractionEndTime() {
        return this.realtimeDataExtractionEndTime;
    }

    /**
     * Records the latest time partition id bound observed for the data region.
     *
     * @param pair the bound; {@code left} is compared against the lower bound and {@code right}
     *     against the upper bound when checking coverage
     */
    public void setDataRegionTimePartitionIdBound(Pair<Long, Long> pair) {
        LOGGER.info(
                "PipeRealtimeDataRegionExtractor({}) observed data region {} time partition growth, recording time partition id bound: {}.",
                this.taskID,
                this.dataRegionId,
                pair);
        this.dataRegionTimePartitionIdBound.set(pair);
    }

    /**
     * Checks whether the recorded partition id bound lies within the bounds derived from the
     * extraction time range. Callers must ensure a bound has been recorded.
     */
    private boolean isDataRegionTimePartitionCoveredByTimeRange() {
        final Pair<Long, Long> partitionIdBound = this.dataRegionTimePartitionIdBound.get();
        return this.startTimePartitionIdLowerBound <= partitionIdBound.left
                && partitionIdBound.right <= this.endTimePartitionIdUpperBound;
    }

    /** @return the forwarding flag, which {@link #customize} sets to {@code true} */
    public final boolean isForwardingPipeRequests() {
        return this.isForwardingPipeRequests;
    }

    /** @return whether the concrete extractor needs TsFile events */
    public abstract boolean isNeedListenToTsFile();

    /** @return whether the concrete extractor needs insert node events */
    public abstract boolean isNeedListenToInsertNode();

    /** @return whether mod files should be transferred, as resolved in {@link #customize} */
    public final boolean isShouldTransferModFile() {
        return this.shouldTransferModFile;
    }

    @Override
    public String toString() {
        return MoreObjects.toStringHelper(this)
                .add("pipeName", this.pipeName)
                .add("creationTime", this.creationTime)
                .add("dataRegionId", this.dataRegionId)
                .add("pipeTaskMeta", this.pipeTaskMeta)
                .add("shouldExtractInsertion", this.shouldExtractInsertion)
                .add("shouldExtractDeletion", this.shouldExtractDeletion)
                .add("treePattern", this.treePattern)
                .add("tablePattern", this.tablePattern)
                .add("isDbNameCoveredByPattern", this.isDbNameCoveredByPattern)
                .add("realtimeDataExtractionStartTime", this.realtimeDataExtractionStartTime)
                .add("realtimeDataExtractionEndTime", this.realtimeDataExtractionEndTime)
                .add("disableCheckingDataRegionTimePartitionCovering", this.disableCheckingDataRegionTimePartitionCovering)
                .add("startTimePartitionIdLowerBound", this.startTimePartitionIdLowerBound)
                .add("endTimePartitionIdUpperBound", this.endTimePartitionIdUpperBound)
                .add("dataRegionTimePartitionIdBound", this.dataRegionTimePartitionIdBound)
                .add("isForwardingPipeRequests", this.isForwardingPipeRequests)
                .add("shouldTransferModFile", this.shouldTransferModFile)
                .add("sloppyTimeRange", this.sloppyTimeRange)
                .add("sloppyPattern", this.sloppyPattern)
                .add("pendingQueue", this.pendingQueue)
                .add("isClosed", this.isClosed)
                .add("taskID", this.taskID)
                .toString();
    }

    /** @return the tablet insertion event count reported by the pending queue */
    public int getTabletInsertionEventCount() {
        return this.pendingQueue.getTabletInsertionEventCount();
    }

    /** @return the TsFile insertion event count reported by the pending queue */
    public int getTsFileInsertionEventCount() {
        return this.pendingQueue.getTsFileInsertionEventCount();
    }

    /** @return the heartbeat event count reported by the pending queue */
    public int getPipeHeartbeatEventCount() {
        return this.pendingQueue.getPipeHeartbeatEventCount();
    }

    /** @return the current size of the pending queue */
    public int getEventCount() {
        return this.pendingQueue.size();
    }

    /** @return the task id in the form {@code <pipeName>_<dataRegionId>_<creationTime>} */
    public String getTaskID() {
        return this.taskID;
    }
}
