package io.pravega.segmentstore.storage.impl.bookkeeper;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.common.Exceptions;
import io.pravega.common.LoggerHelpers;
import io.pravega.common.ObjectClosedException;
import io.pravega.common.Timer;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.util.ArrayView;
import io.pravega.common.util.CloseableIterator;
import io.pravega.common.util.RetriesExhaustedException;
import io.pravega.common.util.Retry;
import io.pravega.segmentstore.storage.DataLogDisabledException;
import io.pravega.segmentstore.storage.DataLogInitializationException;
import io.pravega.segmentstore.storage.DataLogNotAvailableException;
import io.pravega.segmentstore.storage.DataLogWriterNotPrimaryException;
import io.pravega.segmentstore.storage.DurableDataLog;
import io.pravega.segmentstore.storage.DurableDataLogException;
import io.pravega.segmentstore.storage.LogAddress;
import io.pravega.segmentstore.storage.QueueStats;
import io.pravega.segmentstore.storage.ThrottleSourceListener;
import io.pravega.segmentstore.storage.WriteFailureException;
import io.pravega.segmentstore.storage.WriteSettings;
import io.pravega.segmentstore.storage.WriteTooLongException;
import io.pravega.segmentstore.storage.impl.bookkeeper.BookKeeperMetrics;
import io.pravega.segmentstore.storage.impl.bookkeeper.WriteQueue;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
import org.apache.curator.framework.api.BackgroundPathAndBytesable;
import org.apache.curator.framework.api.WatchPathable;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
/* loaded from: input_file:io/pravega/segmentstore/storage/impl/bookkeeper/BookKeeperLog.class */
class BookKeeperLog implements DurableDataLog {

    // Class-wide SLF4J logger; assigned in the static initializer at the end of the class.
    @SuppressFBWarnings(justification = "generated code")
    private static final Logger log;
    // Initial delay and delay between runs of reportMetrics(), in milliseconds (see the constructor).
    private static final long REPORT_INTERVAL = 1000;
    // ZooKeeper path of the node holding this log's serialized LogMetadata.
    private final String logNodePath;
    // Curator client used to load and persist the metadata node.
    private final CuratorFramework zkClient;
    // BookKeeper client handed to the Ledgers helpers and to LogReader.
    private final BookKeeper bookKeeper;
    private final BookKeeperConfig config;
    // Executor shared by the write/rollover processors, truncation, append callbacks and the metric reporter.
    private final ScheduledExecutorService executorService;
    // Flipped to true exactly once by close().
    private final AtomicBoolean closed;
    // Monitor guarding writeLedger and logMetadata.
    private final Object lock = new Object();
    // "Log[<containerId>]" prefix used in log and trace messages.
    private final String traceObjectId;

    // Ledger currently being written to; null until initialize() succeeds and again after close().
    @GuardedBy("lock")
    private WriteLedger writeLedger;

    // Latest known metadata for this log; null whenever writeLedger is null.
    @GuardedBy("lock")
    private LogMetadata logMetadata;
    // Queue of pending writes, drained by the write processor.
    private final WriteQueue writes;
    // Runs processWritesSync() on the executor.
    private final SequentialAsyncProcessor writeProcessor;
    // Runs rollover() on the executor.
    private final SequentialAsyncProcessor rolloverProcessor;
    private final BookKeeperMetrics.BookKeeperLog metrics;
    // Handle for the periodic reportMetrics() task; cancelled by close().
    private final ScheduledFuture<?> metricReporter;

    // Listeners notified after finished writes are removed from the queue; the set is its own monitor.
    @GuardedBy("queueStateChangeListeners")
    private final HashSet<ThrottleSourceListener> queueStateChangeListeners;
    // Compiler-generated flag backing the assertion checks in this class; true when assertions are disabled.
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a new, uninitialized BookKeeperLog. Besides storing its collaborators, the constructor
     * sets up the write and rollover processors and schedules the periodic metric reporter.
     *
     * @param containerId     Non-negative id of the container this log belongs to.
     * @param zkClient        Curator client used for the metadata node.
     * @param bookKeeper      BookKeeper client.
     * @param config          Configuration for this log.
     * @param executorService Executor for all asynchronous work.
     */
    public BookKeeperLog(int containerId, CuratorFramework zkClient, BookKeeper bookKeeper, BookKeeperConfig config, ScheduledExecutorService executorService) {
        Preconditions.checkArgument(containerId >= 0, "containerId must be a non-negative integer.");
        this.zkClient = Preconditions.checkNotNull(zkClient, "zkClient");
        this.bookKeeper = Preconditions.checkNotNull(bookKeeper, "bookKeeper");
        this.config = Preconditions.checkNotNull(config, "config");
        this.executorService = Preconditions.checkNotNull(executorService, "executorService");
        this.closed = new AtomicBoolean();
        this.logNodePath = HierarchyUtils.getPath(containerId, this.config.getZkHierarchyDepth());
        this.traceObjectId = String.format("Log[%d]", containerId);
        this.writes = new WriteQueue();

        // Both processors share one retry policy derived from the write settings.
        Retry.RetryAndThrowBase<? extends Exception> retryPolicy =
                createRetryPolicy(this.config.getMaxWriteAttempts(), this.config.getBkWriteTimeoutMillis());
        this.writeProcessor = new SequentialAsyncProcessor(
                this::processWritesSync, retryPolicy, this::handleWriteProcessorFailures, this.executorService);
        this.rolloverProcessor = new SequentialAsyncProcessor(
                this::rollover, retryPolicy, this::handleRolloverFailure, this.executorService);

        this.metrics = new BookKeeperMetrics.BookKeeperLog(containerId);
        this.metricReporter = this.executorService.scheduleWithFixedDelay(
                this::reportMetrics, REPORT_INTERVAL, REPORT_INTERVAL, TimeUnit.MILLISECONDS);
        this.queueStateChangeListeners = new HashSet<>();
    }

    /**
     * Builds the retry policy shared by the write and rollover processors. Every exception is
     * considered retryable.
     *
     * @param maxWriteAttempts   Maximum number of attempts.
     * @param writeTimeoutMillis Write timeout; the two delay arguments are derived from it.
     * @return The retry policy.
     */
    private Retry.RetryAndThrowBase<? extends Exception> createRetryPolicy(int maxWriteAttempts, int writeTimeoutMillis) {
        // NOTE(review): presumably (initialDelay, multiplier, attempts, maxDelay) - confirm against Retry.withExpBackoff.
        int firstArg = writeTimeoutMillis / maxWriteAttempts;
        int lastArg = writeTimeoutMillis * maxWriteAttempts;
        return Retry.withExpBackoff(firstArg, 2, maxWriteAttempts, lastArg)
                    .retryWhen(ex -> true);
    }

    /**
     * Failure handler given to the write processor: logs the cause and closes this log.
     *
     * @param cause The failure reported by the processor.
     */
    private void handleWriteProcessorFailures(Throwable cause) {
        log.warn("{}: Too many write processor failures; closing.", this.traceObjectId, cause);
        this.close();
    }

    /**
     * Failure handler given to the rollover processor: logs the cause and closes this log.
     *
     * @param cause The failure reported by the processor.
     */
    private void handleRolloverFailure(Throwable cause) {
        log.warn("{}: Too many rollover failures; closing.", this.traceObjectId, cause);
        this.close();
    }

    /**
     * Closes this log. Only the first call has any effect: it stops the metric reporter and both
     * processors, clears the in-memory ledger/metadata pointers, cancels every queued write and
     * finally closes the active ledger handle (a failure to do so is only logged).
     */
    public void close() {
        if (!this.closed.compareAndSet(false, true)) {
            // Somebody else already closed us.
            return;
        }

        this.metricReporter.cancel(true);
        this.metrics.close();
        this.rolloverProcessor.close();
        this.writeProcessor.close();

        // Detach the active ledger under the lock; it is closed further down, outside of it.
        final WriteLedger activeLedger;
        synchronized (this.lock) {
            activeLedger = this.writeLedger;
            this.writeLedger = null;
            this.logMetadata = null;
        }

        // Fail everything still sitting in the queue.
        this.writes.close().forEach(
                w -> w.fail(new CancellationException("BookKeeperLog has been closed."), true));

        if (activeLedger != null) {
            try {
                Ledgers.close(activeLedger.ledger);
            } catch (DurableDataLogException ex) {
                log.error("{}: Unable to close LedgerHandle for Ledger {}.", this.traceObjectId, activeLedger.ledger.getId(), ex);
            }
        }

        log.info("{}: Closed.", this.traceObjectId);
    }

    /**
     * Initializes this log: loads existing metadata (if any), fences out the ledgers it lists,
     * creates a new ledger, records it in the persisted metadata and makes it the active write
     * ledger. Ledgers dropped from the metadata during this process are deleted afterwards on a
     * best-effort basis.
     *
     * @param timeout Not used by this implementation.
     * @throws DataLogDisabledException If the persisted metadata marks the log as disabled.
     * @throws DurableDataLogException  If metadata or ledger operations fail.
     * @throws IllegalStateException    If the log has already been initialized.
     */
    public void initialize(Duration timeout) throws DurableDataLogException {
        final LogMetadata newMetadata;
        final List<Long> orphanLedgerIds;
        synchronized (this.lock) {
            Preconditions.checkState(this.writeLedger == null, "BookKeeperLog is already initialized.");
            if (!$assertionsDisabled && this.logMetadata != null) {
                throw new AssertionError("writeLedger == null but logMetadata != null");
            }

            // A null result means there is no metadata node yet.
            LogMetadata oldMetadata = loadMetadata();
            if (oldMetadata != null) {
                if (!oldMetadata.isEnabled()) {
                    throw new DataLogDisabledException("BookKeeperLog is disabled. Cannot initialize.");
                }

                // Fence out existing ledgers and record the outcome in the metadata.
                oldMetadata = oldMetadata.updateLedgerStatus(
                        Ledgers.fenceOut(oldMetadata.getLedgers(), this.bookKeeper, this.config, this.traceObjectId));
            }

            LedgerHandle newLedger = Ledgers.create(this.bookKeeper, this.config);
            log.info("{}: Created Ledger {}.", this.traceObjectId, newLedger.getId());

            newMetadata = updateMetadata(oldMetadata, newLedger, true);
            LedgerMetadata newLedgerMetadata = newMetadata.getLedger(newLedger.getId());
            if (!$assertionsDisabled && newLedgerMetadata == null) {
                throw new AssertionError("cannot find newly added ledger metadata");
            }

            this.writeLedger = new WriteLedger(newLedger, newLedgerMetadata);
            this.logMetadata = newMetadata;
            orphanLedgerIds = getLedgerIdsToDelete(oldMetadata, newMetadata);
        }

        // Clean up, outside of the lock, the ledgers that are no longer referenced.
        for (Long ledgerId : orphanLedgerIds) {
            try {
                Ledgers.delete(ledgerId.longValue(), this.bookKeeper);
                log.info("{}: Deleted orphan empty ledger {}.", this.traceObjectId, ledgerId);
            } catch (DurableDataLogException ex) {
                log.warn("{}: Unable to delete orphan empty ledger {}.", this.traceObjectId, ledgerId, ex);
            }
        }

        log.info("{}: Initialized (Epoch = {}, UpdateVersion = {}).", this.traceObjectId, newMetadata.getEpoch(), newMetadata.getUpdateVersion());
    }

    /**
     * Marks a disabled log as enabled in its persisted metadata. Can only be invoked on an
     * instance that has not been initialized.
     *
     * @throws DurableDataLogException If the metadata cannot be loaded or persisted.
     * @throws IllegalStateException   If this instance is initialized, or if no metadata exists or it
     *                                 is already enabled.
     */
    public void enable() throws DurableDataLogException {
        Exceptions.checkNotClosed(this.closed.get(), this);
        synchronized (this.lock) {
            Preconditions.checkState(this.writeLedger == null, "BookKeeperLog is already initialized; cannot re-enable.");
            if (!$assertionsDisabled && this.logMetadata != null) {
                throw new AssertionError("writeLedger == null but logMetadata != null");
            }

            LogMetadata current = loadMetadata();
            // Only existing, disabled metadata can be enabled.
            boolean canEnable = current != null && !current.isEnabled();
            Preconditions.checkState(canEnable, "BookKeeperLog is already enabled.");

            LogMetadata enabled = current.asEnabled();
            persistMetadata(enabled, false);
            log.info("{}: Enabled (Epoch = {}, UpdateVersion = {}).", this.traceObjectId, enabled.getEpoch(), enabled.getUpdateVersion());
        }
    }

    /**
     * Marks this (initialized) log as disabled in its persisted metadata and then closes it.
     *
     * @throws DurableDataLogException If the metadata cannot be persisted.
     * @throws IllegalStateException   If the log is not initialized or is already disabled.
     */
    public void disable() throws DurableDataLogException {
        synchronized (this.lock) {
            ensurePreconditions();
            LogMetadata current = getLogMetadata();
            Preconditions.checkState(current.isEnabled(), "BookKeeperLog is already disabled.");

            LogMetadata disabled = current.asDisabled();
            persistMetadata(disabled, false);
            this.logMetadata = disabled;
            log.info("{}: Disabled (Epoch = {}, UpdateVersion = {}).", this.traceObjectId, disabled.getEpoch(), disabled.getUpdateVersion());
        }

        // A disabled log cannot be used anymore.
        close();
    }

    /**
     * Queues the given data for writing and triggers the write processor.
     *
     * @param data    The data to append; at most 1047552 bytes.
     * @param timeout Not used by this implementation.
     * @return A future that completes with the address of the written entry, or exceptionally with
     * a WriteTooLongException (oversized data) or with the cause of the write failure.
     */
    public CompletableFuture<LogAddress> append(ArrayView data, Duration timeout) {
        ensurePreconditions();
        final int length = data.getLength();
        final long traceId = LoggerHelpers.traceEnterWithContext(log, this.traceObjectId, "append", length);
        if (length > 1047552) {
            return Futures.failedFuture(new WriteTooLongException(length, 1047552));
        }

        final Timer timer = new Timer();
        final CompletableFuture<LogAddress> result = new CompletableFuture<>();
        this.writes.add(new Write(data, getWriteLedger(), result));
        this.writeProcessor.runAsync();

        // Post-processing (metrics, tracing, failure handling) runs on the executor.
        result.whenCompleteAsync((address, ex) -> {
            if (ex == null) {
                this.metrics.writeCompleted(timer.getElapsed());
                LoggerHelpers.traceLeave(log, this.traceObjectId, "append", traceId, data.getLength(), address);
            } else {
                handleWriteException(ex);
            }
        }, this.executorService);
        return result;
    }

    /**
     * Truncates the log up to the given address, asynchronously on the executor.
     *
     * @param upToAddress The address to truncate up to; must be a LedgerAddress.
     * @param timeout     Not used by this implementation.
     * @return A future that completes when the truncation has been attempted.
     */
    public CompletableFuture<Void> truncate(LogAddress upToAddress, Duration timeout) {
        ensurePreconditions();
        Preconditions.checkArgument(upToAddress instanceof LedgerAddress, "upToAddress must be of type LedgerAddress.");
        Runnable truncation = () -> tryTruncate((LedgerAddress) upToAddress);
        return CompletableFuture.runAsync(truncation, this.executorService);
    }

    /**
     * Creates a reader over the ledgers listed in the current metadata.
     *
     * @return A new LogReader.
     * @throws DurableDataLogException Declared by the signature.
     */
    public CloseableIterator<DurableDataLog.ReadItem, DurableDataLogException> getReader() throws DurableDataLogException {
        ensurePreconditions();
        LogMetadata metadata = getLogMetadata();
        return new LogReader(metadata, this.bookKeeper, this.config);
    }

    /**
     * Gets the write settings for this log: a fixed value of 1047552, the configured BookKeeper
     * write timeout and the configured maximum number of outstanding bytes.
     *
     * @return A new WriteSettings instance.
     */
    public WriteSettings getWriteSettings() {
        Duration writeTimeout = Duration.ofMillis(this.config.getBkWriteTimeoutMillis());
        return new WriteSettings(1047552, writeTimeout, this.config.getMaxOutstandingBytes());
    }

    /**
     * Gets the epoch recorded in the current metadata.
     *
     * @return The epoch.
     */
    public long getEpoch() {
        ensurePreconditions();
        LogMetadata metadata = getLogMetadata();
        return metadata.getEpoch();
    }

    /**
     * Gets the current statistics of the write queue.
     *
     * @return The queue statistics.
     */
    public QueueStats getQueueStatistics() {
        QueueStats stats = this.writes.getStatistics();
        return stats;
    }

    /**
     * Registers a listener to be notified when finished writes are removed from the queue. Closed
     * listeners are rejected with a warning.
     *
     * @param listener The listener to register.
     */
    public void registerQueueStateChangeListener(ThrottleSourceListener listener) {
        if (!listener.isClosed()) {
            synchronized (this.queueStateChangeListeners) {
                this.queueStateChangeListeners.add(listener);
            }
        } else {
            log.warn("{} Attempted to register a closed ThrottleSourceListener ({}).", this.traceObjectId, listener);
        }
    }

    /**
     * Body of the write processor. Does nothing when closed, hands over to the rollover processor
     * when the active ledger is closed and otherwise processes the pending writes, re-scheduling
     * itself if that did not fully succeed.
     */
    private void processWritesSync() {
        if (this.closed.get()) {
            return;
        }

        if (getWriteLedger().ledger.isClosed()) {
            // The active ledger is closed: the rollover processor has to deal with it.
            this.rolloverProcessor.runAsync();
            return;
        }

        boolean processed = processPendingWrites();
        if (!processed && !this.closed.get()) {
            // Something did not go through; try again.
            this.writeProcessor.runAsync();
        }
    }

    /**
     * Cleans up finished writes and executes the ones that are ready.
     *
     * @return False if a write failed (in which case this log is closed) or if the writes could
     * not all be submitted; true otherwise, including when the queue is empty.
     */
    private boolean processPendingWrites() {
        final String method = "processPendingWrites";
        long traceId = LoggerHelpers.traceEnterWithContext(log, this.traceObjectId, method);

        WriteQueue.CleanupResult cleanup = this.writes.removeFinishedWrites();
        WriteQueue.CleanupStatus status = cleanup.getStatus();
        if (status == WriteQueue.CleanupStatus.WriteFailed) {
            // A write failed; shut down.
            close();
            LoggerHelpers.traceLeave(log, this.traceObjectId, method, traceId, cleanup);
            return false;
        }

        if (cleanup.getRemovedCount() > 0) {
            notifyQueueChangeListeners();
        }

        if (cleanup.getStatus() == WriteQueue.CleanupStatus.QueueEmpty) {
            // Nothing left to do.
            LoggerHelpers.traceLeave(log, this.traceObjectId, method, traceId, cleanup);
            return true;
        }

        List<Write> pending = getWritesToExecute();
        boolean success = true;
        if (!pending.isEmpty()) {
            success = executeWrites(pending);
            if (success) {
                // Everything was submitted; let the rollover processor check the ledger.
                this.rolloverProcessor.runAsync();
            }
        }

        LoggerHelpers.traceLeave(log, this.traceObjectId, method, traceId, pending.size(), success);
        return success;
    }

    /**
     * Picks the writes to execute next, bounded by the space left in the active ledger. If
     * handling writes on closed ledgers changed anything, the selection is done again.
     *
     * @return The writes to execute.
     */
    private List<Write> getWritesToExecute() {
        long remainingInLedger = this.config.getBkLedgerMaxSize() - getWriteLedger().ledger.getLength();
        List<Write> candidates = this.writes.getWritesToExecute(remainingInLedger);
        if (!handleClosedLedgers(candidates)) {
            return candidates;
        }

        // Some writes were completed or re-assigned; fetch a fresh selection.
        return this.writes.getWritesToExecute(remainingInLedger);
    }

    /**
     * Submits the given writes, in order, to their ledgers. If one cannot be submitted, it and all
     * the writes after it are failed and no further submissions are made.
     *
     * @param toExecute The writes to execute.
     * @return True if all writes were submitted, false otherwise.
     */
    private boolean executeWrites(List<Write> toExecute) {
        log.debug("{}: Executing {} writes.", this.traceObjectId, toExecute.size());
        int index = 0;
        while (index < toExecute.size()) {
            Write current = toExecute.get(index);
            try {
                if (current.beginAttempt() > this.config.getMaxWriteAttempts()) {
                    // Out of attempts.
                    throw new RetriesExhaustedException(current.getFailureCause());
                }

                current.getWriteLedger().ledger.asyncAddEntry(
                        current.data.array(), current.data.arrayOffset(), current.data.getLength(), this::addCallback, current);
            } catch (Throwable ex) {
                boolean terminal = !isRetryable(ex);
                current.fail(ex, terminal);

                // Writes are ordered, so the remaining ones cannot go ahead either.
                for (Write skipped : toExecute.subList(index + 1, toExecute.size())) {
                    skipped.fail(new DurableDataLogException("Previous write failed.", ex), terminal);
                }

                return false;
            }

            index++;
        }

        return true;
    }

    /**
     * Deals with unfinished writes assigned to closed ledgers, provided the first write in the
     * list is assigned to one. Such a write is completed if its entry id is within the ledger's
     * LastAddConfirmed; otherwise it is re-assigned to the current write ledger when that is a
     * different ledger.
     *
     * @param writes The writes to inspect.
     * @return True if at least one write was completed or re-assigned.
     */
    private boolean handleClosedLedgers(List<Write> writes) {
        if (writes.isEmpty() || !writes.get(0).getWriteLedger().ledger.isClosed()) {
            return false;
        }

        long traceId = LoggerHelpers.traceEnterWithContext(log, this.traceObjectId, "handleClosedLedgers", writes.size());
        WriteLedger currentLedger = getWriteLedger();
        Map<Long, Long> lastAddsConfirmed = new HashMap<>();
        boolean anythingChanged = false;
        for (Write w : writes) {
            if (w.isDone() || !w.getWriteLedger().ledger.isClosed()) {
                continue;
            }

            long lastAddConfirmed = fetchLastAddConfirmed(w.getWriteLedger(), lastAddsConfirmed);
            if (w.getEntryId() >= 0 && w.getEntryId() <= lastAddConfirmed) {
                // The entry is within the confirmed range of its ledger.
                completeWrite(w);
                anythingChanged = true;
            } else if (currentLedger.ledger.getId() != w.getWriteLedger().ledger.getId()) {
                // Point the write to the current ledger.
                w.setWriteLedger(currentLedger);
                anythingChanged = true;
            }
        }

        LoggerHelpers.traceLeave(log, this.traceObjectId, "handleClosedLedgers", traceId, writes.size(), anythingChanged);
        return anythingChanged;
    }

    /**
     * Gets the LastAddConfirmed for the given ledger, consulting (and updating) the given cache so
     * that each ledger is queried at most once per cache instance. For a ledger flagged as rolled
     * over the value comes from the local handle; otherwise it is read via the Ledgers helper.
     *
     * @param writeLedger       The ledger to query.
     * @param lastAddsConfirmed Cache of LedgerId to LastAddConfirmed; negative or missing values
     *                          are treated as "not fetched yet".
     * @return The LastAddConfirmed for the ledger.
     * @throws CompletionException Wrapping the DurableDataLogException raised while reading. The
     *                             checked exception cannot be declared, as this method is reached
     *                             from the Runnable given to the write processor.
     */
    private long fetchLastAddConfirmed(WriteLedger writeLedger, Map<Long, Long> lastAddsConfirmed) {
        long ledgerId = writeLedger.ledger.getId();
        long lac = lastAddsConfirmed.getOrDefault(ledgerId, -1L);
        long traceId = LoggerHelpers.traceEnterWithContext(log, this.traceObjectId, "fetchLastAddConfirmed", ledgerId, lac);
        if (lac < 0) {
            try {
                lac = writeLedger.isRolledOver()
                        ? writeLedger.ledger.getLastAddConfirmed()
                        : Ledgers.readLastAddConfirmed(ledgerId, this.bookKeeper, this.config);
            } catch (DurableDataLogException ex) {
                // Checked exceptions may not escape this method; surface it unchecked with the cause intact.
                throw new CompletionException(ex);
            }

            lastAddsConfirmed.put(ledgerId, lac);
            log.info("{}: Fetched actual LastAddConfirmed ({}) for LedgerId {}.", this.traceObjectId, lac, ledgerId);
        }

        LoggerHelpers.traceLeave(log, this.traceObjectId, "fetchLastAddConfirmed", traceId, ledgerId, lac);
        return lac;
    }

    /**
     * Callback passed to asyncAddEntry. Records the entry id on the write, then completes the
     * write (response code 0) or fails it based on the response code. Whatever the outcome, the
     * write processor is triggered exactly once afterwards.
     *
     * @param rc      The BookKeeper response code; 0 is handled as success.
     * @param handle  The handle of the ledger the entry was added to.
     * @param entryId The id assigned to the entry.
     * @param ctx     The Write this callback refers to.
     */
    private void addCallback(int rc, LedgerHandle handle, long entryId, Object ctx) {
        Write write = (Write) ctx;
        try {
            if (!$assertionsDisabled && handle.getId() != write.getWriteLedger().ledger.getId()) {
                throw new AssertionError("Handle.Id mismatch: " + write.getWriteLedger().ledger.getId() + " vs " + handle.getId());
            }

            write.setEntryId(entryId);
            if (rc == 0) {
                completeWrite(write);
            } else {
                // Translate the response code into an exception and fail the write with it.
                handleWriteException(rc, write);
            }
        } catch (Throwable ex) {
            // Never leave the write hanging, whatever went wrong above.
            write.fail(ex, !isRetryable(ex));
        } finally {
            // Let the write processor pick up the change. It may have been closed in the meantime;
            // that must not propagate to the caller of this callback.
            try {
                this.writeProcessor.runAsync();
            } catch (ObjectClosedException ex) {
                log.warn("{}: Not running WriteProcessor as part of callback due to BookKeeperLog being closed.", this.traceObjectId, ex);
            }
        }
    }

    /**
     * Completes the given write and, if that yields a timer, reports the write's length and
     * elapsed time to the metrics.
     *
     * @param write The write to complete.
     */
    private void completeWrite(Write write) {
        Timer elapsed = write.complete();
        if (elapsed == null) {
            return;
        }

        this.metrics.bookKeeperWriteCompleted(write.data.getLength(), elapsed.getElapsed());
    }

    /**
     * Reacts to a failed append: an ObjectClosedException while this log is not yet closed causes
     * the log to be closed. Any other failure is ignored here.
     *
     * @param ex The exception the append failed with.
     */
    private void handleWriteException(Throwable ex) {
        if (ex instanceof ObjectClosedException && !this.closed.get()) {
            log.warn("{}: Caught ObjectClosedException but not closed; closing now.", this.traceObjectId, ex);
            close();
        }
    }

    /**
     * Fails the given write with an exception derived from a non-zero BookKeeper response code.
     * The BKException for the code is mapped onto the matching data log exception; should the
     * mapping itself fail, the write is failed with the BKException that was created for the code.
     *
     * @param responseCode The BookKeeper response code; must not be 0.
     * @param write        The write to fail.
     */
    private void handleWriteException(int responseCode, Write write) {
        if (!$assertionsDisabled && responseCode == 0) {
            throw new AssertionError("cannot handle an exception when responseCode == 0");
        }

        Exception ex = BKException.create(responseCode);
        try {
            if (ex instanceof BKException.BKLedgerFencedException) {
                ex = new DataLogWriterNotPrimaryException("BookKeeperLog is not primary anymore.", ex);
            } else if (ex instanceof BKException.BKNotEnoughBookiesException) {
                ex = new DataLogNotAvailableException("BookKeeperLog is not available.", ex);
            } else if (ex instanceof BKException.BKLedgerClosedException) {
                ex = new WriteFailureException("Active Ledger is closed.", ex);
            } else if (ex instanceof BKException.BKWriteException) {
                ex = new WriteFailureException("Unable to write to active Ledger.", ex);
            } else if (ex instanceof BKException.BKClientClosedException) {
                ex = new ObjectClosedException(this, ex);
            } else {
                // Everything else ends up in the same bucket.
                ex = new DurableDataLogException("General exception while accessing BookKeeper.", ex);
            }
        } finally {
            // Only retryable failures leave the write open for another attempt.
            write.fail(ex, !isRetryable(ex));
        }
    }

    /**
     * Determines whether a write that failed with the given exception may be attempted again.
     *
     * @param ex The exception to inspect; it is unwrapped first.
     * @return True for WriteFailureException and DataLogNotAvailableException, false otherwise.
     */
    private static boolean isRetryable(Throwable ex) {
        Throwable cause = Exceptions.unwrap(ex);
        if (cause instanceof WriteFailureException) {
            return true;
        }

        return cause instanceof DataLogNotAvailableException;
    }

    /**
     * Truncates the log up to the given address: persists the truncated metadata, swaps the
     * in-memory metadata pointer and then deletes every ledger that is no longer listed. A failure
     * to delete a ledger is logged and does not abort the truncation.
     *
     * @param upToAddress The address to truncate up to.
     * @throws CompletionException Wrapping the DurableDataLogException raised while updating the
     *                             metadata. This method runs inside the Runnable created by
     *                             truncate(), so the checked exception cannot be declared.
     */
    private void tryTruncate(LedgerAddress upToAddress) {
        long traceId = LoggerHelpers.traceEnterWithContext(log, this.traceObjectId, "tryTruncate", upToAddress);
        try {
            LogMetadata oldMetadata = getLogMetadata();
            LogMetadata newMetadata = oldMetadata.truncate(upToAddress);

            // Persist first; only publish the new metadata in memory once that has succeeded.
            persistMetadata(newMetadata, false);
            synchronized (this.lock) {
                this.logMetadata = newMetadata;
            }

            // Whatever is in the old metadata but not in the new one can be deleted.
            Set<Long> ledgerIdsToKeep = newMetadata.getLedgers().stream()
                                                   .map(LedgerMetadata::getLedgerId)
                                                   .collect(Collectors.toSet());
            for (LedgerMetadata ledger : oldMetadata.getLedgers()) {
                if (ledgerIdsToKeep.contains(ledger.getLedgerId())) {
                    continue;
                }

                try {
                    Ledgers.delete(ledger.getLedgerId(), this.bookKeeper);
                } catch (DurableDataLogException ex) {
                    // Nothing more can be done here; the ledger is no longer referenced.
                    log.error("{}: Unable to delete truncated ledger {}.", this.traceObjectId, ledger.getLedgerId(), ex);
                }
            }

            log.info("{}: Truncated up to {}.", this.traceObjectId, upToAddress);
            LoggerHelpers.traceLeave(log, this.traceObjectId, "tryTruncate", traceId, upToAddress);
        } catch (DurableDataLogException ex) {
            throw new CompletionException(ex);
        }
    }

    /**
     * Notifies all registered, non-closed listeners that the write queue has changed. Closed
     * listeners are unregistered. The listeners are invoked outside of the listener lock, and a
     * failing listener does not prevent the others from being notified.
     */
    private void notifyQueueChangeListeners() {
        List<ThrottleSourceListener> toNotify = new ArrayList<>();
        List<ThrottleSourceListener> toRemove = new ArrayList<>();
        synchronized (this.queueStateChangeListeners) {
            for (ThrottleSourceListener listener : this.queueStateChangeListeners) {
                if (listener.isClosed()) {
                    toRemove.add(listener);
                } else {
                    toNotify.add(listener);
                }
            }

            this.queueStateChangeListeners.removeAll(toRemove);
        }

        for (ThrottleSourceListener listener : toNotify) {
            try {
                listener.notifyThrottleSourceChanged();
            } catch (Throwable ex) {
                // NOTE(review): Exceptions.mustRethrow is presumed to flag errors that must never be
                // swallowed - confirm against io.pravega.common.Exceptions.
                if (Exceptions.mustRethrow(ex)) {
                    throw ex;
                }

                log.error("{}: Error while notifying queue listener {}.", this.traceObjectId, listener, ex);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Loads the log metadata from its ZooKeeper node and stamps it with the node's version.
     *
     * @return The deserialized metadata, or null if the node does not exist.
     * @throws DataLogInitializationException If the node cannot be read or deserialized for any
     *                                        reason other than it not existing.
     */
    @VisibleForTesting
    public LogMetadata loadMetadata() throws DataLogInitializationException {
        try {
            Stat stat = new Stat();
            LogMetadata logMetadata = (LogMetadata) LogMetadata.SERIALIZER.deserialize((byte[]) ((WatchPathable) this.zkClient.getData().storingStatIn(stat)).forPath(this.logNodePath));
            logMetadata.withUpdateVersion(stat.getVersion());
            return logMetadata;
        } catch (KeeperException.NoNodeException nne) {
            // Must be handled before the general case below: a missing node is not an error.
            log.warn("{}: No ZNode found for path '{}{}'. This is OK if this is the first time accessing this log.", this.traceObjectId, this.zkClient.getNamespace(), this.logNodePath);
            return null;
        } catch (Exception ex) {
            throw new DataLogInitializationException(String.format("Unable to load ZNode contents for path '%s%s'.", this.zkClient.getNamespace(), this.logNodePath), ex);
        }
    }

    /**
     * Creates metadata that includes the given ledger and persists it. If persisting fails, the
     * given ledger is deleted again (best-effort) before the failure is rethrown.
     *
     * @param currentMetadata   The current metadata, or null if none exists yet (in which case the
     *                          metadata node is created).
     * @param newLedger         The ledger to add.
     * @param clearEmptyLedgers If true and metadata already exists, removeEmptyLedgers(2) is
     *                          applied after adding the ledger.
     * @return The new, persisted metadata.
     * @throws DurableDataLogException If the metadata could not be persisted.
     */
    private LogMetadata updateMetadata(LogMetadata currentMetadata, LedgerHandle newLedger, boolean clearEmptyLedgers) throws DurableDataLogException {
        final boolean create = currentMetadata == null;
        LogMetadata newMetadata;
        if (!create) {
            newMetadata = currentMetadata.addLedger(newLedger.getId());
            if (clearEmptyLedgers) {
                newMetadata = newMetadata.removeEmptyLedgers(2);
            }
        } else {
            // First time: start from scratch with just this ledger.
            newMetadata = new LogMetadata(newLedger.getId());
        }

        try {
            persistMetadata(newMetadata, create);
        } catch (DurableDataLogException ex) {
            // The ledger never made it into the metadata; do not leave it behind.
            try {
                Ledgers.delete(newLedger.getId(), this.bookKeeper);
            } catch (Exception deleteEx) {
                log.warn("{}: Unable to delete newly created ledger {}.", this.traceObjectId, newLedger.getId(), deleteEx);
                ex.addSuppressed(deleteEx);
            }

            throw ex;
        }

        log.info("{} Metadata updated ({}).", this.traceObjectId, newMetadata);
        return newMetadata;
    }

    /**
     * Serializes the given metadata and writes it to the ZooKeeper node, then stamps the metadata
     * with the version reported for the node.
     *
     * @param logMetadata The metadata to persist.
     * @param z           If true, the node (and any missing parents) is created; otherwise the
     *                    existing node is updated, conditional on the metadata's update version.
     * @throws DataLogWriterNotPrimaryException If the node already exists (on create) or its
     *                                          version does not match (on update).
     * @throws DataLogInitializationException   If the node could not be written for any other reason.
     * @throws DurableDataLogException          Declared supertype of the above.
     */
    private void persistMetadata(LogMetadata logMetadata, boolean z) throws DurableDataLogException {
        Stat stat;
        try {
            byte[] copy = LogMetadata.SERIALIZER.serialize(logMetadata).getCopy();
            if (z) {
                stat = new Stat();
                ((ACLBackgroundPathAndBytesable) this.zkClient.create().creatingParentsIfNeeded().storingStatIn(stat)).forPath(this.logNodePath, copy);
            } else {
                stat = (Stat) ((BackgroundPathAndBytesable) this.zkClient.setData().withVersion(logMetadata.getUpdateVersion())).forPath(this.logNodePath, copy);
            }
            logMetadata.withUpdateVersion(stat.getVersion());
            log.info("{} Metadata persisted ({}).", this.traceObjectId, logMetadata);
        } catch (KeeperException.NodeExistsException | KeeperException.BadVersionException keeperEx) {
            // Must be handled before the general case below: the node was changed by someone else.
            throw new DataLogWriterNotPrimaryException(String.format("Unable to acquire exclusive write lock for log (path = '%s%s').", this.zkClient.getNamespace(), this.logNodePath), keeperEx);
        } catch (Exception ex) {
            throw new DataLogInitializationException(String.format("Unable to update ZNode for path '%s%s'.", this.zkClient.getNamespace(), this.logNodePath), ex);
        }
    }

    /**
     * Body of the rollover processor. If the active ledger is closed or has reached the configured
     * maximum size, a new ledger is created and recorded in the metadata, the in-memory pointers
     * are swapped and the old ledger is closed. In all cases (other than this log being closed) the
     * write processor is triggered afterwards.
     *
     * @throws CompletionException Wrapping the DurableDataLogException raised during the rollover.
     *                             This method is used as a Runnable, so the checked exception
     *                             cannot be declared.
     */
    private void rollover() {
        if (this.closed.get()) {
            return;
        }

        long traceId = LoggerHelpers.traceEnterWithContext(log, this.traceObjectId, "rollover");
        LedgerHandle activeLedger = getWriteLedger().ledger;
        if (!activeLedger.isClosed() && activeLedger.getLength() < this.config.getBkLedgerMaxSize()) {
            // Nothing to roll over; still give the write processor a chance to run.
            this.writeProcessor.runAsync();
            LoggerHelpers.traceLeave(log, this.traceObjectId, "rollover", traceId, false);
            return;
        }

        try {
            LedgerHandle newLedger = Ledgers.create(this.bookKeeper, this.config);
            log.debug("{}: Rollover: created new ledger {}.", this.traceObjectId, newLedger.getId());

            LogMetadata newMetadata = updateMetadata(getLogMetadata(), newLedger, false);
            LedgerMetadata newLedgerMetadata = newMetadata.getLedger(newLedger.getId());
            if (!$assertionsDisabled && newLedgerMetadata == null) {
                throw new AssertionError("cannot find newly added ledger metadata");
            }

            log.debug("{}: Rollover: updated metadata '{}.", this.traceObjectId, newMetadata);

            // Swap the pointers under the lock; the old ledger is closed outside of it.
            LedgerHandle oldLedger;
            synchronized (this.lock) {
                oldLedger = this.writeLedger.ledger;
                if (!oldLedger.isClosed()) {
                    // Flag the ledger as rolled over before it gets closed below.
                    this.writeLedger.setRolledOver(true);
                }

                this.writeLedger = new WriteLedger(newLedger, newLedgerMetadata);
                this.logMetadata = newMetadata;
            }

            Ledgers.close(oldLedger);
            log.info("{}: Rollover: swapped ledger and metadata pointers (Old = {}, New = {}) and closed old ledger.", this.traceObjectId, oldLedger.getId(), newLedger.getId());
        } catch (DurableDataLogException ex) {
            throw new CompletionException(ex);
        } finally {
            // Whether the rollover succeeded or not, there may be writes waiting to be executed.
            this.writeProcessor.runAsync();
            LoggerHelpers.traceLeave(log, this.traceObjectId, "rollover", traceId, true);
        }
    }

    /**
     * Determines the ids of the ledgers that are listed in the old metadata but not in the
     * current one.
     *
     * @param oldMetadata     The previous metadata; may be null.
     * @param currentMetadata The current metadata.
     * @return The ledger ids, in the order in which they appear in the old metadata; empty if
     * there is no old metadata.
     */
    @GuardedBy("lock")
    private List<Long> getLedgerIdsToDelete(LogMetadata oldMetadata, LogMetadata currentMetadata) {
        if (oldMetadata == null) {
            return Collections.emptyList();
        }

        Set<Long> currentIds = currentMetadata.getLedgers().stream()
                                              .map(LedgerMetadata::getLedgerId)
                                              .collect(Collectors.toSet());
        return oldMetadata.getLedgers().stream()
                          .map(LedgerMetadata::getLedgerId)
                          .filter(id -> !currentIds.contains(id))
                          .collect(Collectors.toList());
    }

    /**
     * Periodic task that publishes the ledger count and the write queue statistics. Does nothing
     * while no metadata is available (before initialize() or after close()); an exception thrown
     * from here would stop the scheduled task from ever running again.
     */
    private void reportMetrics() {
        LogMetadata metadata = getLogMetadata();
        if (metadata == null) {
            // Not initialized (yet) or already closed: nothing to report.
            return;
        }

        this.metrics.ledgerCount(metadata.getLedgers().size());
        this.metrics.queueStats(this.writes.getStatistics());
    }

    /**
     * Reads the current metadata pointer under the lock.
     *
     * @return The current metadata; null if not initialized or closed.
     */
    private LogMetadata getLogMetadata() {
        synchronized (this.lock) {
            return this.logMetadata;
        }
    }

    /**
     * Reads the current write ledger pointer under the lock.
     *
     * @return The current write ledger; null if not initialized or closed.
     */
    private WriteLedger getWriteLedger() {
        synchronized (this.lock) {
            return this.writeLedger;
        }
    }

    /**
     * Verifies that this log is usable, i.e. that it is not closed and that it has been initialized.
     *
     * @throws IllegalStateException If the log has not been initialized.
     */
    private void ensurePreconditions() {
        final boolean isClosed = this.closed.get();
        Exceptions.checkNotClosed(isClosed, this);
        synchronized (this.lock) {
            Preconditions.checkState(this.writeLedger != null, "BookKeeperLog is not initialized.");
            // The two pointers are always set together.
            if (!$assertionsDisabled && this.logMetadata == null) {
                throw new AssertionError("writeLedger != null but logMetadata == null");
            }
        }
    }

    // Static initialization: sets up the class logger and the flag consulted by the assertion checks.
    static {
        log = LoggerFactory.getLogger(BookKeeperLog.class);
        $assertionsDisabled = !BookKeeperLog.class.desiredAssertionStatus();
    }
}
