package org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.reader.snapshot;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Map;
import org.apache.seatunnel.connectors.cdc.base.relational.JdbcSourceEventDispatcher;
import org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
import org.apache.seatunnel.connectors.cdc.base.source.split.wartermark.WatermarkKind;
import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.reader.PostgresSourceFetchTaskContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/snapshot/PostgresSnapshotFetchTask.class */
public class PostgresSnapshotFetchTask implements FetchTask<SourceSplitBase> {
    private static final Logger log = LoggerFactory.getLogger(PostgresSnapshotFetchTask.class);
    private final SnapshotSplit split;
    private volatile boolean taskRunning = false;
    private PostgresSnapshotSplitReadTask snapshotSplitReadTask;

    public PostgresSnapshotFetchTask(SnapshotSplit snapshotSplit) {
        this.split = snapshotSplit;
    }

    @Override // org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask
    public void execute(FetchTask.Context context) throws Exception {
        PostgresSourceFetchTaskContext postgresSourceFetchTaskContext = (PostgresSourceFetchTaskContext) context;
        this.taskRunning = true;
        this.snapshotSplitReadTask = new PostgresSnapshotSplitReadTask(postgresSourceFetchTaskContext.getDbzConnectorConfig(), postgresSourceFetchTaskContext.getOffsetContext(), postgresSourceFetchTaskContext.getSnapshotChangeEventSourceMetrics(), postgresSourceFetchTaskContext.getDatabaseSchema(), postgresSourceFetchTaskContext.getDataConnection(), postgresSourceFetchTaskContext.getDispatcher(), this.split);
        SnapshotSplitChangeEventSourceContext snapshotSplitChangeEventSourceContext = new SnapshotSplitChangeEventSourceContext();
        if (!this.snapshotSplitReadTask.execute(snapshotSplitChangeEventSourceContext, postgresSourceFetchTaskContext.getOffsetContext()).isCompletedOrSkipped()) {
            this.taskRunning = false;
            throw new IllegalStateException(String.format("Read snapshot for split %s fail", this.split));
        }
        boolean isAfter = snapshotSplitChangeEventSourceContext.getHighWatermark().isAfter(snapshotSplitChangeEventSourceContext.getLowWatermark());
        if (context.isExactlyOnce()) {
            dispatchBinlogEndEvent(createBackFillWalSplit(snapshotSplitChangeEventSourceContext), ((PostgresSourceFetchTaskContext) context).getOffsetContext().getPartition(), ((PostgresSourceFetchTaskContext) context).getDispatcher());
            this.taskRunning = false;
        } else {
            this.taskRunning = false;
            if (isAfter) {
                log.debug("Skip merge changelog(exactly-once) for snapshot split {}", this.split);
            }
        }
    }

    private IncrementalSplit createBackFillWalSplit(SnapshotSplitChangeEventSourceContext snapshotSplitChangeEventSourceContext) {
        return new IncrementalSplit(this.split.splitId(), Collections.singletonList(this.split.getTableId()), snapshotSplitChangeEventSourceContext.getLowWatermark(), snapshotSplitChangeEventSourceContext.getHighWatermark(), new ArrayList());
    }

    private void dispatchBinlogEndEvent(IncrementalSplit incrementalSplit, Map<String, ?> map, JdbcSourceEventDispatcher jdbcSourceEventDispatcher) throws InterruptedException {
        jdbcSourceEventDispatcher.dispatchWatermarkEvent(map, incrementalSplit, incrementalSplit.getStopOffset(), WatermarkKind.END);
    }

    @Override // org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask
    public boolean isRunning() {
        return this.taskRunning;
    }

    @Override // org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask
    public void shutdown() {
        this.taskRunning = false;
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask
    public SourceSplitBase getSplit() {
        return this.split;
    }
}
