/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.bookie;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.pulsar.functions.runtime.shaded.io.netty.buffer.ByteBuf;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.bookie.EntryLogManagerBase;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.bookie.EntryLogger;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.bookie.EntryLoggerAllocator;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.bookie.LedgerDirsManager;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.util.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class EntryLogManagerForSingleEntryLog
extends EntryLogManagerBase {
    private static final Logger log = LoggerFactory.getLogger(EntryLogManagerForSingleEntryLog.class);
    private volatile EntryLogger.BufferedLogChannel activeLogChannel;
    private long logIdBeforeFlush = -1L;
    private final AtomicBoolean shouldCreateNewEntryLog = new AtomicBoolean(false);
    private final EntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus;

    EntryLogManagerForSingleEntryLog(ServerConfiguration conf, LedgerDirsManager ledgerDirsManager, EntryLoggerAllocator entryLoggerAllocator, List<EntryLogger.EntryLogListener> listeners, EntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus) {
        super(conf, ledgerDirsManager, entryLoggerAllocator, listeners);
        this.rotatedLogChannels = new LinkedList();
        this.recentlyCreatedEntryLogsStatus = recentlyCreatedEntryLogsStatus;
        ledgerDirsManager.addLedgerDirsListener(this.getLedgerDirsListener());
    }

    private LedgerDirsManager.LedgerDirsListener getLedgerDirsListener() {
        return new LedgerDirsManager.LedgerDirsListener(){

            @Override
            public void diskFull(File disk) {
                EntryLogger.BufferedLogChannel currentActiveLogChannel = EntryLogManagerForSingleEntryLog.this.activeLogChannel;
                if (currentActiveLogChannel != null && currentActiveLogChannel.getLogFile().getParentFile().equals(disk)) {
                    EntryLogManagerForSingleEntryLog.this.shouldCreateNewEntryLog.set(true);
                }
            }

            @Override
            public void diskAlmostFull(File disk) {
                EntryLogger.BufferedLogChannel currentActiveLogChannel = EntryLogManagerForSingleEntryLog.this.activeLogChannel;
                if (currentActiveLogChannel != null && currentActiveLogChannel.getLogFile().getParentFile().equals(disk)) {
                    EntryLogManagerForSingleEntryLog.this.shouldCreateNewEntryLog.set(true);
                }
            }
        };
    }

    @Override
    public synchronized long addEntry(long ledger, ByteBuf entry, boolean rollLog) throws IOException {
        return super.addEntry(ledger, entry, rollLog);
    }

    @Override
    synchronized EntryLogger.BufferedLogChannel getCurrentLogForLedgerForAddEntry(long ledgerId, int entrySize, boolean rollLog) throws IOException {
        if (null == this.activeLogChannel) {
            this.createNewLog(-1L, "because current active log channel has not initialized yet");
        }
        boolean reachEntryLogLimit = rollLog ? this.reachEntryLogLimit(this.activeLogChannel, entrySize) : this.readEntryLogHardLimit(this.activeLogChannel, entrySize);
        boolean createNewLog = this.shouldCreateNewEntryLog.get();
        if (createNewLog || reachEntryLogLimit) {
            if (this.activeLogChannel != null) {
                this.activeLogChannel.flushAndForceWriteIfRegularFlush(false);
            }
            this.createNewLog(-1L, ": createNewLog = " + createNewLog + ", reachEntryLogLimit = " + reachEntryLogLimit);
            if (createNewLog) {
                this.shouldCreateNewEntryLog.set(false);
            }
        }
        return this.activeLogChannel;
    }

    @Override
    synchronized void createNewLog(long ledgerId) throws IOException {
        super.createNewLog(ledgerId);
    }

    @Override
    public synchronized void setCurrentLogForLedgerAndAddToRotate(long ledgerId, EntryLogger.BufferedLogChannel logChannel) {
        EntryLogger.BufferedLogChannel hasToRotateLogChannel = this.activeLogChannel;
        this.activeLogChannel = logChannel;
        if (hasToRotateLogChannel != null) {
            this.rotatedLogChannels.add(hasToRotateLogChannel);
        }
    }

    @Override
    public EntryLogger.BufferedLogChannel getCurrentLogForLedger(long ledgerId) {
        return this.activeLogChannel;
    }

    @Override
    public EntryLogger.BufferedLogChannel getCurrentLogIfPresent(long entryLogId) {
        EntryLogger.BufferedLogChannel activeLogChannelTemp = this.activeLogChannel;
        if (activeLogChannelTemp != null && activeLogChannelTemp.getLogId() == entryLogId) {
            return activeLogChannelTemp;
        }
        return null;
    }

    @Override
    public File getDirForNextEntryLog(List<File> writableLedgerDirs) {
        Collections.shuffle(writableLedgerDirs);
        return writableLedgerDirs.get(0);
    }

    @Override
    public void checkpoint() throws IOException {
        this.flushRotatedLogs();
    }

    public long getCurrentLogId() {
        EntryLogger.BufferedLogChannel currentActiveLogChannel = this.activeLogChannel;
        if (currentActiveLogChannel != null) {
            return currentActiveLogChannel.getLogId();
        }
        return -57005L;
    }

    @Override
    public void flushCurrentLogs() throws IOException {
        EntryLogger.BufferedLogChannel currentActiveLogChannel = this.activeLogChannel;
        if (currentActiveLogChannel != null) {
            this.flushLogChannel(currentActiveLogChannel, true);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    void flushRotatedLogs() throws IOException {
        List channels = null;
        EntryLogManagerForSingleEntryLog entryLogManagerForSingleEntryLog = this;
        synchronized (entryLogManagerForSingleEntryLog) {
            channels = this.rotatedLogChannels;
            this.rotatedLogChannels = new LinkedList();
        }
        if (null == channels) {
            return;
        }
        Iterator chIter = channels.iterator();
        while (chIter.hasNext()) {
            EntryLogger.BufferedLogChannel channel = (EntryLogger.BufferedLogChannel)chIter.next();
            try {
                channel.flushAndForceWrite(true);
            }
            catch (IOException ioe) {
                EntryLogManagerForSingleEntryLog entryLogManagerForSingleEntryLog2 = this;
                synchronized (entryLogManagerForSingleEntryLog2) {
                    if (null == this.rotatedLogChannels) {
                        this.rotatedLogChannels = channels;
                    } else {
                        this.rotatedLogChannels.addAll(0, channels);
                    }
                }
                throw ioe;
            }
            chIter.remove();
            channel.close();
            this.recentlyCreatedEntryLogsStatus.flushRotatedEntryLog(channel.getLogId());
            log.info("Synced entry logger {} to disk.", (Object)channel.getLogId());
        }
    }

    @Override
    public void close() throws IOException {
        if (this.activeLogChannel != null) {
            this.activeLogChannel.close();
        }
    }

    @Override
    public void forceClose() {
        IOUtils.close(log, (Closeable)this.activeLogChannel);
    }

    @Override
    public void prepareEntryMemTableFlush() {
        this.logIdBeforeFlush = this.getCurrentLogId();
    }

    @Override
    public boolean commitEntryMemTableFlush() throws IOException {
        long logIdAfterFlush = this.getCurrentLogId();
        if (this.reachEntryLogLimit(this.activeLogChannel, 0L) || logIdAfterFlush != this.logIdBeforeFlush) {
            log.info("Rolling entry logger since it reached size limitation");
            this.createNewLog(-1L, "due to reaching log limit after flushing memtable : logIdBeforeFlush = " + this.logIdBeforeFlush + ", logIdAfterFlush = " + logIdAfterFlush);
            return true;
        }
        return false;
    }

    @Override
    public void prepareSortedLedgerStorageCheckpoint(long numBytesFlushed) throws IOException {
        if (numBytesFlushed > 0L) {
            this.createNewLog(-1L, "due to preparing checkpoint : numBytesFlushed = " + numBytesFlushed);
        }
    }

    @Override
    public EntryLogger.BufferedLogChannel createNewLogForCompaction() throws IOException {
        return this.entryLoggerAllocator.createNewLogForCompaction(this.selectDirForNextEntryLog());
    }
}

