package io.pravega.segmentstore.storage.chunklayer;

import com.google.common.base.Preconditions;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.common.Exceptions;
import io.pravega.common.LoggerHelpers;
import io.pravega.common.Timer;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.io.BoundedInputStream;
import io.pravega.segmentstore.contracts.BadOffsetException;
import io.pravega.segmentstore.storage.SegmentHandle;
import io.pravega.segmentstore.storage.StorageFullException;
import io.pravega.segmentstore.storage.StorageNotPrimaryException;
import io.pravega.segmentstore.storage.chunklayer.SystemJournal;
import io.pravega.segmentstore.storage.metadata.ChunkMetadata;
import io.pravega.segmentstore.storage.metadata.MetadataTransaction;
import io.pravega.segmentstore.storage.metadata.SegmentMetadata;
import io.pravega.segmentstore.storage.metadata.StorageMetadataWritesFencedOutException;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/segmentstore/storage/chunklayer/WriteOperation.class */
class WriteOperation implements Callable<CompletableFuture<Void>> {

    @SuppressFBWarnings(justification = "generated code")
    @Generated
    private static final Logger log = LoggerFactory.getLogger(WriteOperation.class);
    private final SegmentHandle handle;
    private final long offset;
    private final InputStream data;
    private final int length;
    private final ChunkedSegmentStorage chunkedSegmentStorage;
    private final long traceId;
    private volatile SegmentMetadata segmentMetadata;
    private volatile boolean isSystemSegment;
    private volatile boolean isFirstWriteAfterFailover;
    private volatile boolean skipOverFailedChunk;
    private final List<SystemJournal.SystemJournalRecord> systemLogRecords = new Vector();
    private final List<ChunkNameOffsetPair> newReadIndexEntries = new Vector();
    private final AtomicInteger chunksAddedCount = new AtomicInteger();
    private volatile boolean isCommitted = false;
    private final AtomicReference<ChunkMetadata> lastChunkMetadata = new AtomicReference<>(null);
    private volatile ChunkHandle chunkHandle = null;
    private final AtomicInteger bytesRemaining = new AtomicInteger();
    private final AtomicInteger totalBytesRead = new AtomicInteger();
    private final AtomicLong currentOffset = new AtomicLong();
    private volatile boolean didSegmentLayoutChange = false;
    private final Timer timer = new Timer();

    /* JADX INFO: Access modifiers changed from: package-private */
    public WriteOperation(ChunkedSegmentStorage chunkedSegmentStorage, SegmentHandle segmentHandle, long j, InputStream inputStream, int i) {
        this.handle = segmentHandle;
        this.offset = j;
        this.data = inputStream;
        this.length = i;
        this.chunkedSegmentStorage = chunkedSegmentStorage;
        this.traceId = LoggerHelpers.traceEnter(log, "write", new Object[]{segmentHandle, Long.valueOf(j), Integer.valueOf(i)});
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // java.util.concurrent.Callable
    public CompletableFuture<Void> call() {
        checkPreconditions();
        log.debug("{} write - started op={}, segment={}, offset={} length={}.", new Object[]{this.chunkedSegmentStorage.getLogPrefix(), Integer.valueOf(System.identityHashCode(this)), this.handle.getSegmentName(), Long.valueOf(this.offset), Integer.valueOf(this.length)});
        String segmentName = this.handle.getSegmentName();
        return ChunkedSegmentStorage.tryWith(this.chunkedSegmentStorage.getMetadataStore().beginTransaction(false, this.handle.getSegmentName()), metadataTransaction -> {
            this.didSegmentLayoutChange = false;
            return metadataTransaction.get(segmentName).thenComposeAsync(storageMetadata -> {
                this.segmentMetadata = (SegmentMetadata) storageMetadata;
                checkState();
                this.isSystemSegment = this.chunkedSegmentStorage.isStorageSystemSegment(this.segmentMetadata);
                this.isFirstWriteAfterFailover = this.segmentMetadata.isOwnershipChanged();
                this.lastChunkMetadata.set(null);
                this.chunkHandle = null;
                this.bytesRemaining.set(this.length);
                this.currentOffset.set(this.offset);
                return getLastChunk(metadataTransaction).thenComposeAsync(r6 -> {
                    return writeData(metadataTransaction).thenComposeAsync(r6 -> {
                        return commit(metadataTransaction).thenApplyAsync(r3 -> {
                            return postCommit();
                        }, this.chunkedSegmentStorage.getExecutor()).exceptionally((Function<Throwable, ? extends U>) this::handleException);
                    }, this.chunkedSegmentStorage.getExecutor()).thenRunAsync(this::logEnd, this.chunkedSegmentStorage.getExecutor());
                }, this.chunkedSegmentStorage.getExecutor());
            }, this.chunkedSegmentStorage.getExecutor());
        }, this.chunkedSegmentStorage.getExecutor()).exceptionally(th -> {
            return (Void) handleException(th);
        });
    }

    private Object handleException(Throwable th) {
        log.debug("{} write - exception op={}, segment={}, offset={}, length={}.", new Object[]{this.chunkedSegmentStorage.getLogPrefix(), Integer.valueOf(System.identityHashCode(this)), this.handle.getSegmentName(), Long.valueOf(this.offset), Integer.valueOf(this.length)});
        Throwable unwrap = Exceptions.unwrap(th);
        if (unwrap instanceof StorageMetadataWritesFencedOutException) {
            throw new CompletionException((Throwable) new StorageNotPrimaryException(this.handle.getSegmentName(), unwrap));
        }
        if (unwrap instanceof ChunkStorageFullException) {
            throw new CompletionException((Throwable) new StorageFullException(this.handle.getSegmentName(), unwrap));
        }
        throw new CompletionException(unwrap);
    }

    private Object postCommit() {
        this.chunkedSegmentStorage.getReadIndexCache().addIndexEntries(this.handle.getSegmentName(), this.newReadIndexEntries);
        return null;
    }

    private CompletableFuture<Void> getLastChunk(MetadataTransaction metadataTransaction) {
        return null != this.segmentMetadata.getLastChunk() ? metadataTransaction.get(this.segmentMetadata.getLastChunk()).thenAcceptAsync(storageMetadata -> {
            this.lastChunkMetadata.set((ChunkMetadata) storageMetadata);
        }, this.chunkedSegmentStorage.getExecutor()) : CompletableFuture.completedFuture(null);
    }

    private void logEnd() {
        Duration elapsed = this.timer.getElapsed();
        ChunkStorageMetrics.SLTS_WRITE_LATENCY.reportSuccessEvent(elapsed);
        ChunkStorageMetrics.SLTS_WRITE_BYTES.add(this.length);
        ChunkStorageMetrics.SLTS_NUM_CHUNKS_ADDED.reportSuccessValue(this.chunksAddedCount.get());
        if (this.segmentMetadata.isStorageSystemSegment()) {
            ChunkStorageMetrics.SLTS_SYSTEM_WRITE_LATENCY.reportSuccessEvent(elapsed);
            ChunkStorageMetrics.SLTS_SYSTEM_WRITE_BYTES.add(this.length);
            ChunkStorageMetrics.SLTS_SYSTEM_NUM_CHUNKS_ADDED.reportSuccessValue(this.chunksAddedCount.get());
            this.chunkedSegmentStorage.reportMetricsForSystemSegment(this.segmentMetadata);
        }
        if (elapsed.toMillis() > 0) {
            ChunkStorageMetrics.SLTS_WRITE_INSTANT_TPUT.reportSuccessValue((1000 * this.length) / elapsed.toMillis());
        }
        if (this.chunkedSegmentStorage.getConfig().getLateWarningThresholdInMillis() < elapsed.toMillis()) {
            log.warn("{} write - late op={}, segment={}, offset={}, length={}, latency={}.", new Object[]{this.chunkedSegmentStorage.getLogPrefix(), Integer.valueOf(System.identityHashCode(this)), this.handle.getSegmentName(), Long.valueOf(this.offset), Integer.valueOf(this.length), Long.valueOf(elapsed.toMillis())});
        } else {
            log.debug("{} write - finished op={}, segment={}, offset={}, length={}, latency={}.", new Object[]{this.chunkedSegmentStorage.getLogPrefix(), Integer.valueOf(System.identityHashCode(this)), this.handle.getSegmentName(), Long.valueOf(this.offset), Integer.valueOf(this.length), Long.valueOf(elapsed.toMillis())});
        }
        LoggerHelpers.traceLeave(log, "write", this.traceId, new Object[]{this.handle, Long.valueOf(this.offset)});
    }

    private CompletableFuture<Void> commit(MetadataTransaction metadataTransaction) {
        if (this.isSystemSegment && this.systemLogRecords.size() > 0) {
            metadataTransaction.setExternalCommitStep(() -> {
                return this.chunkedSegmentStorage.getSystemJournal().commitRecords(this.systemLogRecords);
            });
        }
        return metadataTransaction.commit().thenRunAsync(() -> {
            this.isCommitted = true;
        }, this.chunkedSegmentStorage.getExecutor());
    }

    private CompletableFuture<Void> writeData(MetadataTransaction metadataTransaction) {
        byte[] bArr;
        InputStream inputStream;
        int chunkCount = this.segmentMetadata.getChunkCount();
        long length = this.segmentMetadata.getLength();
        if (shouldValidateData()) {
            bArr = readNBytes(this.data, this.length);
            inputStream = new ByteArrayInputStream(bArr);
        } else {
            bArr = null;
            inputStream = this.data;
        }
        InputStream inputStream2 = inputStream;
        byte[] bArr2 = bArr;
        return Futures.loop(() -> {
            return Boolean.valueOf(this.bytesRemaining.get() > 0);
        }, () -> {
            return openChunkToWrite(metadataTransaction).thenComposeAsync(r15 -> {
                long j = this.currentOffset.get();
                long lastChunkStartOffset = this.currentOffset.get() - this.segmentMetadata.getLastChunkStartOffset();
                return writeToChunk(metadataTransaction, this.segmentMetadata, inputStream2, this.chunkHandle, this.lastChunkMetadata.get(), lastChunkStartOffset, (int) Math.min(this.bytesRemaining.get(), this.segmentMetadata.getMaxRollinglength() - lastChunkStartOffset), bArr2).thenRunAsync(() -> {
                    if (this.segmentMetadata.isStorageSystemSegment()) {
                        return;
                    }
                    this.chunkedSegmentStorage.addBlockIndexEntriesForChunk(metadataTransaction, this.segmentMetadata.getName(), this.chunkHandle.getChunkName(), this.segmentMetadata.getLastChunkStartOffset(), j, this.segmentMetadata.getLength());
                }, this.chunkedSegmentStorage.getExecutor());
            }, this.chunkedSegmentStorage.getExecutor());
        }, this.chunkedSegmentStorage.getExecutor()).thenRunAsync(() -> {
            this.segmentMetadata.checkInvariants();
            Preconditions.checkState(this.totalBytesRead.get() == this.length, "totalBytesRead (%s) must match length(%s)", this.totalBytesRead.get(), this.length);
            Preconditions.checkState(chunkCount + this.chunksAddedCount.get() == this.segmentMetadata.getChunkCount(), "Number of chunks do not match. old value (%s) + number of chunks added (%s) must match current chunk count(%s)", Integer.valueOf(chunkCount), Integer.valueOf(this.chunksAddedCount.get()), Integer.valueOf(this.segmentMetadata.getChunkCount()));
            Preconditions.checkState(length + ((long) this.length) == this.segmentMetadata.getLength(), "New length must match. old value (%s) + length (%s) must match current chunk count(%s)", Long.valueOf(length), Integer.valueOf(this.length), Long.valueOf(this.segmentMetadata.getLength()));
            if (null != this.lastChunkMetadata.get()) {
                Preconditions.checkState(this.segmentMetadata.getLastChunkStartOffset() + this.lastChunkMetadata.get().getLength() == this.segmentMetadata.getLength(), "Last chunk start offset (%s) + Last chunk length (%s) must match segment length (%s)", Long.valueOf(this.segmentMetadata.getLastChunkStartOffset()), Long.valueOf(this.lastChunkMetadata.get().getLength()), Long.valueOf(this.segmentMetadata.getLength()));
            }
        }, this.chunkedSegmentStorage.getExecutor());
    }

    private CompletableFuture<Void> openChunkToWrite(MetadataTransaction metadataTransaction) {
        if (null == this.lastChunkMetadata.get() || this.lastChunkMetadata.get().getLength() >= this.segmentMetadata.getMaxRollinglength() || this.isFirstWriteAfterFailover || this.skipOverFailedChunk || !this.chunkedSegmentStorage.shouldAppend()) {
            return addNewChunk(metadataTransaction);
        }
        this.chunkHandle = ChunkHandle.writeHandle(this.lastChunkMetadata.get().getName());
        return CompletableFuture.completedFuture(null);
    }

    private CompletableFuture<Void> addNewChunk(MetadataTransaction metadataTransaction) {
        String newChunkName = this.chunkedSegmentStorage.getNewChunkName(this.handle.getSegmentName(), this.segmentMetadata.getLength());
        return this.chunkedSegmentStorage.getGarbageCollector().trackNewChunk(metadataTransaction.getVersion(), newChunkName).thenComposeAsync(r8 -> {
            return (this.chunkedSegmentStorage.shouldAppend() ? this.chunkedSegmentStorage.getChunkStorage().create(newChunkName) : CompletableFuture.completedFuture(ChunkHandle.writeHandle(newChunkName))).thenAcceptAsync(chunkHandle -> {
                this.chunkHandle = chunkHandle;
                String name = this.lastChunkMetadata.get() == null ? null : this.lastChunkMetadata.get().getName();
                this.lastChunkMetadata.set(updateMetadataForChunkAddition(metadataTransaction, this.segmentMetadata, newChunkName, this.isFirstWriteAfterFailover, this.lastChunkMetadata.get()));
                if (this.isSystemSegment) {
                    addSystemLogRecord(this.systemLogRecords, this.handle.getSegmentName(), this.segmentMetadata.getLength(), name, newChunkName);
                    metadataTransaction.markPinned(this.lastChunkMetadata.get());
                }
                this.newReadIndexEntries.add(new ChunkNameOffsetPair(this.segmentMetadata.getLength(), newChunkName));
                this.isFirstWriteAfterFailover = false;
                this.skipOverFailedChunk = false;
                this.didSegmentLayoutChange = true;
                this.chunksAddedCount.incrementAndGet();
                log.debug("{} write - New chunk added - op={}, segment={}, chunk={}, offset={}.", new Object[]{this.chunkedSegmentStorage.getLogPrefix(), Integer.valueOf(System.identityHashCode(this)), this.handle.getSegmentName(), newChunkName, Long.valueOf(this.segmentMetadata.getLength())});
            }, this.chunkedSegmentStorage.getExecutor());
        }, this.chunkedSegmentStorage.getExecutor());
    }

    private void checkState() {
        String segmentName = this.handle.getSegmentName();
        this.chunkedSegmentStorage.checkSegmentExists(segmentName, this.segmentMetadata);
        this.segmentMetadata.checkInvariants();
        this.chunkedSegmentStorage.checkNotSealed(segmentName, this.segmentMetadata);
        this.chunkedSegmentStorage.checkOwnership(segmentName, this.segmentMetadata);
        if (this.segmentMetadata.getLength() != this.offset) {
            throw new CompletionException((Throwable) new BadOffsetException(this.handle.getSegmentName(), this.segmentMetadata.getLength(), this.offset));
        }
    }

    private void checkPreconditions() {
        Preconditions.checkArgument(null != this.data, "data must not be null");
        Preconditions.checkArgument(!this.handle.isReadOnly(), "handle must not be read only. Segment = %s", this.handle.getSegmentName());
        Preconditions.checkArgument(this.offset >= 0, "offset must be non negative. Segment = %s", this.handle.getSegmentName());
        Preconditions.checkArgument(this.length >= 0, "length must be non negative. Segment = %s", this.handle.getSegmentName());
    }

    private ChunkMetadata updateMetadataForChunkAddition(MetadataTransaction metadataTransaction, SegmentMetadata segmentMetadata, String str, boolean z, ChunkMetadata chunkMetadata) {
        ChunkMetadata m42build = ChunkMetadata.builder().name(str).m42build();
        m42build.setActive(true);
        segmentMetadata.setLastChunk(str);
        if (chunkMetadata == null) {
            segmentMetadata.setFirstChunk(str);
        } else {
            chunkMetadata.setNextChunk(str);
            metadataTransaction.update(chunkMetadata);
        }
        segmentMetadata.setLastChunkStartOffset(segmentMetadata.getLength());
        if (z) {
            segmentMetadata.setOwnerEpoch(this.chunkedSegmentStorage.getEpoch());
            segmentMetadata.setOwnershipChanged(false);
            log.debug("{} write - First write after failover - op={}, segment={}.", new Object[]{this.chunkedSegmentStorage.getLogPrefix(), Integer.valueOf(System.identityHashCode(this)), segmentMetadata.getName()});
        }
        segmentMetadata.setChunkCount(segmentMetadata.getChunkCount() + 1);
        metadataTransaction.create(m42build);
        metadataTransaction.update(segmentMetadata);
        return m42build;
    }

    private void addSystemLogRecord(List<SystemJournal.SystemJournalRecord> list, String str, long j, String str2, String str3) {
        list.add(SystemJournal.ChunkAddedRecord.builder().segmentName(str).offset(j).oldChunkName(str2).newChunkName(str3).m22build());
    }

    private CompletableFuture<Void> writeToChunk(MetadataTransaction metadataTransaction, SegmentMetadata segmentMetadata, InputStream inputStream, ChunkHandle chunkHandle, ChunkMetadata chunkMetadata, long j, int i, byte[] bArr) {
        Preconditions.checkState(i > 0, "bytesCount must be positive. Segment=%s Chunk=%s offsetToWriteAt=%s bytesCount=%s", segmentMetadata, chunkMetadata, Long.valueOf(j), Integer.valueOf(i));
        InputStream boundedInputStream = new BoundedInputStream(inputStream, i);
        return (this.chunkedSegmentStorage.shouldAppend() ? this.chunkedSegmentStorage.getChunkStorage().write(chunkHandle, j, i, boundedInputStream) : this.chunkedSegmentStorage.getChunkStorage().createWithContent(chunkHandle.getChunkName(), i, boundedInputStream).thenApplyAsync(chunkHandle2 -> {
            return Integer.valueOf(i);
        }, this.chunkedSegmentStorage.getExecutor())).thenComposeAsync(num -> {
            Preconditions.checkState(num.intValue() == i, "bytesWritten (%s) must equal bytesCount(%s). Segment=%s Chunk=%s offsetToWriteAt=%s", new Object[]{num, Integer.valueOf(i), segmentMetadata, chunkMetadata, Long.valueOf(j)});
            CompletableFuture completedFuture = CompletableFuture.completedFuture(null);
            if (this.chunkedSegmentStorage.getConfig().isSelfCheckForMetadataEnabled()) {
                completedFuture = completedFuture.thenComposeAsync(r8 -> {
                    return this.chunkedSegmentStorage.getChunkStorage().getInfo(chunkHandle.getChunkName()).thenAcceptAsync(chunkInfo -> {
                        Preconditions.checkState(chunkInfo.getLength() == chunkMetadata.getLength() + ((long) num.intValue()), "Length of chunk does not match expected length. Expected (%s) Actual (%s)", chunkMetadata.getLength() + num.intValue(), chunkInfo.getLength());
                    }, this.chunkedSegmentStorage.getExecutor());
                }, this.chunkedSegmentStorage.getExecutor());
            }
            if (shouldValidateData()) {
                completedFuture = completedFuture.thenComposeAsync(r14 -> {
                    return validateWrittenData(chunkHandle, j, this.totalBytesRead.get(), i, bArr);
                }, this.chunkedSegmentStorage.getExecutor());
            }
            return completedFuture.thenApplyAsync(r3 -> {
                return num;
            }, this.chunkedSegmentStorage.getExecutor());
        }, this.chunkedSegmentStorage.getExecutor()).thenAcceptAsync(num2 -> {
            segmentMetadata.setLength(segmentMetadata.getLength() + num2.intValue());
            chunkMetadata.setLength(chunkMetadata.getLength() + num2.intValue());
            metadataTransaction.update(chunkMetadata);
            metadataTransaction.update(segmentMetadata);
            this.bytesRemaining.addAndGet(-num2.intValue());
            this.currentOffset.addAndGet(num2.intValue());
            this.totalBytesRead.addAndGet(num2.intValue());
            if (this.isSystemSegment) {
                this.systemLogRecords.add(SystemJournal.AppendRecord.builder().segmentName(segmentMetadata.getName()).chunkName(chunkMetadata.getName()).offset(j).length(num2.intValue()).m20build());
            }
        }, this.chunkedSegmentStorage.getExecutor()).handleAsync((r17, th) -> {
            if (null == th) {
                return r17;
            }
            Throwable unwrap = Exceptions.unwrap(th);
            if (!(unwrap instanceof InvalidOffsetException)) {
                throw new CompletionException(unwrap);
            }
            InvalidOffsetException invalidOffsetException = (InvalidOffsetException) unwrap;
            if (invalidOffsetException.getExpectedOffset() <= j) {
                throw new CompletionException((Throwable) new BadOffsetException(segmentMetadata.getName(), this.currentOffset.get() + ((InvalidOffsetException) unwrap).getExpectedOffset(), this.currentOffset.get() + ((InvalidOffsetException) unwrap).getGivenOffset()));
            }
            this.skipOverFailedChunk = true;
            log.debug("{} write - skipping partially written chunk op={}, segment={}, chunk={} expected={} given={}.", new Object[]{this.chunkedSegmentStorage.getLogPrefix(), Integer.valueOf(System.identityHashCode(this)), this.handle.getSegmentName(), chunkHandle.getChunkName(), Long.valueOf(invalidOffsetException.getExpectedOffset()), Long.valueOf(invalidOffsetException.getGivenOffset())});
            return null;
        }, this.chunkedSegmentStorage.getExecutor());
    }

    private CompletableFuture<Void> validateWrittenData(ChunkHandle chunkHandle, long j, int i, int i2, byte[] bArr) {
        byte[] bArr2 = new byte[i2];
        return readChunk(chunkHandle, j, i2, bArr2).thenAcceptAsync(r14 -> {
            int mismatch = Arrays.mismatch(bArr2, 0, i2, bArr, i, i + i2);
            Preconditions.checkState(-1 == mismatch, "Data read from chunk differs from data written at offset %s", j + mismatch);
        }, this.chunkedSegmentStorage.getExecutor());
    }

    private CompletableFuture<Void> readChunk(ChunkHandle chunkHandle, long j, int i, byte[] bArr) {
        AtomicInteger atomicInteger = new AtomicInteger(i);
        AtomicLong atomicLong = new AtomicLong(j);
        AtomicInteger atomicInteger2 = new AtomicInteger(0);
        return Futures.loop(() -> {
            return Boolean.valueOf(atomicInteger.get() > 0);
        }, () -> {
            return this.chunkedSegmentStorage.getChunkStorage().read(chunkHandle, atomicLong.get(), atomicInteger.get(), bArr, atomicInteger2.get()).thenAccept(num -> {
                Preconditions.checkState(num.intValue() != 0, "Zero bytes read chunk=%s, fromOffset=%d", chunkHandle.getChunkName(), j);
                atomicInteger.addAndGet(-num.intValue());
                atomicLong.addAndGet(num.intValue());
                atomicInteger2.addAndGet(num.intValue());
            });
        }, this.chunkedSegmentStorage.getExecutor());
    }

    private static byte[] readNBytes(InputStream inputStream, int i) {
        return inputStream.readNBytes(i);
    }

    private boolean shouldValidateData() {
        return this.chunkedSegmentStorage.getConfig().isSelfCheckForDataEnabled() && this.chunkedSegmentStorage.getChunkStorage().supportsDataIntegrityCheck();
    }
}
