package org.apache.iotdb.db.pipe.extractor.dataregion.realtime;

import java.util.Objects;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor;
import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.epoch.TsFileEpoch;
import org.apache.iotdb.db.pipe.metric.PipeDataRegionExtractorMetrics;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.class */
public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegionExtractor {
    private static final Logger LOGGER = LoggerFactory.getLogger(PipeRealtimeDataRegionHybridExtractor.class);

    /**
     * Dispatches a realtime event to the extraction routine that matches the type of the event it
     * wraps.
     *
     * <p>Tablet insertions, TsFile insertions and heartbeats each go to their dedicated handler;
     * {@link PipeDeleteDataNodeEvent}s are handed to {@code extractDirectly}.
     *
     * @param pipeRealtimeEvent the realtime event to extract
     * @throws UnsupportedOperationException if the wrapped event is none of the supported types
     */
    @Override
    protected void doExtract(PipeRealtimeEvent pipeRealtimeEvent) {
        final EnrichedEvent wrapped = pipeRealtimeEvent.getEvent();
        if (wrapped instanceof TabletInsertionEvent) {
            extractTabletInsertion(pipeRealtimeEvent);
        } else if (wrapped instanceof TsFileInsertionEvent) {
            extractTsFileInsertion(pipeRealtimeEvent);
        } else if (wrapped instanceof PipeHeartbeatEvent) {
            extractHeartbeat(pipeRealtimeEvent);
        } else if (wrapped instanceof PipeDeleteDataNodeEvent) {
            extractDirectly(pipeRealtimeEvent);
        } else {
            throw new UnsupportedOperationException(String.format("Unsupported event type %s for hybrid realtime extractor %s", wrapped.getClass(), this));
        }
    }

    /**
     * Tells whether this extractor needs to listen to TsFile events.
     *
     * @return the value of {@code shouldExtractInsertion}
     */
    @Override
    public boolean isNeedListenToTsFile() {
        return shouldExtractInsertion;
    }

    /**
     * Tells whether this extractor needs to listen to insert-node events.
     *
     * @return the value of {@code shouldExtractInsertion}
     */
    @Override
    public boolean isNeedListenToInsertNode() {
        return shouldExtractInsertion;
    }

    /**
     * Handles a realtime event that wraps a tablet insertion.
     *
     * <p>When {@link #canNotUseTabletAnyMore()} reports {@code true}, the TsFile epoch state of the
     * event is first migrated: {@code EMPTY} and {@code USING_TSFILE} become {@code USING_TSFILE},
     * every other state becomes {@code USING_BOTH}. The resulting state then decides what happens to
     * the event:
     *
     * <ul>
     *   <li>{@code USING_TSFILE}: the event is not queued and its reference count is decreased;
     *   <li>{@code EMPTY}, {@code USING_TABLET}, {@code USING_BOTH}: the event is offered to the
     *       pending queue; if the offer is rejected, an error is logged and reported as a
     *       non-critical runtime exception, and the reference count is decreased.
     * </ul>
     *
     * @param pipeRealtimeEvent the realtime event wrapping the tablet insertion
     * @throws UnsupportedOperationException if the epoch state is none of the states listed above
     */
    private void extractTabletInsertion(PipeRealtimeEvent pipeRealtimeEvent) {
        if (canNotUseTabletAnyMore()) {
            pipeRealtimeEvent.getTsFileEpoch().migrateState(this, current -> {
                switch (current) {
                    case EMPTY:
                    case USING_TSFILE:
                        return TsFileEpoch.State.USING_TSFILE;
                    default:
                        // USING_TABLET, USING_BOTH and anything else end up in USING_BOTH.
                        return TsFileEpoch.State.USING_BOTH;
                }
            });
        }

        final TsFileEpoch.State epochState = pipeRealtimeEvent.getTsFileEpoch().getState(this);
        switch (epochState) {
            case USING_TSFILE:
                // The tablet event is dropped in this state; release the reference it holds.
                pipeRealtimeEvent.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName(), false);
                break;
            case EMPTY:
            case USING_TABLET:
            case USING_BOTH:
                if (!this.pendingQueue.waitedOffer(pipeRealtimeEvent)) {
                    // The state is re-read for the message rather than reusing epochState.
                    final String errorMessage = String.format("extractTabletInsertion: pending queue of PipeRealtimeDataRegionHybridExtractor %s has reached capacity, discard tablet event %s, current state %s", this, pipeRealtimeEvent, pipeRealtimeEvent.getTsFileEpoch().getState(this));
                    LOGGER.error(errorMessage);
                    PipeDataNodeAgent.runtime().report(this.pipeTaskMeta, (PipeRuntimeException) new PipeRuntimeNonCriticalException(errorMessage));
                    pipeRealtimeEvent.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName(), false);
                }
                break;
            default:
                throw new UnsupportedOperationException(String.format("Unsupported state %s for hybrid realtime extractor %s", epochState, PipeRealtimeDataRegionHybridExtractor.class.getName()));
        }
    }

    /**
     * Handles a realtime event that wraps a TsFile insertion.
     *
     * <p>The TsFile epoch state of the event is migrated first:
     *
     * <ul>
     *   <li>{@code EMPTY} and {@code USING_TSFILE} become {@code USING_TSFILE};
     *   <li>{@code USING_TABLET} becomes {@code USING_BOTH} when the file start time of the TsFile
     *       event is smaller than the epoch's insert-node minimum time, otherwise it stays
     *       {@code USING_TABLET};
     *   <li>every other state becomes {@code USING_BOTH}.
     * </ul>
     *
     * <p>Afterwards the event is offered to the pending queue for every known state. If the offer is
     * rejected, an error is logged and reported as a non-critical runtime exception, and the
     * reference count of the event is decreased.
     *
     * @param pipeRealtimeEvent the realtime event wrapping the TsFile insertion
     * @throws UnsupportedOperationException if the epoch state is not one of the known states
     */
    private void extractTsFileInsertion(PipeRealtimeEvent pipeRealtimeEvent) {
        pipeRealtimeEvent.getTsFileEpoch().migrateState(this, current -> {
            switch (current) {
                case EMPTY:
                case USING_TSFILE:
                    return TsFileEpoch.State.USING_TSFILE;
                case USING_TABLET:
                    if (((PipeTsFileInsertionEvent) pipeRealtimeEvent.getEvent()).getFileStartTime() < pipeRealtimeEvent.getTsFileEpoch().getInsertNodeMinTime()) {
                        return TsFileEpoch.State.USING_BOTH;
                    } else {
                        return TsFileEpoch.State.USING_TABLET;
                    }
                default:
                    // USING_BOTH and anything else end up in USING_BOTH.
                    return TsFileEpoch.State.USING_BOTH;
            }
        });

        final TsFileEpoch.State epochState = pipeRealtimeEvent.getTsFileEpoch().getState(this);
        switch (epochState) {
            case EMPTY:
            case USING_TSFILE:
            case USING_TABLET:
            case USING_BOTH:
                if (!this.pendingQueue.waitedOffer(pipeRealtimeEvent)) {
                    // The state is re-read for the message rather than reusing epochState.
                    final String errorMessage = String.format("extractTsFileInsertion: pending queue of PipeRealtimeDataRegionHybridExtractor %s has reached capacity, discard TsFile event %s, current state %s", this, pipeRealtimeEvent, pipeRealtimeEvent.getTsFileEpoch().getState(this));
                    LOGGER.error(errorMessage);
                    PipeDataNodeAgent.runtime().report(this.pipeTaskMeta, (PipeRuntimeException) new PipeRuntimeNonCriticalException(errorMessage));
                    pipeRealtimeEvent.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName(), false);
                }
                break;
            default:
                throw new UnsupportedOperationException(String.format("Unsupported state %s for hybrid realtime extractor %s", epochState, PipeRealtimeDataRegionHybridExtractor.class.getName()));
        }
    }

    /**
     * Combines all conditions under which tablet events should no longer be used.
     *
     * <p>The individual checks are evaluated in the order written and evaluation stops at the first
     * one that returns {@code true}.
     *
     * @return {@code true} if at least one of the checks returns {@code true}
     */
    private boolean canNotUseTabletAnyMore() {
        // Pipe task state, WAL size and pinned memtable count.
        if (isPipeTaskCurrentlyRestarted() || mayWalSizeReachThrottleThreshold() || mayMemTablePinnedCountReachDangerousThreshold()) {
            return true;
        }
        // TsFile event backlog and linked TsFile count.
        if (isHistoricalTsFileEventCountExceededLimit() || isRealtimeTsFileEventCountExceededLimit() || mayTsFileLinkedCountReachDangerousThreshold()) {
            return true;
        }
        // Floating memory held by insert nodes.
        return mayInsertNodeMemoryReachDangerousThreshold();
    }

    /**
     * Asks the pipe task agent whether the task of this extractor's pipe is currently restarted.
     *
     * @return the agent's answer for {@code pipeName}
     */
    private boolean isPipeTaskCurrentlyRestarted() {
        final boolean restarted = PipeDataNodeAgent.task().isPipeTaskCurrentlyRestarted(pipeName);
        return restarted;
    }

    /**
     * Checks the total WAL disk usage against the configured throttle threshold.
     *
     * @return {@code true} if three times the total WAL disk usage is greater than the throttle
     *     threshold
     */
    private boolean mayWalSizeReachThrottleThreshold() {
        final long walDiskUsage = WALManager.getInstance().getTotalDiskUsage();
        final long throttleThreshold = IoTDBDescriptor.getInstance().getConfig().getThrottleThreshold();
        return 3 * walDiskUsage > throttleThreshold;
    }

    /**
     * Checks the pinned WAL count against the configured maximum of pinned memtables.
     *
     * @return {@code true} if the pinned WAL count is at least
     *     {@code getPipeMaxAllowedPinnedMemTableCount()}
     */
    private boolean mayMemTablePinnedCountReachDangerousThreshold() {
        return PipeDataNodeResourceManager.wal().getPinnedWalCount()
                >= PipeConfig.getInstance().getPipeMaxAllowedPinnedMemTableCount();
    }

    /**
     * Checks the historical TsFile insertion event count of the extractor registered under this
     * task id in the extractor metrics.
     *
     * @return {@code false} if no extractor is registered for {@code getTaskID()}; otherwise
     *     {@code true} if its historical TsFile insertion event count is at least
     *     {@code getPipeMaxAllowedHistoricalTsFilePerDataRegion()}
     */
    private boolean isHistoricalTsFileEventCountExceededLimit() {
        final IoTDBDataRegionExtractor registered = PipeDataRegionExtractorMetrics.getInstance().getExtractorMap().get(getTaskID());
        if (registered == null) {
            return false;
        }
        return registered.getHistoricalTsFileInsertionEventCount()
                >= PipeConfig.getInstance().getPipeMaxAllowedHistoricalTsFilePerDataRegion();
    }

    /**
     * Checks the number of TsFile insertion events in the pending queue against the configured
     * limit.
     *
     * @return {@code true} if the pending queue's TsFile insertion event count is at least
     *     {@code getPipeMaxAllowedPendingTsFileEpochPerDataRegion()}
     */
    private boolean isRealtimeTsFileEventCountExceededLimit() {
        return pendingQueue.getTsFileInsertionEventCount()
                >= PipeConfig.getInstance().getPipeMaxAllowedPendingTsFileEpochPerDataRegion();
    }

    /**
     * Checks the number of linked TsFiles against the configured maximum.
     *
     * @return {@code true} if the linked TsFile count is at least
     *     {@code getPipeMaxAllowedLinkedTsFileCount()}
     */
    private boolean mayTsFileLinkedCountReachDangerousThreshold() {
        // Both sides are compared as long values.
        final long linkedTsFileCount = PipeDataNodeResourceManager.tsfile().getLinkedTsfileCount();
        final long maxAllowedLinkedTsFileCount = PipeConfig.getInstance().getPipeMaxAllowedLinkedTsFileCount();
        return linkedTsFileCount >= maxAllowedLinkedTsFileCount;
    }

    /**
     * Checks this pipe's floating memory usage, scaled by the number of pipes, against the free
     * memory size.
     *
     * @return {@code true} if {@code 3 * floatingMemoryUsage * pipeCount} is at least
     *     {@code 2 * freeMemorySize}
     */
    private boolean mayInsertNodeMemoryReachDangerousThreshold() {
        final long floatingMemoryUsage = PipeDataNodeAgent.task().getFloatingMemoryUsageInByte(pipeName);
        final long pipeCount = PipeDataNodeAgent.task().getPipeCount();
        final long freeMemorySize = PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes();
        return (3 * floatingMemoryUsage) * pipeCount >= 2 * freeMemorySize;
    }

    /**
     * Polls the pending queue until an event can be supplied or the queue yields {@code null}.
     *
     * <p>Each polled realtime event is routed by the type of the event it wraps: tablet insertions,
     * TsFile insertions and heartbeats go to their dedicated supply routine;
     * {@link PipeDeleteDataNodeEvent}s and {@link ProgressReportEvent}s go to
     * {@code supplyDirectly}. After routing, the reference count of the polled event is decreased.
     * If the routine produced {@code null}, the next event is polled.
     *
     * @return the first non-null event produced by a supply routine, or {@code null} if the pending
     *     queue returned {@code null} first
     * @throws UnsupportedOperationException if a polled event wraps an unsupported event type; in
     *     that case the reference count of the polled event is not decreased
     */
    public Event supply() {
        PipeRealtimeEvent polled = (PipeRealtimeEvent) this.pendingQueue.directPoll();
        while (polled != null) {
            final EnrichedEvent wrapped = polled.getEvent();
            final Event supplied;
            if (wrapped instanceof TabletInsertionEvent) {
                supplied = supplyTabletInsertion(polled);
            } else if (wrapped instanceof TsFileInsertionEvent) {
                supplied = supplyTsFileInsertion(polled);
            } else if (wrapped instanceof PipeHeartbeatEvent) {
                supplied = supplyHeartbeat(polled);
            } else if (wrapped instanceof PipeDeleteDataNodeEvent || wrapped instanceof ProgressReportEvent) {
                supplied = supplyDirectly(polled);
            } else {
                throw new UnsupportedOperationException(String.format("Unsupported event type %s for hybrid realtime extractor %s to supply.", wrapped.getClass(), this));
            }

            // Release the reference attributed to this extractor now that the event left the queue.
            polled.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName(), false);
            if (supplied != null) {
                return supplied;
            }
            polled = (PipeRealtimeEvent) this.pendingQueue.directPoll();
        }
        return null;
    }

    /**
     * Decides whether a polled tablet insertion event is supplied or discarded.
     *
     * <p>If the TsFile epoch state of the event is {@code EMPTY}, it is first migrated to
     * {@code USING_TSFILE} when {@link #canNotUseTabletAnyMore()} reports {@code true}, otherwise to
     * {@code USING_TABLET}; any other state is left unchanged. Then:
     *
     * <ul>
     *   <li>{@code USING_TSFILE}: the tablet event is discarded ({@code null} is returned);
     *   <li>any other state: the reference count of the event is increased and the wrapped event is
     *       returned. If the reference count cannot be increased, the epoch state is migrated to
     *       {@code USING_BOTH}, a warning is logged and {@code null} is returned.
     * </ul>
     *
     * @param pipeRealtimeEvent the polled realtime event wrapping a tablet insertion
     * @return the wrapped event to supply, or {@code null} if the event is discarded
     */
    private Event supplyTabletInsertion(PipeRealtimeEvent pipeRealtimeEvent) {
        pipeRealtimeEvent.getTsFileEpoch().migrateState(this, state -> {
            if (!state.equals(TsFileEpoch.State.EMPTY)) {
                return state;
            }
            return canNotUseTabletAnyMore() ? TsFileEpoch.State.USING_TSFILE : TsFileEpoch.State.USING_TABLET;
        });

        switch (pipeRealtimeEvent.getTsFileEpoch().getState(this)) {
            case USING_TSFILE:
                return null;
            case EMPTY:
            case USING_TABLET:
            case USING_BOTH:
            default:
                if (pipeRealtimeEvent.increaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName())) {
                    return pipeRealtimeEvent.getEvent();
                }
                // supplyTsFileInsertion discards TsFile events while the state is USING_TABLET, so
                // the state is moved to USING_BOTH here.
                // NOTE(review): presumably this lets the epoch's TsFile event carry the data of the
                // discarded tablet event - confirm against TsFileEpoch.
                pipeRealtimeEvent.getTsFileEpoch().migrateState(this, ignored -> TsFileEpoch.State.USING_BOTH);
                // The message names USING_BOTH, the state actually set above (it used to say
                // USING_TSFILE).
                LOGGER.warn("Discard tablet event {} because it is not reliable anymore. Change the state of TsFileEpoch to USING_BOTH.", pipeRealtimeEvent);
                return null;
        }
    }

    /**
     * Decides whether a polled TsFile insertion event is supplied or discarded.
     *
     * <p>If the TsFile epoch state of the event is {@code EMPTY}, an error is logged and the state is
     * migrated to {@code USING_TSFILE}; any other state is left unchanged. Then:
     *
     * <ul>
     *   <li>{@code USING_TABLET}: the TsFile event is discarded ({@code null} is returned);
     *   <li>any other state: the reference count of the event is increased and the wrapped event is
     *       returned. If the reference count cannot be increased, an error is logged and reported as
     *       a non-critical runtime exception, and {@code null} is returned.
     * </ul>
     *
     * @param pipeRealtimeEvent the polled realtime event wrapping a TsFile insertion
     * @return the wrapped event to supply, or {@code null} if the event is discarded
     */
    private Event supplyTsFileInsertion(PipeRealtimeEvent pipeRealtimeEvent) {
        pipeRealtimeEvent.getTsFileEpoch().migrateState(this, current -> {
            if (current.equals(TsFileEpoch.State.EMPTY)) {
                LOGGER.error(String.format("EMPTY TsFileEpoch when supplying TsFile Event %s", pipeRealtimeEvent));
                return TsFileEpoch.State.USING_TSFILE;
            }
            return current;
        });

        final TsFileEpoch.State epochState = pipeRealtimeEvent.getTsFileEpoch().getState(this);
        switch (epochState) {
            case USING_TABLET:
                return null;
            case EMPTY:
            case USING_TSFILE:
            case USING_BOTH:
            default:
                if (!pipeRealtimeEvent.increaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName())) {
                    final String errorMessage = String.format("TsFile Event %s can not be supplied because the reference count can not be increased, the data represented by this event is lost", pipeRealtimeEvent.getEvent());
                    LOGGER.error(errorMessage);
                    PipeDataNodeAgent.runtime().report(this.pipeTaskMeta, (PipeRuntimeException) new PipeRuntimeNonCriticalException(errorMessage));
                    return null;
                }
                return pipeRealtimeEvent.getEvent();
        }
    }
}
