package io.delta.flink.sink.internal.writer;

import io.delta.flink.sink.internal.committables.DeltaCommittable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import javax.annotation.Nullable;
import org.apache.flink.core.fs.Path;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.functions.sink.filesystem.DeltaBulkBucketWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.DeltaBulkPartWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.DeltaInProgressPart;
import org.apache.flink.streaming.api.functions.sink.filesystem.DeltaPendingFile;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;
import org.apache.flink.table.utils.PartitionPathUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/delta/flink/sink/internal/writer/DeltaWriterBucket.class */
public class DeltaWriterBucket<IN> {
    private static final Logger LOG = LoggerFactory.getLogger(DeltaWriterBucket.class);
    public static final String RECORDS_WRITTEN_METRIC_NAME = "DeltaSinkRecordsWritten";
    public static final String BYTES_WRITTEN_METRIC_NAME = "DeltaSinkBytesWritten";
    private final String bucketId;
    private final Path bucketPath;
    private final OutputFileConfig outputFileConfig;
    private final String uniqueId;
    private final DeltaBulkBucketWriter<IN, String> bucketWriter;
    private final CheckpointRollingPolicy<IN, String> rollingPolicy;
    private final List<DeltaPendingFile> pendingFiles;
    private final LinkedHashMap<String, String> partitionSpec;
    private long partCounter;
    private long inProgressPartRecordCount;

    @Nullable
    private DeltaInProgressPart<IN> deltaInProgressPart;
    private final Counter recordsWrittenCounter;
    private final Counter bytesWrittenCounter;

    /* loaded from: input_file:io/delta/flink/sink/internal/writer/DeltaWriterBucket$DeltaWriterBucketFactory.class */
    public static class DeltaWriterBucketFactory {
        /* JADX INFO: Access modifiers changed from: package-private */
        public static <IN> DeltaWriterBucket<IN> getNewBucket(String str, Path path, DeltaBulkBucketWriter<IN, String> deltaBulkBucketWriter, CheckpointRollingPolicy<IN, String> checkpointRollingPolicy, OutputFileConfig outputFileConfig, MetricGroup metricGroup) {
            return new DeltaWriterBucket<>(str, path, deltaBulkBucketWriter, checkpointRollingPolicy, outputFileConfig, metricGroup);
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public static <IN> DeltaWriterBucket<IN> restoreBucket(DeltaBulkBucketWriter<IN, String> deltaBulkBucketWriter, CheckpointRollingPolicy<IN, String> checkpointRollingPolicy, DeltaWriterBucketState deltaWriterBucketState, OutputFileConfig outputFileConfig, MetricGroup metricGroup) {
            return new DeltaWriterBucket<>(deltaBulkBucketWriter, checkpointRollingPolicy, deltaWriterBucketState, outputFileConfig, metricGroup);
        }
    }

    private DeltaWriterBucket(String str, Path path, DeltaBulkBucketWriter<IN, String> deltaBulkBucketWriter, CheckpointRollingPolicy<IN, String> checkpointRollingPolicy, OutputFileConfig outputFileConfig, MetricGroup metricGroup) {
        this.pendingFiles = new ArrayList();
        this.bucketId = (String) Preconditions.checkNotNull(str);
        this.bucketPath = (Path) Preconditions.checkNotNull(path);
        this.bucketWriter = (DeltaBulkBucketWriter) Preconditions.checkNotNull(deltaBulkBucketWriter);
        this.rollingPolicy = (CheckpointRollingPolicy) Preconditions.checkNotNull(checkpointRollingPolicy);
        this.outputFileConfig = (OutputFileConfig) Preconditions.checkNotNull(outputFileConfig);
        this.partitionSpec = PartitionPathUtils.extractPartitionSpecFromPath(this.bucketPath);
        this.uniqueId = UUID.randomUUID().toString();
        this.partCounter = 0L;
        this.inProgressPartRecordCount = 0L;
        this.recordsWrittenCounter = metricGroup.counter(RECORDS_WRITTEN_METRIC_NAME);
        this.bytesWrittenCounter = metricGroup.counter(BYTES_WRITTEN_METRIC_NAME);
    }

    private DeltaWriterBucket(DeltaBulkBucketWriter<IN, String> deltaBulkBucketWriter, CheckpointRollingPolicy<IN, String> checkpointRollingPolicy, DeltaWriterBucketState deltaWriterBucketState, OutputFileConfig outputFileConfig, MetricGroup metricGroup) {
        this(deltaWriterBucketState.getBucketId(), deltaWriterBucketState.getBucketPath(), deltaBulkBucketWriter, checkpointRollingPolicy, outputFileConfig, metricGroup);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public List<DeltaCommittable> prepareCommit(boolean z, String str, long j) throws IOException {
        if (this.deltaInProgressPart != null) {
            if (!this.rollingPolicy.shouldRollOnCheckpoint(this.deltaInProgressPart.getBulkPartWriter()) && !z) {
                throw new RuntimeException("Unexpected behaviour. Delta writers should always roll part files on checkpoint. To resolve this issue verify behaviour of your rolling policy.");
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("Closing in-progress part file for bucket id={} on checkpoint.", this.bucketId);
            }
            closePartFile();
        }
        ArrayList arrayList = new ArrayList();
        this.pendingFiles.forEach(deltaPendingFile -> {
            arrayList.add(new DeltaCommittable(deltaPendingFile, str, j));
        });
        this.pendingFiles.clear();
        return arrayList;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public DeltaWriterBucketState snapshotState(String str) {
        return new DeltaWriterBucketState(this.bucketId, this.bucketPath, str);
    }

    private DeltaInProgressPart<IN> rollPartFile(long j) throws IOException {
        closePartFile();
        Path assembleNewPartPath = assembleNewPartPath();
        if (LOG.isDebugEnabled()) {
            LOG.debug("Opening new part file \"{}\" for bucket id={}.", assembleNewPartPath.getName(), this.bucketId);
        }
        DeltaBulkPartWriter openNewInProgressFile = this.bucketWriter.openNewInProgressFile(this.bucketId, assembleNewPartPath, j);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Successfully opened new part file \"{}\" for bucket id={}.", assembleNewPartPath.getName(), this.bucketId);
        }
        return new DeltaInProgressPart<>(assembleNewPartPath.getName(), openNewInProgressFile);
    }

    private void closePartFile() throws IOException {
        if (this.deltaInProgressPart != null) {
            this.deltaInProgressPart.getBulkPartWriter().closeWriter();
            long size = this.deltaInProgressPart.getBulkPartWriter().getSize();
            DeltaPendingFile deltaPendingFile = new DeltaPendingFile(this.partitionSpec, this.deltaInProgressPart.getFileName(), this.deltaInProgressPart.getBulkPartWriter().closeForCommit(), this.inProgressPartRecordCount, size, this.deltaInProgressPart.getBulkPartWriter().getLastUpdateTime());
            this.pendingFiles.add(deltaPendingFile);
            this.deltaInProgressPart = null;
            this.inProgressPartRecordCount = 0L;
            this.recordsWrittenCounter.inc(deltaPendingFile.getRecordCount());
            this.bytesWrittenCounter.inc(size);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void write(IN in, long j) throws IOException {
        if (this.deltaInProgressPart == null || this.rollingPolicy.shouldRollOnEvent(this.deltaInProgressPart.getBulkPartWriter(), in)) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Opening new part file for bucket id={} due to element {}.", this.bucketId, in);
            }
            this.deltaInProgressPart = rollPartFile(j);
        }
        this.deltaInProgressPart.getBulkPartWriter().write(in, j);
        this.inProgressPartRecordCount++;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void merge(DeltaWriterBucket<IN> deltaWriterBucket) throws IOException {
        Preconditions.checkNotNull(deltaWriterBucket);
        Preconditions.checkState(Objects.equals(deltaWriterBucket.bucketPath, this.bucketPath));
        deltaWriterBucket.closePartFile();
        this.pendingFiles.addAll(deltaWriterBucket.pendingFiles);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Merging buckets for bucket id={}", this.bucketId);
        }
    }

    public boolean isActive() {
        return this.deltaInProgressPart != null || this.pendingFiles.size() > 0;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void onProcessingTime(long j) throws IOException {
        if (this.deltaInProgressPart == null || !this.rollingPolicy.shouldRollOnProcessingTime(this.deltaInProgressPart.getBulkPartWriter(), j)) {
            return;
        }
        DeltaBulkPartWriter<IN, String> bulkPartWriter = this.deltaInProgressPart.getBulkPartWriter();
        if (LOG.isDebugEnabled()) {
            LOG.debug("Bucket {} closing in-progress part file for part file id={} due to processing time rolling policy (in-progress file created @ {}, last updated @ {} and current time is {}).", new Object[]{this.bucketId, this.uniqueId, Long.valueOf(bulkPartWriter.getCreationTime()), Long.valueOf(bulkPartWriter.getLastUpdateTime()), Long.valueOf(j)});
        }
        closePartFile();
    }

    /*  JADX ERROR: Failed to decode insn: 0x0005: MOVE_MULTI, method: io.delta.flink.sink.internal.writer.DeltaWriterBucket.assembleNewPartPath():org.apache.flink.core.fs.Path
        java.lang.ArrayIndexOutOfBoundsException: arraycopy: source index -1 out of bounds for object array[8]
        	at java.base/java.lang.System.arraycopy(Native Method)
        	at jadx.plugins.input.java.data.code.StackState.insert(StackState.java:49)
        	at jadx.plugins.input.java.data.code.CodeDecodeState.insert(CodeDecodeState.java:118)
        	at jadx.plugins.input.java.data.code.JavaInsnsRegister.dup2x1(JavaInsnsRegister.java:313)
        	at jadx.plugins.input.java.data.code.JavaInsnData.decode(JavaInsnData.java:46)
        	at jadx.core.dex.instructions.InsnDecoder.lambda$process$0(InsnDecoder.java:54)
        	at jadx.plugins.input.java.data.code.JavaCodeReader.visitInstructions(JavaCodeReader.java:81)
        	at jadx.core.dex.instructions.InsnDecoder.process(InsnDecoder.java:50)
        	at jadx.core.dex.nodes.MethodNode.load(MethodNode.java:156)
        	at jadx.core.dex.nodes.ClassNode.load(ClassNode.java:443)
        	at jadx.core.ProcessClass.process(ProcessClass.java:70)
        	at jadx.core.ProcessClass.generateCode(ProcessClass.java:110)
        	at jadx.core.dex.nodes.ClassNode.generateClassCode(ClassNode.java:400)
        	at jadx.core.dex.nodes.ClassNode.decompile(ClassNode.java:388)
        	at jadx.core.dex.nodes.ClassNode.getCode(ClassNode.java:338)
        */
    private org.apache.flink.core.fs.Path assembleNewPartPath() {
        /*
            r8 = this;
            r0 = r8
            r1 = r0
            long r1 = r1.partCounter
            // decode failed: arraycopy: source index -1 out of bounds for object array[8]
            r2 = 1
            long r1 = r1 + r2
            r0.partCounter = r1
            r9 = r-1
            org.apache.flink.core.fs.Path r-1 = new org.apache.flink.core.fs.Path
            r0 = r-1
            r1 = r8
            org.apache.flink.core.fs.Path r1 = r1.bucketPath
            java.lang.StringBuilder r2 = new java.lang.StringBuilder
            r3 = r2
            r3.<init>()
            r3 = r8
            org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig r3 = r3.outputFileConfig
            java.lang.String r3 = r3.getPartPrefix()
            java.lang.StringBuilder r2 = r2.append(r3)
            r3 = 45
            java.lang.StringBuilder r2 = r2.append(r3)
            r3 = r8
            java.lang.String r3 = r3.uniqueId
            java.lang.StringBuilder r2 = r2.append(r3)
            r3 = 45
            java.lang.StringBuilder r2 = r2.append(r3)
            r3 = r9
            java.lang.StringBuilder r2 = r2.append(r3)
            r3 = r8
            org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig r3 = r3.outputFileConfig
            java.lang.String r3 = r3.getPartSuffix()
            java.lang.StringBuilder r2 = r2.append(r3)
            java.lang.String r2 = r2.toString()
            r0.<init>(r1, r2)
            return r-1
        */
        throw new UnsupportedOperationException("Method not decompiled: io.delta.flink.sink.internal.writer.DeltaWriterBucket.assembleNewPartPath():org.apache.flink.core.fs.Path");
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void disposePartFile() {
        if (this.deltaInProgressPart != null) {
            this.deltaInProgressPart.getBulkPartWriter().dispose();
        }
    }
}
