/*
 * Decompiled with CFR 0.152.
 */
package dlshade.org.apache.bookkeeper.bookie;

import dlshade.org.apache.bookkeeper.bookie.Bookie;
import dlshade.org.apache.bookkeeper.bookie.CheckpointSource;
import dlshade.org.apache.bookkeeper.bookie.EntryKey;
import dlshade.org.apache.bookkeeper.bookie.EntryKeyValue;
import dlshade.org.apache.bookkeeper.bookie.EntryMemTable;
import dlshade.org.apache.bookkeeper.bookie.SkipListFlusher;
import dlshade.org.apache.bookkeeper.common.util.OrderedExecutor;
import dlshade.org.apache.bookkeeper.conf.ServerConfiguration;
import dlshade.org.apache.bookkeeper.stats.StatsLogger;
import java.io.IOException;
import java.util.Map;
import java.util.SortedMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class EntryMemTableWithParallelFlusher
extends EntryMemTable {
    private static final Logger log = LoggerFactory.getLogger(EntryMemTableWithParallelFlusher.class);
    final OrderedExecutor flushExecutor;

    public EntryMemTableWithParallelFlusher(ServerConfiguration conf, CheckpointSource source, StatsLogger statsLogger) {
        super(conf, source, statsLogger);
        this.flushExecutor = OrderedExecutor.newBuilder().numThreads(conf.getNumOfMemtableFlushThreads()).name("MemtableFlushThreads").build();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    long flushSnapshot(SkipListFlusher flusher, CheckpointSource.Checkpoint checkpoint) throws IOException {
        AtomicLong flushedSize = new AtomicLong();
        if (this.snapshot.compareTo(checkpoint) < 0) {
            EntryMemTableWithParallelFlusher entryMemTableWithParallelFlusher = this;
            synchronized (entryMemTableWithParallelFlusher) {
                EntryMemTable.EntrySkipList keyValues = this.snapshot;
                Phaser pendingNumOfLedgerFlushes = new Phaser(1);
                AtomicReference exceptionWhileFlushingParallelly = new AtomicReference();
                if (keyValues.compareTo(checkpoint) < 0) {
                    Map.Entry thisLedgerFirstMapEntry = keyValues.firstEntry();
                    while (thisLedgerFirstMapEntry != null) {
                        EntryKeyValue thisLedgerFirstEntry = (EntryKeyValue)thisLedgerFirstMapEntry.getValue();
                        long thisLedgerId = thisLedgerFirstEntry.getLedgerId();
                        EntryKey thisLedgerCeilingKeyMarker = new EntryKey(thisLedgerId, 0x7FFFFFFFFFFFFFFEL);
                        SortedMap thisLedgerEntries = keyValues.subMap(thisLedgerFirstEntry, thisLedgerCeilingKeyMarker);
                        pendingNumOfLedgerFlushes.register();
                        this.flushExecutor.executeOrdered(thisLedgerId, () -> EntryMemTableWithParallelFlusher.lambda$flushSnapshot$0((ConcurrentNavigableMap)thisLedgerEntries, flushedSize, flusher, pendingNumOfLedgerFlushes, exceptionWhileFlushingParallelly));
                        thisLedgerFirstMapEntry = keyValues.ceilingEntry(thisLedgerCeilingKeyMarker);
                    }
                    boolean phaserTerminatedAbruptly = false;
                    try {
                        phaserTerminatedAbruptly = pendingNumOfLedgerFlushes.arriveAndAwaitAdvance() < 0;
                    }
                    catch (IllegalStateException ise) {
                        log.error("Got IllegalStateException while awaiting on Phaser", (Throwable)ise);
                        throw new IOException("Got IllegalStateException while awaiting on Phaser", ise);
                    }
                    if (phaserTerminatedAbruptly) {
                        log.error("Phaser is terminated while awaiting flushExecutor to complete the entry flushes", (Throwable)exceptionWhileFlushingParallelly.get());
                        throw new IOException("Failed to complete the flushSnapshotByParallelizing", (Throwable)exceptionWhileFlushingParallelly.get());
                    }
                    this.memTableStats.getFlushBytesCounter().addCount(flushedSize.get());
                    this.clearSnapshot(keyValues);
                }
            }
        }
        this.skipListSemaphore.release(flushedSize.intValue());
        return flushedSize.longValue();
    }

    @Override
    public void close() throws Exception {
        this.flushExecutor.shutdown();
    }

    private static /* synthetic */ void lambda$flushSnapshot$0(ConcurrentNavigableMap thisLedgerEntries, AtomicLong flushedSize, SkipListFlusher flusher, Phaser pendingNumOfLedgerFlushes, AtomicReference exceptionWhileFlushingParallelly) {
        try {
            boolean ledgerDeleted = false;
            for (EntryKey key : thisLedgerEntries.keySet()) {
                EntryKeyValue kv = (EntryKeyValue)key;
                flushedSize.addAndGet(kv.getLength());
                long ledger = kv.getLedgerId();
                if (ledgerDeleted) continue;
                try {
                    flusher.process(ledger, kv.getEntryId(), kv.getValueAsByteBuffer());
                }
                catch (Bookie.NoLedgerException exception) {
                    ledgerDeleted = true;
                }
            }
            pendingNumOfLedgerFlushes.arriveAndDeregister();
        }
        catch (Exception exc) {
            log.error("Got Exception while trying to flush process entryies: ", (Throwable)exc);
            exceptionWhileFlushingParallelly.set(exc);
            pendingNumOfLedgerFlushes.forceTermination();
        }
    }
}

