package io.delta.flink.sink.internal.writer;

import io.delta.flink.sink.internal.committables.DeltaCommittable;
import io.delta.flink.sink.internal.writer.DeltaWriterBucket;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.flink.core.fs.Path;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.DeltaBulkBucketWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/delta/flink/sink/internal/writer/DeltaWriter.class */
public class DeltaWriter<IN> implements SinkWriter<IN, DeltaCommittable, DeltaWriterBucketState>, Sink.ProcessingTimeService.ProcessingTimeCallback {
    private static final Logger LOG = LoggerFactory.getLogger(DeltaWriter.class);
    public static final String RECORDS_OUT_METRIC_NAME = "DeltaSinkRecordsOut";
    public static final String NOOP_WRITER_STATE = "<noop-writer-state>";
    private final String appId;
    private long nextCheckpointId;
    private final DeltaBulkBucketWriter<IN, String> bucketWriter;
    private final CheckpointRollingPolicy<IN, String> rollingPolicy;
    private final Path basePath;
    private final BucketAssigner<IN, String> bucketAssigner;
    private final Sink.ProcessingTimeService processingTimeService;
    private final long bucketCheckInterval;
    private final Map<String, DeltaWriterBucket<IN>> activeBuckets = new HashMap();
    private final BucketerContext bucketerContext = new BucketerContext();
    private final OutputFileConfig outputFileConfig;
    private final MetricGroup metricGroup;
    private final Counter recordsOutCounter;

    /* loaded from: input_file:io/delta/flink/sink/internal/writer/DeltaWriter$BucketerContext.class */
    private static final class BucketerContext implements BucketAssigner.Context {

        @Nullable
        private Long elementTimestamp;
        private long currentWatermark;
        private long currentProcessingTime;

        private BucketerContext() {
            this.elementTimestamp = null;
            this.currentWatermark = Long.MIN_VALUE;
            this.currentProcessingTime = Long.MIN_VALUE;
        }

        void update(@Nullable Long l, long j, long j2) {
            this.elementTimestamp = l;
            this.currentWatermark = j;
            this.currentProcessingTime = j2;
        }

        public long currentProcessingTime() {
            return this.currentProcessingTime;
        }

        public long currentWatermark() {
            return this.currentWatermark;
        }

        @Nullable
        public Long timestamp() {
            return this.elementTimestamp;
        }
    }

    public DeltaWriter(Path path, BucketAssigner<IN, String> bucketAssigner, DeltaBulkBucketWriter<IN, String> deltaBulkBucketWriter, CheckpointRollingPolicy<IN, String> checkpointRollingPolicy, OutputFileConfig outputFileConfig, Sink.ProcessingTimeService processingTimeService, MetricGroup metricGroup, long j, String str, long j2) {
        this.basePath = (Path) Preconditions.checkNotNull(path);
        this.bucketAssigner = (BucketAssigner) Preconditions.checkNotNull(bucketAssigner);
        this.bucketWriter = (DeltaBulkBucketWriter) Preconditions.checkNotNull(deltaBulkBucketWriter);
        this.rollingPolicy = (CheckpointRollingPolicy) Preconditions.checkNotNull(checkpointRollingPolicy);
        this.outputFileConfig = (OutputFileConfig) Preconditions.checkNotNull(outputFileConfig);
        this.processingTimeService = (Sink.ProcessingTimeService) Preconditions.checkNotNull(processingTimeService);
        this.metricGroup = metricGroup;
        this.recordsOutCounter = metricGroup.counter(RECORDS_OUT_METRIC_NAME);
        Preconditions.checkArgument(j > 0, "Bucket checking interval for processing time should be positive.");
        this.bucketCheckInterval = j;
        this.appId = str;
        this.nextCheckpointId = j2;
    }

    public List<DeltaWriterBucketState> snapshotState() {
        Preconditions.checkState(this.bucketWriter != null, "sink has not been initialized");
        ArrayList arrayList = new ArrayList();
        Iterator<DeltaWriterBucket<IN>> it = this.activeBuckets.values().iterator();
        while (it.hasNext()) {
            arrayList.add(it.next().snapshotState(this.appId));
        }
        if (arrayList.isEmpty()) {
            arrayList.add(new DeltaWriterBucketState(NOOP_WRITER_STATE, this.basePath, this.appId));
        }
        return arrayList;
    }

    private void incrementNextCheckpointId() {
        this.nextCheckpointId++;
    }

    long getNextCheckpointId() {
        return this.nextCheckpointId;
    }

    public void write(IN in, SinkWriter.Context context) throws IOException {
        this.bucketerContext.update(context.timestamp(), context.currentWatermark(), this.processingTimeService.getCurrentProcessingTime());
        getOrCreateBucketForBucketId((String) this.bucketAssigner.getBucketId(in, this.bucketerContext)).write(in, this.processingTimeService.getCurrentProcessingTime());
        this.recordsOutCounter.inc();
    }

    public List<DeltaCommittable> prepareCommit(boolean z) throws IOException {
        ArrayList arrayList = new ArrayList();
        Iterator<Map.Entry<String, DeltaWriterBucket<IN>>> it = this.activeBuckets.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, DeltaWriterBucket<IN>> next = it.next();
            if (next.getValue().isActive()) {
                arrayList.addAll(next.getValue().prepareCommit(z, this.appId, this.nextCheckpointId));
            } else {
                it.remove();
            }
        }
        incrementNextCheckpointId();
        return arrayList;
    }

    public void initializeState(List<DeltaWriterBucketState> list) throws IOException {
        Preconditions.checkNotNull(list, "The retrieved state was null.");
        for (DeltaWriterBucketState deltaWriterBucketState : list) {
            String bucketId = deltaWriterBucketState.getBucketId();
            if (!bucketId.equals(NOOP_WRITER_STATE)) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Restoring: {}", deltaWriterBucketState);
                }
                updateActiveBucketId(bucketId, DeltaWriterBucket.DeltaWriterBucketFactory.restoreBucket(this.bucketWriter, this.rollingPolicy, deltaWriterBucketState, this.outputFileConfig, this.metricGroup));
            }
        }
        registerNextBucketInspectionTimer();
    }

    private void updateActiveBucketId(String str, DeltaWriterBucket<IN> deltaWriterBucket) throws IOException {
        DeltaWriterBucket<IN> deltaWriterBucket2 = this.activeBuckets.get(str);
        if (deltaWriterBucket2 != null) {
            deltaWriterBucket2.merge(deltaWriterBucket);
        } else {
            this.activeBuckets.put(str, deltaWriterBucket);
        }
    }

    private DeltaWriterBucket<IN> getOrCreateBucketForBucketId(String str) {
        DeltaWriterBucket<IN> deltaWriterBucket = this.activeBuckets.get(str);
        if (deltaWriterBucket == null) {
            deltaWriterBucket = DeltaWriterBucket.DeltaWriterBucketFactory.getNewBucket(str, assembleBucketPath(str), this.bucketWriter, this.rollingPolicy, this.outputFileConfig, this.metricGroup);
            this.activeBuckets.put(str, deltaWriterBucket);
        }
        return deltaWriterBucket;
    }

    public void close() {
        if (this.activeBuckets != null) {
            this.activeBuckets.values().forEach((v0) -> {
                v0.disposePartFile();
            });
        }
    }

    private Path assembleBucketPath(String str) {
        return "".equals(str) ? this.basePath : new Path(this.basePath, str);
    }

    public void onProcessingTime(long j) throws IOException {
        Iterator<DeltaWriterBucket<IN>> it = this.activeBuckets.values().iterator();
        while (it.hasNext()) {
            it.next().onProcessingTime(j);
        }
        registerNextBucketInspectionTimer();
    }

    private void registerNextBucketInspectionTimer() {
        this.processingTimeService.registerProcessingTimer(this.processingTimeService.getCurrentProcessingTime() + this.bucketCheckInterval, this);
    }

    @VisibleForTesting
    Map<String, DeltaWriterBucket<IN>> getActiveBuckets() {
        return this.activeBuckets;
    }
}
