package io.pravega.segmentstore.storage.chunklayer;

import com.google.common.annotations.Beta;
import com.google.common.base.Preconditions;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.common.Exceptions;
import io.pravega.common.LoggerHelpers;
import io.pravega.common.Timer;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.concurrent.MultiKeySequentialProcessor;
import io.pravega.common.util.ImmutableDate;
import io.pravega.segmentstore.contracts.SegmentProperties;
import io.pravega.segmentstore.contracts.StreamSegmentExistsException;
import io.pravega.segmentstore.contracts.StreamSegmentInformation;
import io.pravega.segmentstore.contracts.StreamSegmentNotExistsException;
import io.pravega.segmentstore.contracts.StreamSegmentSealedException;
import io.pravega.segmentstore.storage.SegmentHandle;
import io.pravega.segmentstore.storage.SegmentRollingPolicy;
import io.pravega.segmentstore.storage.Storage;
import io.pravega.segmentstore.storage.StorageFullException;
import io.pravega.segmentstore.storage.StorageNotPrimaryException;
import io.pravega.segmentstore.storage.StorageWrapper;
import io.pravega.segmentstore.storage.chunklayer.GarbageCollector;
import io.pravega.segmentstore.storage.metadata.ChunkMetadata;
import io.pravega.segmentstore.storage.metadata.ChunkMetadataStore;
import io.pravega.segmentstore.storage.metadata.MetadataTransaction;
import io.pravega.segmentstore.storage.metadata.ReadIndexBlockMetadata;
import io.pravega.segmentstore.storage.metadata.SegmentMetadata;
import io.pravega.segmentstore.storage.metadata.StorageMetadataWritesFencedOutException;
import io.pravega.shared.NameUtils;
import java.io.InputStream;
import java.time.Duration;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import javax.annotation.concurrent.GuardedBy;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Beta
/* loaded from: input_file:io/pravega/segmentstore/storage/chunklayer/ChunkedSegmentStorage.class */
public class ChunkedSegmentStorage implements Storage, StatsReporter {

    // Class-wide SLF4J logger; the annotations mark it as generated code (lombok.Generated is imported above).
    @SuppressFBWarnings(justification = "generated code")
    @Generated
    private static final Logger log = LoggerFactory.getLogger(ChunkedSegmentStorage.class);
    // Configuration supplied to the constructor (null-checked there).
    private final ChunkedSegmentStorageConfig config;
    // Store holding segment/chunk metadata; all transactions in this class are started on it.
    private final ChunkMetadataStore metadataStore;
    // Underlying chunk storage supplied to the constructor (null-checked there).
    private final ChunkStorage chunkStorage;
    // Executor on which the asynchronous continuations in this class are scheduled.
    private final Executor executor;
    // Epoch of this instance; assigned by initialize(long) and compared against a segment's owner epoch.
    private volatile long epoch;
    // Identifier of the container this instance belongs to; used in the log prefix.
    private final int containerId;
    // System journal created in the constructor and bootstrapped in bootstrap(...).
    private final SystemJournal systemJournal;
    // Read index cache, sized from the configuration in the constructor.
    private final ReadIndexCache readIndexCache;
    // Keyed sequential processor created over the executor in the constructor.
    private final MultiKeySequentialProcessor<String> taskProcessor;
    // Garbage collector; deleted segments are handed to it in delete(...).
    private final GarbageCollector garbageCollector;
    // Handle of the periodic task that calls report(); cancelled in close().
    private final ScheduledFuture<?> reporter;
    // Handle of the periodic updateStorageStats() task; stays null when the safe-storage-size check is disabled.
    private ScheduledFuture<?> storageChecker;
    // Task queue handed in through bootstrap(...) and later passed to the garbage collector in finishBootstrap().
    private AbstractTaskQueueManager<GarbageCollector.TaskInfo> taskQueue;
    // Prefix for log messages; empty until bootstrap(...) sets it.
    private String logPrefix = "";

    // NOTE(review): the annotation names "activeSegments", but no field of that name is visible
    // in this part of the file (the field itself is called activeRequests) - confirm the intended lock.
    @GuardedBy("activeSegments")
    private final HashSet<String> activeRequests = new HashSet<>();
    // Last storage-full verdict computed by updateStorageStats(); exposed through isSafeMode().
    private final AtomicBoolean isStorageFull = new AtomicBoolean(false);
    // Last used-space value obtained by updateStorageStats(); published as a gauge by report().
    private final AtomicLong storageUsed = new AtomicLong(0);
    // Set to true at the end of close().
    private final AtomicBoolean closed = new AtomicBoolean(false);

    public ChunkedSegmentStorage(int i, ChunkStorage chunkStorage, ChunkMetadataStore chunkMetadataStore, ScheduledExecutorService scheduledExecutorService, ChunkedSegmentStorageConfig chunkedSegmentStorageConfig) {
        this.containerId = i;
        this.config = (ChunkedSegmentStorageConfig) Preconditions.checkNotNull(chunkedSegmentStorageConfig, "config");
        this.chunkStorage = (ChunkStorage) Preconditions.checkNotNull(chunkStorage, "chunkStorage");
        this.metadataStore = (ChunkMetadataStore) Preconditions.checkNotNull(chunkMetadataStore, "metadataStore");
        this.executor = (Executor) Preconditions.checkNotNull(scheduledExecutorService, "executor");
        this.readIndexCache = new ReadIndexCache(chunkedSegmentStorageConfig.getMaxIndexedSegments(), chunkedSegmentStorageConfig.getMaxIndexedChunks());
        this.taskProcessor = new MultiKeySequentialProcessor<>(this.executor);
        this.garbageCollector = new GarbageCollector(i, chunkStorage, chunkMetadataStore, chunkedSegmentStorageConfig, scheduledExecutorService, System::currentTimeMillis, duration -> {
            return Futures.delayedFuture(duration, scheduledExecutorService);
        });
        this.systemJournal = new SystemJournal(i, chunkStorage, chunkMetadataStore, this.garbageCollector, chunkedSegmentStorageConfig, scheduledExecutorService);
        this.reporter = scheduledExecutorService.scheduleAtFixedRate(this::report, 1000L, 1000L, TimeUnit.MILLISECONDS);
        if (chunkedSegmentStorageConfig.isSafeStorageSizeCheckEnabled()) {
            this.storageChecker = scheduledExecutorService.scheduleAtFixedRate(this::updateStorageStats, chunkedSegmentStorageConfig.getSafeStorageSizeCheckFrequencyInSeconds(), chunkedSegmentStorageConfig.getSafeStorageSizeCheckFrequencyInSeconds(), TimeUnit.SECONDS);
        }
    }

    /**
     * Records the task queue, sets the log prefix for this container and bootstraps the system journal.
     *
     * @param snapshotInfoStore        store passed through to the system journal bootstrap.
     * @param abstractTaskQueueManager task queue later used by {@link #finishBootstrap()}.
     * @return the future returned by the system journal bootstrap.
     */
    public CompletableFuture<Void> bootstrap(SnapshotInfoStore snapshotInfoStore, AbstractTaskQueueManager<GarbageCollector.TaskInfo> abstractTaskQueueManager) {
        this.taskQueue = abstractTaskQueueManager;
        this.logPrefix = String.format("ChunkedSegmentStorage[%d]", this.containerId);
        final long currentEpoch = this.epoch;
        return this.systemJournal.bootstrap(currentEpoch, snapshotInfoStore);
    }

    /**
     * Completes bootstrap by initializing the garbage collector with the task queue
     * that was supplied to {@code bootstrap}.
     *
     * @return the future returned by the garbage collector initialization.
     */
    public CompletableFuture<Void> finishBootstrap() {
        final CompletableFuture<Void> garbageCollectorReady = this.garbageCollector.initialize(this.taskQueue);
        return garbageCollectorReady;
    }

    /**
     * Stores the epoch under which this instance operates.
     *
     * @param j the epoch value.
     */
    @Override // io.pravega.segmentstore.storage.ReadOnlyStorage
    public void initialize(long j) {
        epoch = j;
    }

    /**
     * Opens the given segment for writing.
     *
     * <p>Runs serialized on the segment name. Inside a metadata transaction the segment's metadata
     * is loaded and validated; when the recorded owner epoch is lower than this instance's epoch,
     * ownership is claimed first. Ownership is then verified and a write handle is returned.
     * Failures are logged and re-thrown through {@code handleException}.
     *
     * @param str name of the segment.
     * @return a future that completes with a write handle for the segment.
     */
    @Override // io.pravega.segmentstore.storage.Storage
    public CompletableFuture<SegmentHandle> openWrite(String str) {
        checkInitialized();
        return executeSerialized(() -> {
            final long traceId = LoggerHelpers.traceEnter(log, "openWrite", str);
            final Timer stopwatch = new Timer();
            Preconditions.checkNotNull(str, "streamSegmentName");
            log.debug("{} openWrite - started segment={}.", this.logPrefix, str);
            return tryWith(this.metadataStore.beginTransaction(false, str),
                    txn -> txn.get(str).thenComposeAsync(loaded -> {
                        final SegmentMetadata segment = (SegmentMetadata) loaded;
                        checkSegmentExists(str, segment);
                        segment.checkInvariants();

                        // A lower owner epoch means the segment was last owned by an earlier instance.
                        final CompletableFuture<Void> ownershipReady;
                        if (segment.getOwnerEpoch() >= this.epoch) {
                            ownershipReady = CompletableFuture.completedFuture(null);
                        } else {
                            log.debug("{} openWrite - Segment needs ownership change - segment={}.", this.logPrefix, segment.getName());
                            ownershipReady = claimOwnership(txn, segment);
                        }

                        return ownershipReady.thenApplyAsync(ignored -> {
                            checkOwnership(str, segment);
                            final SegmentStorageHandle handle = SegmentStorageHandle.writeHandle(str);
                            log.debug("{} openWrite - finished segment={} latency={}.", this.logPrefix, str, Long.valueOf(stopwatch.getElapsedMillis()));
                            LoggerHelpers.traceLeave(log, "openWrite", traceId, handle);
                            return handle;
                        }, this.executor);
                    }, this.executor),
                    this.executor)
                    .handleAsync((handle, failure) -> {
                        if (failure != null) {
                            log.debug("{} openWrite - exception segment={} latency={}.", this.logPrefix, str, Long.valueOf(stopwatch.getElapsedMillis()), failure);
                            // Always throws; translates storage-specific failures.
                            handleException(str, failure);
                        }
                        return handle;
                    }, this.executor);
        }, str);
    }

    /**
     * Extracts a {@link ChunkedSegmentStorage} from the given storage, looking one level
     * into a {@link StorageWrapper} if necessary.
     *
     * <p>A {@code ChunkedSegmentStorage} found inside a wrapper takes precedence over the
     * argument itself.
     *
     * @param storage storage to inspect; may be null.
     * @return the {@code ChunkedSegmentStorage} found, or null if there is none.
     */
    public static ChunkedSegmentStorage getReference(Storage storage) {
        if (storage instanceof StorageWrapper) {
            final Storage wrapped = ((StorageWrapper) storage).getInner();
            if (wrapped instanceof ChunkedSegmentStorage) {
                return (ChunkedSegmentStorage) wrapped;
            }
        }
        return storage instanceof ChunkedSegmentStorage ? (ChunkedSegmentStorage) storage : null;
    }

    /**
     * Claims ownership of a (non-system) segment for this instance's epoch and commits the transaction.
     *
     * <p>When the segment is not in atomic-write mode, appends are enabled and a last chunk exists,
     * the length of the last chunk in chunk storage is compared with the length recorded in metadata.
     * If they differ, the chunk and segment lengths are adjusted and block index entries are added
     * for the newly discovered range. If the last chunk is missing from chunk storage, ownership
     * is NOT changed; the unmodified segment metadata is still updated and committed.
     *
     * @param metadataTransaction transaction in which all updates are made and committed.
     * @param segmentMetadata     metadata of the segment being claimed; must not be a storage system segment.
     * @return a future that completes when the transaction commit completes.
     * @throws IllegalStateException if called for a storage system segment.
     */
    private CompletableFuture<Void> claimOwnership(MetadataTransaction metadataTransaction, SegmentMetadata segmentMetadata) {
        Preconditions.checkState(!segmentMetadata.isStorageSystemSegment(), "claimOwnership called on system segment %s", segmentMetadata);
        final String lastChunk = segmentMetadata.getLastChunk();

        // Completes with true when ownership should be changed, false when the last chunk was missing.
        final CompletableFuture<Boolean> shouldChangeOwner;
        if (!segmentMetadata.isAtomicWrite() && shouldAppend() && null != lastChunk) {
            shouldChangeOwner = metadataTransaction.get(lastChunk).thenComposeAsync(storageMetadata -> {
                final ChunkMetadata chunkMetadata = (ChunkMetadata) storageMetadata;
                Preconditions.checkState(null != chunkMetadata, "last chunk metadata must not be null.");
                Preconditions.checkState(null != chunkMetadata.getName(), "Name of last chunk must not be null.");
                log.debug("{} claimOwnership - current last chunk - segment={}, last chunk={}, Length={}.", this.logPrefix, segmentMetadata.getName(), chunkMetadata.getName(), Long.valueOf(chunkMetadata.getLength()));
                return this.chunkStorage.getInfo(lastChunk).thenApplyAsync(chunkInfo -> {
                    Preconditions.checkState(chunkInfo != null, "chunkInfo for last chunk must not be null.");
                    if (chunkInfo.getLength() != chunkMetadata.getLength()) {
                        Preconditions.checkState(chunkInfo.getLength() > chunkMetadata.getLength(), "Length of last chunk on LTS must be greater than what is in metadata. Chunk=%s length=%s", chunkMetadata, chunkInfo.getLength());
                        // Remember the old segment length: index entries are added from there onwards.
                        final long previousLength = segmentMetadata.getLength();
                        chunkMetadata.setLength(chunkInfo.getLength());
                        segmentMetadata.setLength(segmentMetadata.getLastChunkStartOffset() + chunkMetadata.getLength());
                        if (!segmentMetadata.isStorageSystemSegment()) {
                            addBlockIndexEntriesForChunk(metadataTransaction, segmentMetadata.getName(), chunkMetadata.getName(), segmentMetadata.getLastChunkStartOffset(), previousLength, segmentMetadata.getLength());
                        }
                        metadataTransaction.update(chunkMetadata);
                        log.debug("{} claimOwnership - Length of last chunk adjusted - segment={}, last chunk={}, Length={}.", this.logPrefix, segmentMetadata.getName(), chunkMetadata.getName(), Long.valueOf(chunkInfo.getLength()));
                    }
                    return true;
                }, this.executor).exceptionally(th -> {
                    final Throwable cause = Exceptions.unwrap(th);
                    if (!(cause instanceof ChunkNotFoundException)) {
                        throw new CompletionException(cause);
                    }
                    // Last chunk is gone: do not change ownership, but still attempt a commit of the
                    // unmodified metadata below so that the operation fails fast if it cannot commit.
                    log.debug("{} claimOwnership - Last chunk was missing, failing fast - segment={}, last chunk={}.", this.logPrefix, segmentMetadata.getName(), chunkMetadata.getName());
                    metadataTransaction.update(segmentMetadata);
                    return false;
                });
            }, this.executor);
        } else {
            shouldChangeOwner = CompletableFuture.completedFuture(true);
        }

        return shouldChangeOwner.thenComposeAsync(changeOwner -> {
            if (changeOwner.booleanValue()) {
                segmentMetadata.setOwnerEpoch(this.epoch);
                segmentMetadata.setOwnershipChanged(true);
                segmentMetadata.setAtomicWrites(true);
            }
            metadataTransaction.update(segmentMetadata);
            return metadataTransaction.commit();
        }, this.executor);
    }

    /**
     * Creates a new segment with the given rolling policy.
     *
     * <p>Runs serialized on the segment name. Inside a metadata transaction, fails with
     * {@link StreamSegmentExistsException} if metadata for the name already exists; otherwise creates
     * an active, atomic-write segment record owned by this instance's epoch and commits it.
     * A rolling policy with max length 0 is replaced by the max length of
     * {@code SegmentRollingPolicy.NO_ROLLING}.
     *
     * @param str                  name of the segment.
     * @param segmentRollingPolicy rolling policy for the segment.
     * @param duration             not used by this method.
     * @return a future that completes with a write handle for the new segment.
     */
    @Override // io.pravega.segmentstore.storage.Storage
    public CompletableFuture<SegmentHandle> create(String str, SegmentRollingPolicy segmentRollingPolicy, Duration duration) {
        checkInitialized();
        return executeSerialized(() -> {
            final long traceId = LoggerHelpers.traceEnter(log, "create", str, segmentRollingPolicy);
            final Timer stopwatch = new Timer();
            log.debug("{} create - started segment={}, rollingPolicy={}.", this.logPrefix, str, segmentRollingPolicy);
            return tryWith(this.metadataStore.beginTransaction(false, str),
                    txn -> txn.get(str).thenComposeAsync(existing -> {
                        if (((SegmentMetadata) existing) != null) {
                            throw new CompletionException((Throwable) new StreamSegmentExistsException(str));
                        }
                        // NOTE(review): m46build() looks like a decompiler-renamed build() - confirm.
                        final SegmentMetadata created = SegmentMetadata.builder()
                                .name(str)
                                .maxRollinglength(segmentRollingPolicy.getMaxLength() == 0
                                        ? SegmentRollingPolicy.NO_ROLLING.getMaxLength()
                                        : segmentRollingPolicy.getMaxLength())
                                .ownerEpoch(this.epoch)
                                .m46build();
                        created.setActive(true);
                        created.setAtomicWrites(true);
                        txn.create(created);
                        return txn.commit().thenApplyAsync(ignored -> {
                            final SegmentStorageHandle handle = SegmentStorageHandle.writeHandle(str);
                            final Duration latency = stopwatch.getElapsed();
                            ChunkStorageMetrics.SLTS_CREATE_LATENCY.reportSuccessEvent(latency);
                            ChunkStorageMetrics.SLTS_CREATE_COUNT.inc();
                            log.debug("{} create - finished segment={}, rollingPolicy={}, latency={}.", this.logPrefix, str, segmentRollingPolicy, Long.valueOf(latency.toMillis()));
                            LoggerHelpers.traceLeave(log, "create", traceId, handle);
                            return handle;
                        }, this.executor);
                    }, this.executor),
                    this.executor)
                    .handleAsync((handle, failure) -> {
                        if (failure != null) {
                            log.debug("{} create - exception segment={}, rollingPolicy={}, latency={}.", this.logPrefix, str, segmentRollingPolicy, Long.valueOf(stopwatch.getElapsedMillis()), failure);
                            // Always throws; translates storage-specific failures.
                            handleException(str, failure);
                        }
                        return handle;
                    }, this.executor);
        }, str);
    }

    /**
     * Translates a failure into the exception reported to callers and throws it.
     * This method never returns normally.
     *
     * <ul>
     *   <li>{@link StorageMetadataWritesFencedOutException} becomes {@link StorageNotPrimaryException};</li>
     *   <li>{@code ChunkStorageFullException} becomes {@link StorageFullException};</li>
     *   <li>anything else is re-thrown unwrapped.</li>
     * </ul>
     *
     * @param str name of the segment the failed operation was for.
     * @param th  the failure (possibly wrapped).
     * @throws CompletionException always, wrapping the translated cause.
     */
    private void handleException(String str, Throwable th) {
        final Throwable cause = Exceptions.unwrap(th);
        final Throwable translated;
        if (cause instanceof StorageMetadataWritesFencedOutException) {
            translated = new StorageNotPrimaryException(str, cause);
        } else if (cause instanceof ChunkStorageFullException) {
            translated = new StorageFullException(str, cause);
        } else {
            translated = cause;
        }
        throw new CompletionException(translated);
    }

    /**
     * Writes data to the segment at the given offset.
     *
     * <p>Returns a failed future for a null handle, and for any segment outside the system scope
     * while storage is reported full. Otherwise the write runs serialized on the segment name.
     *
     * @param segmentHandle handle of the segment to write to.
     * @param j             offset to write at.
     * @param inputStream   data to write.
     * @param i             number of bytes to write.
     * @param duration      not used by this method.
     * @return a future that completes when the write operation completes.
     */
    @Override // io.pravega.segmentstore.storage.Storage
    public CompletableFuture<Void> write(SegmentHandle segmentHandle, long j, InputStream inputStream, int i, Duration duration) {
        checkInitialized();
        if (segmentHandle == null) {
            return CompletableFuture.failedFuture(new IllegalArgumentException("handle must not be null"));
        }
        // Writes to system-scope segments are still allowed when storage is full.
        if (isStorageFull() && !isSegmentInSystemScope(segmentHandle)) {
            return CompletableFuture.failedFuture(new StorageFullException(segmentHandle.getSegmentName()));
        }
        return executeSerialized(new WriteOperation(this, segmentHandle, j, inputStream, i), segmentHandle.getSegmentName());
    }

    /**
     * Tells whether the given segment is a storage system segment.
     * Always false when no system journal is set.
     * (Decompiler note: access modifier was changed from package-private.)
     *
     * @param segmentMetadata metadata of the segment to test.
     * @return true if a system journal exists and the metadata marks a storage system segment.
     */
    public boolean isStorageSystemSegment(SegmentMetadata segmentMetadata) {
        if (this.systemJournal == null) {
            return false;
        }
        return segmentMetadata.isStorageSystemSegment();
    }

    /**
     * Seals the given segment.
     *
     * <p>Returns a failed future with {@link IllegalArgumentException} for a null handle, matching
     * the other handle-based operations in this class. Previously the handle was dereferenced
     * before it was null-checked, so the check could never fire.
     * Otherwise runs serialized on the segment name: inside a metadata transaction the segment
     * must exist and be owned by this instance; if it is not already sealed it is marked sealed
     * and the transaction is committed. Failures are logged and re-thrown through
     * {@code handleException}.
     *
     * @param segmentHandle write handle of the segment to seal.
     * @param duration      not used by this method.
     * @return a future that completes when the segment is sealed.
     */
    @Override // io.pravega.segmentstore.storage.Storage
    public CompletableFuture<Void> seal(SegmentHandle segmentHandle, Duration duration) {
        checkInitialized();
        if (null == segmentHandle) {
            return CompletableFuture.failedFuture(new IllegalArgumentException("handle must not be null"));
        }
        return executeSerialized(() -> {
            final long traceId = LoggerHelpers.traceEnter(log, "seal", segmentHandle);
            final Timer timer = new Timer();
            final String segmentName = segmentHandle.getSegmentName();
            log.debug("{} seal - started segment={}.", this.logPrefix, segmentName);
            Preconditions.checkNotNull(segmentName, "streamSegmentName");
            Preconditions.checkArgument(!segmentHandle.isReadOnly(), "handle must not be read only. Segment=%s", segmentName);
            return tryWith(this.metadataStore.beginTransaction(false, segmentName),
                    metadataTransaction -> metadataTransaction.get(segmentName).thenComposeAsync(storageMetadata -> {
                        final SegmentMetadata segmentMetadata = (SegmentMetadata) storageMetadata;
                        checkSegmentExists(segmentName, segmentMetadata);
                        checkOwnership(segmentName, segmentMetadata);
                        final CompletableFuture<Void> sealed;
                        if (segmentMetadata.isSealed()) {
                            // Already sealed: nothing to update or commit.
                            sealed = CompletableFuture.completedFuture(null);
                        } else {
                            segmentMetadata.setSealed(true);
                            metadataTransaction.update(segmentMetadata);
                            sealed = metadataTransaction.commit();
                        }
                        return sealed;
                    }, this.executor).thenRunAsync(() -> {
                        log.debug("{} seal - finished segment={} latency={}.", this.logPrefix, segmentName, Long.valueOf(timer.getElapsedMillis()));
                        LoggerHelpers.traceLeave(log, "seal", traceId, segmentHandle);
                    }, this.executor),
                    this.executor)
                    .exceptionally(th -> {
                        log.warn("{} seal - exception segment={} latency={}.", this.logPrefix, segmentName, Long.valueOf(timer.getElapsedMillis()), th);
                        // Always throws; translates storage-specific failures.
                        handleException(segmentName, th);
                        return null;
                    });
        }, segmentHandle.getSegmentName());
    }

    /**
     * Concatenates the source segment onto the target segment at the given offset.
     *
     * <p>Returns a failed future for a null handle or null source segment name, and for any
     * target outside the system scope while storage is reported full. Otherwise the operation
     * runs serialized on both segment names.
     *
     * @param segmentHandle handle of the target segment.
     * @param j             offset in the target segment.
     * @param str           name of the source segment.
     * @param duration      not used by this method.
     * @return a future that completes when the concat operation completes.
     */
    @Override // io.pravega.segmentstore.storage.Storage
    public CompletableFuture<Void> concat(SegmentHandle segmentHandle, long j, String str, Duration duration) {
        checkInitialized();
        if (segmentHandle == null) {
            return CompletableFuture.failedFuture(new IllegalArgumentException("handle must not be null"));
        }
        if (str == null) {
            return CompletableFuture.failedFuture(new IllegalArgumentException("sourceSegment must not be null"));
        }
        // System-scope targets are still allowed when storage is full.
        if (isStorageFull() && !isSegmentInSystemScope(segmentHandle)) {
            return CompletableFuture.failedFuture(new StorageFullException(segmentHandle.getSegmentName()));
        }
        return executeSerialized(new ConcatOperation(this, segmentHandle, j, str), segmentHandle.getSegmentName(), str);
    }

    /**
     * Tells whether appends to existing chunks should be used: the chunk storage must support
     * appends and the configuration must enable them.
     * (Decompiler note: access modifier was changed from package-private.)
     *
     * @return true if both conditions hold.
     */
    public boolean shouldAppend() {
        if (!this.chunkStorage.supportsAppend()) {
            return false;
        }
        return this.config.isAppendEnabled();
    }

    /**
     * Runs a {@code DefragmentOperation} built from the given arguments and returns its result.
     * All arguments are passed to the operation unchanged, in the same order.
     *
     * @return the future returned by the operation's {@code call()}.
     */
    public CompletableFuture<Void> defrag(MetadataTransaction metadataTransaction, SegmentMetadata segmentMetadata, String str, String str2, List<String> list, List<ChunkNameOffsetPair> list2, long j) {
        final DefragmentOperation operation =
                new DefragmentOperation(this, metadataTransaction, segmentMetadata, str, str2, list, list2, j);
        return operation.call();
    }

    /**
     * Deletes the given segment.
     *
     * <p>Returns a failed future for a null handle. Otherwise runs serialized on the segment name:
     * inside a metadata transaction the segment must exist and be owned by this instance; it is
     * marked inactive, handed to the garbage collector, and the transaction is committed. After the
     * commit the segment is removed from the read index cache and delete metrics are reported.
     * Failures are logged and re-thrown through {@code handleException}.
     *
     * @param segmentHandle handle of the segment to delete.
     * @param duration      not used by this method.
     * @return a future that completes when the delete has been committed.
     */
    @Override // io.pravega.segmentstore.storage.Storage
    public CompletableFuture<Void> delete(SegmentHandle segmentHandle, Duration duration) {
        checkInitialized();
        if (segmentHandle == null) {
            return CompletableFuture.failedFuture(new IllegalArgumentException("handle must not be null"));
        }
        return executeSerialized(() -> {
            final long traceId = LoggerHelpers.traceEnter(log, "delete", segmentHandle);
            log.debug("{} delete - started segment={}.", this.logPrefix, segmentHandle.getSegmentName());
            final Timer stopwatch = new Timer();
            final String name = segmentHandle.getSegmentName();
            return tryWith(this.metadataStore.beginTransaction(false, name),
                    txn -> txn.get(name).thenComposeAsync(loaded -> {
                        final SegmentMetadata segment = (SegmentMetadata) loaded;
                        checkSegmentExists(name, segment);
                        checkOwnership(name, segment);
                        segment.setActive(false);
                        txn.update(segment);
                        // Hand the segment to the garbage collector before committing the metadata change.
                        return this.garbageCollector.addSegmentToGarbage(txn.getVersion(), name)
                                .thenComposeAsync(ignored -> txn.commit().thenRunAsync(() -> {
                                    this.readIndexCache.remove(name);
                                    final Duration latency = stopwatch.getElapsed();
                                    ChunkStorageMetrics.SLTS_DELETE_LATENCY.reportSuccessEvent(latency);
                                    ChunkStorageMetrics.SLTS_DELETE_COUNT.inc();
                                    log.debug("{} delete - finished segment={}, latency={}.", this.logPrefix, segmentHandle.getSegmentName(), Long.valueOf(latency.toMillis()));
                                    LoggerHelpers.traceLeave(log, "delete", traceId, segmentHandle);
                                }, this.executor), this.executor);
                    }, this.executor),
                    this.executor)
                    .exceptionally(failure -> {
                        log.warn("{} delete - exception segment={}, latency={}.", this.logPrefix, segmentHandle.getSegmentName(), Long.valueOf(stopwatch.getElapsedMillis()), failure);
                        // Always throws; translates storage-specific failures.
                        handleException(name, failure);
                        return null;
                    });
        }, segmentHandle.getSegmentName());
    }

    /**
     * Truncates the segment at the given offset.
     *
     * <p>Returns a failed future for a null handle; otherwise a {@code TruncateOperation}
     * runs serialized on the segment name.
     *
     * @param segmentHandle handle of the segment to truncate.
     * @param j             offset to truncate at.
     * @param duration      not used by this method.
     * @return a future that completes when the truncate operation completes.
     */
    @Override // io.pravega.segmentstore.storage.Storage
    public CompletableFuture<Void> truncate(SegmentHandle segmentHandle, long j, Duration duration) {
        checkInitialized();
        if (segmentHandle == null) {
            return CompletableFuture.failedFuture(new IllegalArgumentException("handle must not be null"));
        }
        return executeSerialized(new TruncateOperation(this, segmentHandle, j), segmentHandle.getSegmentName());
    }

    /**
     * Reports whether this storage supports truncation.
     *
     * @return always true.
     */
    @Override // io.pravega.segmentstore.storage.Storage
    public boolean supportsTruncation() {
        return true;
    }

    /**
     * Reports whether this storage supports atomic writes.
     *
     * @return always true.
     */
    @Override // io.pravega.segmentstore.storage.Storage
    public boolean supportsAtomicWrites() {
        return true;
    }

    /**
     * Lists all active segments known to the metadata store.
     *
     * <p>Every metadata entry that is a {@link SegmentMetadata} and is active is converted to a
     * {@link SegmentProperties} carrying its name, sealed flag, length, start offset and
     * last-modified time.
     *
     * @return a future that completes with an iterator over the properties of active segments.
     */
    @Override // io.pravega.segmentstore.storage.Storage
    public CompletableFuture<Iterator<SegmentProperties>> listSegments() {
        // Conversion from stored metadata to the public properties view.
        final Function<SegmentMetadata, SegmentProperties> toProperties = segment ->
                StreamSegmentInformation.builder()
                        .name(segment.getName())
                        .sealed(segment.isSealed())
                        .length(segment.getLength())
                        .startOffset(segment.getStartOffset())
                        .lastModified(new ImmutableDate(segment.getLastModified()))
                        .build();
        return this.metadataStore.getAllEntries().thenApplyAsync(
                entries -> entries
                        .filter(entry -> entry instanceof SegmentMetadata)
                        .map(entry -> (SegmentMetadata) entry)
                        .filter(SegmentMetadata::isActive)
                        .map(toProperties)
                        .iterator(),
                this.executor);
    }

    /**
     * Opens the given segment for reading.
     *
     * <p>Runs in parallel with other operations. Inside a metadata transaction the segment's
     * metadata is loaded and validated; when the recorded owner epoch is lower than this instance's
     * epoch, ownership is claimed in a step serialized on the segment name. A read handle is then
     * returned. Failures are logged and re-thrown through {@code handleException}.
     *
     * @param str name of the segment.
     * @return a future that completes with a read handle for the segment.
     */
    @Override // io.pravega.segmentstore.storage.ReadOnlyStorage
    public CompletableFuture<SegmentHandle> openRead(String str) {
        checkInitialized();
        return executeParallel(() -> {
            final long traceId = LoggerHelpers.traceEnter(log, "openRead", str);
            final Timer timer = new Timer();
            Preconditions.checkNotNull(str, "streamSegmentName");
            log.debug("{} openRead - started segment={}.", this.logPrefix, str);
            return tryWith(this.metadataStore.beginTransaction(false, str),
                    metadataTransaction -> metadataTransaction.get(str).thenComposeAsync(storageMetadata -> {
                        final SegmentMetadata segmentMetadata = (SegmentMetadata) storageMetadata;
                        checkSegmentExists(str, segmentMetadata);
                        segmentMetadata.checkInvariants();

                        // Parameterized (was a raw CompletableFuture) so the chain below is type-checked.
                        final CompletableFuture<Void> ownershipReady;
                        if (segmentMetadata.getOwnerEpoch() < this.epoch) {
                            log.debug("{} openRead - Segment needs ownership change. segment={}.", this.logPrefix, segmentMetadata.getName());
                            // The claim modifies metadata, so it is serialized on the segment name.
                            ownershipReady = executeSerialized(() -> claimOwnership(metadataTransaction, segmentMetadata), str);
                        } else {
                            ownershipReady = CompletableFuture.completedFuture(null);
                        }

                        return ownershipReady.thenApplyAsync(ignored -> {
                            final SegmentStorageHandle readHandle = SegmentStorageHandle.readHandle(str);
                            log.debug("{} openRead - finished segment={} latency={}.", this.logPrefix, str, Long.valueOf(timer.getElapsedMillis()));
                            LoggerHelpers.traceLeave(log, "openRead", traceId, readHandle);
                            return readHandle;
                        }, this.executor);
                    }, this.executor),
                    this.executor)
                    .handleAsync((segmentStorageHandle, th) -> {
                        if (null != th) {
                            log.debug("{} openRead - exception segment={} latency={}.", this.logPrefix, str, Long.valueOf(timer.getElapsedMillis()), th);
                            // Always throws; translates storage-specific failures.
                            handleException(str, th);
                        }
                        return segmentStorageHandle;
                    }, this.executor);
        }, str);
    }

    /**
     * Reads from the segment into the given buffer by running a {@code ReadOperation}
     * in parallel with other operations.
     *
     * @param segmentHandle handle of the segment to read from.
     * @param j             offset in the segment to read from.
     * @param bArr          destination buffer.
     * @param i             offset in the buffer.
     * @param i2            number of bytes to read.
     * @param duration      not used by this method.
     * @return a future that completes with the result of the read operation.
     */
    @Override // io.pravega.segmentstore.storage.ReadOnlyStorage
    public CompletableFuture<Integer> read(SegmentHandle segmentHandle, long j, byte[] bArr, int i, int i2, Duration duration) {
        checkInitialized();
        final ReadOperation operation = new ReadOperation(this, segmentHandle, j, bArr, i, i2);
        final String segmentName = segmentHandle.getSegmentName();
        return executeParallel(operation, segmentName);
    }

    /**
     * Retrieves the properties of the given segment.
     *
     * <p>Runs in parallel with other operations. Inside a metadata transaction (started with
     * {@code true} as its first argument) the segment's metadata is loaded and validated, then
     * converted to a {@link SegmentProperties} carrying its name, sealed flag, length, start offset
     * and last-modified time. Failures are logged and re-thrown through {@code handleException}.
     *
     * @param str      name of the segment.
     * @param duration not used by this method.
     * @return a future that completes with the segment's properties.
     */
    @Override // io.pravega.segmentstore.storage.ReadOnlyStorage
    public CompletableFuture<SegmentProperties> getStreamSegmentInfo(String str, Duration duration) {
        checkInitialized();
        return executeParallel(() -> {
            final long traceId = LoggerHelpers.traceEnter(log, "getStreamSegmentInfo", str);
            final Timer stopwatch = new Timer();
            Preconditions.checkNotNull(str, "streamSegmentName");
            log.debug("{} getStreamSegmentInfo - started segment={}.", this.logPrefix, str);
            return tryWith(this.metadataStore.beginTransaction(true, str),
                    txn -> txn.get(str).thenApplyAsync(loaded -> {
                        final SegmentMetadata segment = (SegmentMetadata) loaded;
                        checkSegmentExists(str, segment);
                        segment.checkInvariants();
                        final SegmentProperties info = StreamSegmentInformation.builder()
                                .name(str)
                                .sealed(segment.isSealed())
                                .length(segment.getLength())
                                .startOffset(segment.getStartOffset())
                                .lastModified(new ImmutableDate(segment.getLastModified()))
                                .build();
                        log.debug("{} getStreamSegmentInfo - finished segment={} latency={}.", this.logPrefix, str, Long.valueOf(stopwatch.getElapsedMillis()));
                        LoggerHelpers.traceLeave(log, "getStreamSegmentInfo", traceId, info);
                        return info;
                    }, this.executor),
                    this.executor)
                    .handleAsync((info, failure) -> {
                        if (failure != null) {
                            log.debug("{} getStreamSegmentInfo - exception segment={}.", this.logPrefix, str, failure);
                            // Always throws; translates storage-specific failures.
                            handleException(str, failure);
                        }
                        return info;
                    }, this.executor);
        }, str);
    }

    /**
     * Checks whether the given segment exists, i.e. has metadata and is marked active.
     *
     * <p>Runs in parallel with other operations, inside a metadata transaction started with
     * {@code true} as its first argument.
     *
     * @param str      name of the segment.
     * @param duration not used by this method.
     * @return a future that completes with true if active metadata exists for the segment.
     */
    @Override // io.pravega.segmentstore.storage.ReadOnlyStorage
    public CompletableFuture<Boolean> exists(String str, Duration duration) {
        checkInitialized();
        return executeParallel(() -> {
            final long traceId = LoggerHelpers.traceEnter(log, "exists", str);
            Preconditions.checkNotNull(str, "streamSegmentName");
            return tryWith(this.metadataStore.beginTransaction(true, str),
                    txn -> txn.get(str).thenApplyAsync(loaded -> {
                        final SegmentMetadata segment = (SegmentMetadata) loaded;
                        final boolean present = segment != null && segment.isActive();
                        LoggerHelpers.traceLeave(log, "exists", traceId, Boolean.valueOf(present));
                        return Boolean.valueOf(present);
                    }, this.executor),
                    this.executor);
        }, str);
    }

    /**
     * Reports stats of all components (garbage collector, metadata store, chunk storage, read
     * index cache) and publishes two gauges: used bytes, and used bytes as a percentage of the
     * configured max safe storage size.
     */
    @Override // io.pravega.segmentstore.storage.chunklayer.StatsReporter
    public void report() {
        this.garbageCollector.report();
        this.metadataStore.report();
        this.chunkStorage.report();
        this.readIndexCache.report();

        final Long usedBytes = Long.valueOf(this.storageUsed.get());
        ChunkStorageMetrics.DYNAMIC_LOGGER.reportGaugeValue("pravega.segmentstore.storage.used_bytes", usedBytes, new String[0]);

        final Double usedPercentage = Double.valueOf((100.0d * this.storageUsed.get()) / this.config.getMaxSafeStorageSize());
        ChunkStorageMetrics.DYNAMIC_LOGGER.reportGaugeValue("pravega.segmentstore.storage.used_percentage", usedPercentage, new String[0]);
    }

    /**
     * Queries chunk storage for used space, records it, and updates the storage-full flag.
     *
     * <p>Storage counts as full when used space is greater than or equal to the configured max
     * safe storage size. Transitions into and out of the full state are logged. Any failure is
     * logged and swallowed, so the returned future does not complete exceptionally.
     *
     * @return a future that completes when the stats have been updated (or the failure logged).
     */
    CompletableFuture<Void> updateStorageStats() {
        return this.chunkStorage.getUsedSpace().thenAcceptAsync(usedBytes -> {
            this.storageUsed.set(usedBytes.longValue());
            final boolean fullNow = usedBytes.longValue() >= this.config.getMaxSafeStorageSize();
            final boolean fullBefore = this.isStorageFull.get();
            if (!fullNow) {
                if (fullBefore) {
                    log.info("{} STORAGE AVAILABLE. LEAVING READ ONLY MODE. Restoring normal writes", this.logPrefix);
                }
            } else {
                if (!fullBefore) {
                    log.warn("{} STORAGE FULL. ENTERING READ ONLY MODE. Any non-critical writes will be rejected.", this.logPrefix);
                }
                log.warn("{} STORAGE FULL - used={} total={}.", this.logPrefix, usedBytes, Long.valueOf(this.config.getMaxSafeStorageSize()));
            }
            this.isStorageFull.set(fullNow);
        }, this.executor).exceptionally(failure -> {
            log.warn("{} updateStorageStats.", this.logPrefix, failure);
            return null;
        });
    }

    /**
     * Tells whether the storage-full flag is currently set (as last computed by
     * {@code updateStorageStats}).
     *
     * @return the current value of the storage-full flag.
     */
    boolean isSafeMode() {
        final boolean storageFull = this.isStorageFull.get();
        return storageFull;
    }

    /**
     * Closes this instance: closes the metadata store, garbage collector, task queue and chunk
     * storage (in that order, each on a best-effort basis), cancels the periodic tasks and finally
     * marks the instance as closed.
     */
    @Override // io.pravega.segmentstore.storage.Storage, io.pravega.segmentstore.storage.ReadOnlyStorage, java.lang.AutoCloseable
    public void close() {
        close("metadataStore", this.metadataStore);
        close("garbageCollector", this.garbageCollector);
        close("taskQueue", this.taskQueue);
        close("chunkStorage", this.chunkStorage);

        this.reporter.cancel(true);
        // The storage checker only exists when the safe-storage-size check was enabled.
        final ScheduledFuture<?> checker = this.storageChecker;
        if (checker != null) {
            checker.cancel(true);
        }

        this.closed.set(true);
    }

    /**
     * Closes a single component, logging before and after, and never propagating a failure.
     *
     * @param str           Display name of the component, used only in log messages.
     * @param autoCloseable The component to close; a {@code null} value is skipped (but still logged as closed).
     */
    private void close(String str, AutoCloseable autoCloseable) {
        try {
            log.debug("{} Closing {}", this.logPrefix, str);
            if (autoCloseable != null) {
                autoCloseable.close();
            }
            log.info("{} Closed {}", this.logPrefix, str);
        } catch (Exception closeFailure) {
            // Best effort: the failure is reported so the caller can keep closing the remaining components.
            log.error("{} Error while closing {}", new Object[]{this.logPrefix, str, closeFailure});
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates read index block entries, within the given transaction, for every index block boundary that
     * falls inside the given range of a chunk.
     *
     * <p>Candidate boundaries are the multiples of the configured index block size, starting from the one at
     * or immediately below {@code j2} and ending before {@code j3}. A boundary that lies before the chunk's
     * start offset {@code j} is skipped.
     *
     * @param metadataTransaction Transaction in which the entries are created.
     * @param str                 Segment name.
     * @param str2                Chunk name.
     * @param j                   Chunk start offset; must be less than or equal to {@code j2}.
     * @param j2                  From offset; must be less than or equal to {@code j3}.
     * @param j3                  To offset (exclusive upper bound for block boundaries).
     * @throws IllegalStateException if the offsets are not ordered as described above.
     */
    public void addBlockIndexEntriesForChunk(MetadataTransaction metadataTransaction, String str, String str2, long j, long j2, long j3) {
        Preconditions.checkState(j <= j2, "chunkStartOffset must be less than or equal to fromOffset. Segment=%s Chunk=%s chunkStartOffset=%s fromOffset=%s", str, str2, Long.valueOf(j), Long.valueOf(j2));
        Preconditions.checkState(j2 <= j3, "fromOffset must be less than or equal to toOffset. Segment=%s Chunk=%s toOffset=%s fromOffset=%s", str, str2, Long.valueOf(j3), Long.valueOf(j2));

        final long blockSize = this.config.getIndexBlockSize();
        // Round the starting offset down to the block boundary at or below it.
        final long firstBoundary = (j2 / blockSize) * blockSize;

        for (long boundary = firstBoundary; boundary < j3; boundary += blockSize) {
            if (boundary < j) {
                // This boundary precedes the chunk, so the chunk cannot serve it.
                continue;
            }
            ReadIndexBlockMetadata blockEntry = ReadIndexBlockMetadata.builder()
                    .name(NameUtils.getSegmentReadIndexBlockName(str, boundary))
                    .startOffset(j)
                    .chunkName(str2)
                    .status(1)
                    .m44build();
            metadataTransaction.create(blockEntry);
            log.debug("{} adding new block index entry segment={}, entry={}.", new Object[]{this.logPrefix, str, blockEntry});
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Delegates to the garbage collector's {@code deleteBlockIndexEntriesForChunk}, passing all arguments
     * through unchanged.
     *
     * @param metadataTransaction Transaction handed to the garbage collector.
     * @param str                 First string argument, passed through unchanged (presumably the segment name; confirm against the garbage collector).
     * @param j                   First offset argument, passed through unchanged.
     * @param j2                  Second offset argument, passed through unchanged.
     */
    public void deleteBlockIndexEntriesForChunk(MetadataTransaction metadataTransaction, String str, long j, long j2) {
        this.garbageCollector.deleteBlockIndexEntriesForChunk(metadataTransaction, str, j, j2);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Emits size and chunk-count gauges for the given segment.
     *
     * <p>The metric name suffix is the part of the segment name after its last {@code '/'} (the whole name
     * when there is none), with every {@code '$'} replaced by {@code '_'}. The reported size is the segment
     * length minus its start offset.
     *
     * @param segmentMetadata Metadata of the segment to report on.
     */
    public void reportMetricsForSystemSegment(SegmentMetadata segmentMetadata) {
        final int lastSlash = segmentMetadata.getName().lastIndexOf('/');
        final String metricSuffix = segmentMetadata.getName().substring(lastSlash + 1).replace('$', '_');

        final Long liveBytes = Long.valueOf(segmentMetadata.getLength() - segmentMetadata.getStartOffset());
        ChunkStorageMetrics.DYNAMIC_LOGGER.reportGaugeValue("pravega.segmentstore.storage.size." + metricSuffix, liveBytes, new String[0]);

        final Integer chunkCount = Integer.valueOf(segmentMetadata.getChunkCount());
        ChunkStorageMetrics.DYNAMIC_LOGGER.reportGaugeValue("pravega.segmentstore.storage.num_chunks." + metricSuffix, chunkCount, new String[0]);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Submits an operation to the task processor, keyed by segment names, and runs it through
     * {@code executeExclusive}.
     *
     * <p>When exactly one name is given and the system journal identifies it as a storage system segment,
     * the operation is keyed on (and run exclusively against) the full set of system segments returned by
     * the journal instead of just that one name.
     *
     * @param callable Operation to run.
     * @param strArr   Names of the segments the operation is keyed on.
     * @param <R>      Result type of the operation.
     * @return the future returned by the task processor for this operation.
     */
    public <R> CompletableFuture<R> executeSerialized(Callable<CompletableFuture<R>> callable, String... strArr) {
        Exceptions.checkNotClosed(this.closed.get(), this);

        final boolean singleSystemSegment = strArr.length == 1 && this.systemJournal.isStorageSystemSegment(strArr[0]);
        // Widen the key set to the entire group of system segments when a system segment is targeted.
        final String[] keys = singleSystemSegment ? this.systemJournal.getSystemSegments() : strArr;

        return this.taskProcessor.add(Arrays.asList(keys), () -> executeExclusive(callable, keys));
    }

    /**
     * Runs an operation on the executor while the given segment names are registered as active, and
     * unregisters them once the operation's future completes (normally or exceptionally).
     *
     * <p>Registration happens synchronously via {@code acquire}; if that throws, the exception propagates
     * to the caller and nothing is scheduled. Exceptions thrown by the operation itself are surfaced as a
     * {@link CompletionException}; an incoming {@link CompletionException} is re-wrapped around the result
     * of {@code Exceptions.unwrap}.
     *
     * @param callable Operation to run.
     * @param strArr   Segment names to register for the duration of the operation.
     * @param <R>      Result type of the operation.
     * @return a future for the operation's result that completes after the names have been released.
     */
    private <R> CompletableFuture<R> executeExclusive(Callable<CompletableFuture<R>> callable, String... strArr) {
        final AtomicBoolean acquired = new AtomicBoolean(false);
        acquire(strArr);
        acquired.set(true);

        return CompletableFuture.completedFuture(null).thenComposeAsync(ignored -> {
            Exceptions.checkNotClosed(this.closed.get(), this);
            try {
                return callable.call();
            } catch (CompletionException wrapped) {
                throw new CompletionException(Exceptions.unwrap(wrapped));
            } catch (Exception failure) {
                throw new CompletionException(failure);
            }
        }, this.executor).whenCompleteAsync((result, error) -> {
            if (acquired.get()) {
                release(strArr);
            }
        }, this.executor);
    }

    /**
     * Registers all given segment names as active, as a single all-or-nothing step under the
     * {@code activeRequests} monitor.
     *
     * @param strArr Segment names to register.
     * @throws ConcurrentModificationException if any of the names is already registered; in that case none
     *                                         of the names is added.
     */
    private void acquire(String... strArr) {
        synchronized (this.activeRequests) {
            // First pass: verify that none of the names is in use before changing anything.
            for (int i = 0; i < strArr.length; i++) {
                final String segment = strArr[i];
                if (!this.activeRequests.contains(segment)) {
                    continue;
                }
                log.error("{} Concurrent modifications for Segment={}", this.logPrefix, segment);
                throw new ConcurrentModificationException(String.format("Concurrent modifications not allowed. Segment=%s", segment));
            }
            // Second pass: all names are free, so register every one of them.
            for (int i = 0; i < strArr.length; i++) {
                this.activeRequests.add(strArr[i]);
            }
        }
    }

    /**
     * Removes all given segment names from the set of active requests, under the {@code activeRequests} monitor.
     *
     * @param strArr Segment names to unregister.
     */
    private void release(String... strArr) {
        synchronized (this.activeRequests) {
            for (int i = 0; i < strArr.length; i++) {
                this.activeRequests.remove(strArr[i]);
            }
        }
    }

    /**
     * Runs an operation on the executor without registering any segment names as active.
     *
     * <p>Exceptions thrown by the operation are surfaced as a {@link CompletionException}; an incoming
     * {@link CompletionException} is re-wrapped around the result of {@code Exceptions.unwrap}.
     *
     * @param callable Operation to run.
     * @param strArr   Segment names; not used by this method.
     * @param <R>      Result type of the operation.
     * @return a future for the operation's result.
     */
    <R> CompletableFuture<R> executeParallel(Callable<CompletableFuture<R>> callable, String... strArr) {
        return CompletableFuture.completedFuture(null).thenComposeAsync(ignored -> {
            Exceptions.checkNotClosed(this.closed.get(), this);
            try {
                return callable.call();
            } catch (CompletionException wrapped) {
                throw new CompletionException(Exceptions.unwrap(wrapped));
            } catch (Exception failure) {
                throw new CompletionException(failure);
            }
        }, this.executor);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Asynchronous counterpart of try-with-resources: applies {@code function} to {@code t} and closes
     * {@code t} once the future returned by the function completes, whether normally or exceptionally.
     *
     * <p>If {@code function} throws before returning a future, no completion callback can be attached, so
     * {@code t} is closed immediately and the original exception is rethrown unchanged (a failure to close
     * is attached to it as a suppressed exception).
     *
     * <p>If closing fails after the function's future completed normally, the returned future completes
     * exceptionally with a {@link CompletionException} wrapping the close failure.
     *
     * @param t        Resource to hand to the function and to close afterwards.
     * @param function Function that starts the asynchronous work using the resource.
     * @param executor Executor on which the closing callback runs.
     * @param <T>      Type of the resource.
     * @param <R>      Result type of the asynchronous work.
     * @return a future that completes after the resource has been closed.
     */
    public static <T extends AutoCloseable, R> CompletableFuture<R> tryWith(T t, Function<T, CompletableFuture<R>> function, Executor executor) {
        final CompletableFuture<R> pending;
        try {
            pending = function.apply(t);
        } catch (RuntimeException | Error failure) {
            // The function failed synchronously, so the completion callback below is never registered.
            // Close the resource here to avoid leaking it, then surface the original failure.
            try {
                t.close();
            } catch (Exception closeFailure) {
                if (closeFailure != failure) {
                    failure.addSuppressed(closeFailure);
                }
            }
            throw failure;
        }
        return pending.whenCompleteAsync((result, error) -> {
            try {
                t.close();
            } catch (Exception closeFailure) {
                throw new CompletionException(closeFailure);
            }
        }, executor);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Verifies that segment metadata is present and active.
     *
     * @param str             Segment name, used for the exception.
     * @param segmentMetadata Metadata to check; may be {@code null}.
     * @throws CompletionException wrapping a {@link StreamSegmentNotExistsException} when the metadata is
     *                             {@code null} or not active.
     */
    public final void checkSegmentExists(String str, SegmentMetadata segmentMetadata) {
        final boolean exists = segmentMetadata != null && segmentMetadata.isActive();
        if (exists) {
            return;
        }
        throw new CompletionException(new StreamSegmentNotExistsException(str));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Verifies that the segment's owner epoch does not exceed this instance's epoch.
     *
     * @param str             Segment name, used for the exception.
     * @param segmentMetadata Metadata whose owner epoch is checked.
     * @throws CompletionException wrapping a {@link StorageNotPrimaryException} when the segment's owner
     *                             epoch is greater than this instance's epoch.
     */
    public final void checkOwnership(String str, SegmentMetadata segmentMetadata) {
        if (segmentMetadata.getOwnerEpoch() <= this.epoch) {
            return;
        }
        throw new CompletionException(new StorageNotPrimaryException(str));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Verifies that the segment is not sealed.
     *
     * @param str             Segment name, used for the exception.
     * @param segmentMetadata Metadata whose sealed state is checked.
     * @throws CompletionException wrapping a {@link StreamSegmentSealedException} when the segment is sealed.
     */
    public final void checkNotSealed(String str, SegmentMetadata segmentMetadata) {
        if (!segmentMetadata.isSealed()) {
            return;
        }
        throw new CompletionException(new StreamSegmentSealedException(str));
    }

    /**
     * Tells whether the storage is considered full.
     *
     * @return {@code true} only when the safe storage size check is enabled in the configuration and the
     *         storage-full flag is currently set.
     */
    boolean isStorageFull() {
        if (!this.config.isSafeStorageSizeCheckEnabled()) {
            // With the check disabled, the flag is never consulted.
            return false;
        }
        return this.isStorageFull.get();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Tells whether the handle's segment name starts with the {@code "_system/"} prefix.
     *
     * @param segmentHandle Handle of the segment to check.
     * @return {@code true} when the segment name begins with {@code "_system/"}.
     */
    public boolean isSegmentInSystemScope(SegmentHandle segmentHandle) {
        final String segmentName = segmentHandle.getSegmentName();
        return segmentName.startsWith("_system/");
    }

    /**
     * Verifies that this instance has a non-zero epoch and has not been closed.
     *
     * @throws IllegalStateException if the epoch is zero or, failing that, if this instance is closed.
     */
    private void checkInitialized() {
        final boolean epochAssigned = this.epoch != 0;
        Preconditions.checkState(epochAssigned, "epoch must not be zero");

        final boolean stillOpen = !this.closed.get();
        Preconditions.checkState(stillOpen, "ChunkedSegmentStorage instance must not be closed");
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Builds a chunk name from the given segment name, this instance's epoch and the given offset, using
     * {@code NameUtils.getSegmentChunkName}.
     *
     * @param str Segment name.
     * @param j   Offset passed as the last argument to the name builder.
     * @return the chunk name produced by {@code NameUtils.getSegmentChunkName}.
     */
    public String getNewChunkName(String str, long j) {
        final long currentEpoch = getEpoch();
        return NameUtils.getSegmentChunkName(str, currentEpoch, j);
    }

    /**
     * Returns the configuration held by this instance.
     *
     * @return the value of the {@code config} field.
     */
    @SuppressFBWarnings(justification = "generated code")
    @Generated
    public ChunkedSegmentStorageConfig getConfig() {
        return this.config;
    }

    /**
     * Returns the chunk metadata store held by this instance.
     *
     * @return the value of the {@code metadataStore} field.
     */
    @SuppressFBWarnings(justification = "generated code")
    @Generated
    public ChunkMetadataStore getMetadataStore() {
        return this.metadataStore;
    }

    /**
     * Returns the chunk storage held by this instance.
     *
     * @return the value of the {@code chunkStorage} field.
     */
    @SuppressFBWarnings(justification = "generated code")
    @Generated
    public ChunkStorage getChunkStorage() {
        return this.chunkStorage;
    }

    /**
     * Returns the executor held by this instance.
     *
     * @return the value of the {@code executor} field.
     */
    @SuppressFBWarnings(justification = "generated code")
    @Generated
    public Executor getExecutor() {
        return this.executor;
    }

    /**
     * Returns the epoch of this instance.
     *
     * @return the value of the {@code epoch} field.
     */
    @SuppressFBWarnings(justification = "generated code")
    @Generated
    public long getEpoch() {
        return this.epoch;
    }

    /**
     * Returns the container id of this instance.
     *
     * @return the value of the {@code containerId} field.
     */
    @SuppressFBWarnings(justification = "generated code")
    @Generated
    public int getContainerId() {
        return this.containerId;
    }

    /**
     * Returns the system journal held by this instance.
     *
     * @return the value of the {@code systemJournal} field.
     */
    @SuppressFBWarnings(justification = "generated code")
    @Generated
    public SystemJournal getSystemJournal() {
        return this.systemJournal;
    }

    /**
     * Returns the read index cache held by this instance.
     *
     * @return the value of the {@code readIndexCache} field.
     */
    @SuppressFBWarnings(justification = "generated code")
    @Generated
    public ReadIndexCache getReadIndexCache() {
        return this.readIndexCache;
    }

    /**
     * Returns the prefix this instance puts at the start of its log messages.
     *
     * @return the value of the {@code logPrefix} field.
     */
    @SuppressFBWarnings(justification = "generated code")
    @Generated
    public String getLogPrefix() {
        return this.logPrefix;
    }

    /**
     * Returns the garbage collector held by this instance.
     *
     * @return the value of the {@code garbageCollector} field.
     */
    @SuppressFBWarnings(justification = "generated code")
    @Generated
    public GarbageCollector getGarbageCollector() {
        return this.garbageCollector;
    }
}
