package io.delta.flink.source.internal.enumerator.processor;

import io.delta.flink.internal.options.DeltaConnectorConfiguration;
import io.delta.flink.source.internal.DeltaSourceOptions;
import io.delta.flink.source.internal.enumerator.monitor.ChangesPerVersion;
import io.delta.flink.source.internal.enumerator.monitor.TableMonitor;
import io.delta.flink.source.internal.enumerator.monitor.TableMonitorResult;
import io.delta.flink.source.internal.exceptions.DeltaSourceException;
import io.delta.flink.source.internal.exceptions.DeltaSourceExceptions;
import io.delta.flink.source.internal.file.AddFileEnumerator;
import io.delta.flink.source.internal.state.DeltaEnumeratorStateCheckpointBuilder;
import io.delta.flink.source.internal.state.DeltaSourceSplit;
import io.delta.flink.source.internal.utils.SourceUtils;
import io.delta.standalone.actions.AddFile;
import java.lang.invoke.SerializedLambda;
import java.util.List;
import java.util.function.Consumer;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.core.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/delta/flink/source/internal/enumerator/processor/ChangesProcessor.class */
/**
 * A {@link ContinuousTableProcessor} that repeatedly runs a {@link TableMonitor} through the
 * enumerator context and converts every batch of discovered table changes into
 * {@link DeltaSourceSplit}s handed to a caller-supplied callback.
 *
 * <p>The processor remembers the snapshot version it expects to handle next; see
 * {@link #getSnapshotVersion()}.
 */
public class ChangesProcessor extends TableProcessorBase implements ContinuousTableProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(ChangesProcessor.class);

    /** Task scheduled through the enumerator context to look for new table versions. */
    private final TableMonitor tableMonitor;

    /** Enumerator context used to schedule the recurring monitor call. */
    private final SplitEnumeratorContext<DeltaSourceSplit> enumContext;

    /**
     * Value of {@link DeltaSourceOptions#UPDATE_CHECK_INTERVAL}, passed as the period argument
     * of {@code callAsync}. NOTE(review): unit is presumably milliseconds - confirm against the
     * option definition.
     */
    private final long checkInterval;

    /**
     * Value of {@link DeltaSourceOptions#UPDATE_CHECK_INITIAL_DELAY}, passed as the initial delay
     * argument of {@code callAsync}. NOTE(review): unit is presumably milliseconds - confirm.
     */
    private final long initialDelay;

    /**
     * Initialised from {@link TableMonitor#getMonitorVersion()}; afterwards set to
     * "processed version + 1" each time a version is processed.
     */
    private long currentSnapshotVersion;

    /**
     * Creates a processor for the given table.
     *
     * @param path path of the Delta table, forwarded to {@link TableProcessorBase}
     * @param tableMonitor monitor that is scheduled by {@link #process(Consumer)}; its current
     *     monitor version becomes the initial snapshot version of this processor
     * @param splitEnumeratorContext context used to schedule the monitor
     * @param addFileEnumerator enumerator forwarded to {@link TableProcessorBase}
     * @param deltaConnectorConfiguration configuration from which the update check interval and
     *     initial delay are read
     */
    public ChangesProcessor(Path path, TableMonitor tableMonitor, SplitEnumeratorContext<DeltaSourceSplit> splitEnumeratorContext, AddFileEnumerator<DeltaSourceSplit> addFileEnumerator, DeltaConnectorConfiguration deltaConnectorConfiguration) {
        super(path, addFileEnumerator);
        this.tableMonitor = tableMonitor;
        this.enumContext = splitEnumeratorContext;
        this.currentSnapshotVersion = this.tableMonitor.getMonitorVersion();
        this.checkInterval = ((Long) deltaConnectorConfiguration.getValue(DeltaSourceOptions.UPDATE_CHECK_INTERVAL)).longValue();
        this.initialDelay = ((Long) deltaConnectorConfiguration.getValue(DeltaSourceOptions.UPDATE_CHECK_INITIAL_DELAY)).longValue();
    }

    /**
     * Schedules the table monitor on the enumerator context using the configured initial delay
     * and check interval. Each monitor result (or failure) is passed to
     * {@code processDiscoveredVersions}.
     *
     * @param consumer callback receiving the splits prepared for every discovered version
     */
    @Override // io.delta.flink.source.internal.enumerator.processor.TableProcessor
    public void process(Consumer<List<DeltaSourceSplit>> consumer) {
        this.enumContext.callAsync(
            this.tableMonitor,
            (monitorResult, error) -> processDiscoveredVersions(monitorResult, consumer, error),
            this.initialDelay,
            this.checkInterval);
    }

    /**
     * @return the monitor version captured at construction time, or - once at least one version
     *     has been processed - the last processed snapshot version plus one
     */
    @Override // io.delta.flink.source.internal.enumerator.processor.TableProcessor
    public long getSnapshotVersion() {
        return this.currentSnapshotVersion;
    }

    /**
     * Records in the checkpoint builder whether this processor monitors for changes.
     *
     * @param deltaEnumeratorStateCheckpointBuilder builder to update
     * @return the value returned by the builder's {@code withMonitoringForChanges} call
     */
    @Override // io.delta.flink.source.internal.enumerator.processor.TableProcessor
    public DeltaEnumeratorStateCheckpointBuilder<DeltaSourceSplit> snapshotState(DeltaEnumeratorStateCheckpointBuilder<DeltaSourceSplit> deltaEnumeratorStateCheckpointBuilder) {
        return deltaEnumeratorStateCheckpointBuilder.withMonitoringForChanges(isMonitoringForChanges());
    }

    /**
     * @return always {@code true} for this processor
     */
    @Override // io.delta.flink.source.internal.enumerator.processor.ContinuousTableProcessor
    public boolean isMonitoringForChanges() {
        return true;
    }

    /**
     * Handles the outcome of one monitor run.
     *
     * <p>On failure the error is logged and rethrown: a {@link DeltaSourceException} is rethrown
     * as is, any other throwable is wrapped via
     * {@link DeltaSourceExceptions#tableMonitorException}. On success every reported version is
     * processed in the order returned by the monitor result.
     *
     * @param tableMonitorResult result of the monitor run; only read when {@code th} is null
     * @param consumer callback receiving the prepared splits
     * @param th failure reported for the monitor run, or {@code null} on success
     */
    private void processDiscoveredVersions(TableMonitorResult tableMonitorResult, Consumer<List<DeltaSourceSplit>> consumer, Throwable th) {
        if (th != null) {
            LOG.error("Failed to enumerate files", th);
            if (th instanceof DeltaSourceException) {
                throw (DeltaSourceException) th;
            }
            throw DeltaSourceExceptions.tableMonitorException(SourceUtils.pathToString(this.deltaTablePath), th);
        }
        tableMonitorResult.getChanges().forEach(versionChanges -> processVersion(consumer, versionChanges));
    }

    /**
     * Advances the tracked snapshot version past the given version and passes the splits
     * prepared from its changes to the callback.
     *
     * @param consumer callback receiving the prepared splits
     * @param changesPerVersion added files discovered for a single table version
     */
    private void processVersion(Consumer<List<DeltaSourceSplit>> consumer, ChangesPerVersion<AddFile> changesPerVersion) {
        // The version is advanced before the callback is invoked, so the field already points
        // at the following version while the consumer runs.
        this.currentSnapshotVersion = changesPerVersion.getSnapshotVersion() + 1;
        // The filter accepts every path, i.e. no split is filtered out here.
        consumer.accept(prepareSplits(changesPerVersion, path -> true));
    }
}
