package io.delta.flink.source.internal.enumerator.monitor;

import io.delta.flink.source.internal.enumerator.processor.ActionProcessor;
import io.delta.standalone.DeltaLog;
import io.delta.standalone.VersionLog;
import io.delta.standalone.actions.AddFile;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;

/* loaded from: input_file:io/delta/flink/source/internal/enumerator/monitor/TableMonitor.class */
/**
 * Callable that reads the changes of a Delta table starting from a tracked version and
 * converts them into a {@link TableMonitorResult}.
 *
 * <p>Every {@link #call()} that finds changes moves the tracked version to one past the last
 * version returned, so repeated calls on the same instance do not return the same version
 * twice. The tracked version is a plain mutable field with no synchronization in this class.
 */
public class TableMonitor implements Callable<TableMonitorResult> {
    /** Delta log handle used to read table changes and to resolve the table path. */
    private final DeltaLog deltaLog;

    /** Converts the raw actions of one version into the entries placed in the result. */
    private final ActionProcessor actionProcessor;

    /** Time budget, in milliseconds, for processing versions within a single call. */
    private final long maxDurationMillis;

    /** First table version the next {@link #call()} asks the Delta log for. */
    private long monitorVersion;

    /**
     * Creates a monitor for the given Delta log.
     *
     * @param deltaLog              Delta log to read changes from
     * @param initialMonitorVersion table version the first {@link #call()} starts reading at
     * @param maxDurationMillis     time budget in milliseconds for processing versions in one
     *                              call; it is checked after each processed version, so at least
     *                              one available version is always processed
     * @param actionProcessor       processor applied to the actions of every version read
     */
    public TableMonitor(
            DeltaLog deltaLog,
            long initialMonitorVersion,
            long maxDurationMillis,
            ActionProcessor actionProcessor) {
        this.deltaLog = deltaLog;
        this.monitorVersion = initialMonitorVersion;
        this.maxDurationMillis = maxDurationMillis;
        this.actionProcessor = actionProcessor;
    }

    /**
     * Reads the changes available from the current monitor version onwards and, when any were
     * found, advances the monitor version to one past the last version returned.
     *
     * @return the processed changes; holds an empty list when the Delta log reports no changes
     * @throws Exception declared by {@link Callable}; this method adds no checked exceptions of
     *                   its own
     */
    @Override // java.util.concurrent.Callable
    public TableMonitorResult call() throws Exception {
        TableMonitorResult result = monitorForChanges(this.monitorVersion);
        List<ChangesPerVersion<AddFile>> changes = result.getChanges();
        if (!changes.isEmpty()) {
            // Resume right after the newest version handed out so it is not returned again.
            ChangesPerVersion<AddFile> lastChange = changes.get(changes.size() - 1);
            this.monitorVersion = lastChange.getSnapshotVersion() + 1;
        }
        return result;
    }

    /**
     * @return the table version the next {@link #call()} will start reading from
     */
    public long getMonitorVersion() {
        return this.monitorVersion;
    }

    /**
     * Asks the Delta log for changes starting at {@code startVersion} and processes them.
     *
     * @param startVersion first table version to request
     * @return the processed changes, or a result holding an empty list when there are none
     */
    private TableMonitorResult monitorForChanges(long startVersion) {
        // NOTE(review): the boolean argument is presumably Delta Standalone's failOnDataLoss
        // flag - confirm against the DeltaLog#getChanges documentation.
        Iterator<VersionLog> changes = this.deltaLog.getChanges(startVersion, true);
        if (!changes.hasNext()) {
            return new TableMonitorResult(Collections.emptyList());
        }
        return processChanges(changes);
    }

    /**
     * Processes versions from the iterator until it is exhausted or the time budget is used up.
     *
     * <p>The budget is checked only after a version has been processed, so the first available
     * version is always included even when the budget is zero or negative.
     *
     * @param versions iterator over the table versions to process
     * @return result containing one processed entry per consumed version, in iteration order
     */
    private TableMonitorResult processChanges(Iterator<VersionLog> versions) {
        List<ChangesPerVersion<AddFile>> processedChanges = new ArrayList<>();
        long startMillis = System.currentTimeMillis();
        String tablePath = this.deltaLog.getPath().toUri().normalize().toString();
        while (versions.hasNext()) {
            VersionLog versionLog = versions.next();
            processedChanges.add(
                this.actionProcessor.processActions(
                    new ChangesPerVersion<>(
                        tablePath, versionLog.getVersion(), versionLog.getActions())));
            // Compare elapsed time with the budget rather than computing "start + budget":
            // that sum overflows to a negative deadline for very large budgets (for example
            // Long.MAX_VALUE) and would end the loop after the first version.
            if (System.currentTimeMillis() - startMillis >= this.maxDurationMillis) {
                break;
            }
        }
        return new TableMonitorResult(processedChanges);
    }
}
