/*
 * Decompiled with CFR 0.152.
 */
package dlshade.org.apache.bookkeeper.bookie;

import dlshade.org.apache.bookkeeper.bookie.Bookie;
import dlshade.org.apache.bookkeeper.bookie.CheckpointSource;
import dlshade.org.apache.bookkeeper.bookie.EntryKey;
import dlshade.org.apache.bookkeeper.bookie.EntryKeyValue;
import dlshade.org.apache.bookkeeper.bookie.EntryMemTable;
import dlshade.org.apache.bookkeeper.bookie.SkipListFlusher;
import dlshade.org.apache.bookkeeper.common.util.OrderedExecutor;
import dlshade.org.apache.bookkeeper.conf.ServerConfiguration;
import dlshade.org.apache.bookkeeper.stats.StatsLogger;
import dlshade.org.apache.bookkeeper.util.SafeRunnable;
import java.io.IOException;
import java.util.Map;
import java.util.SortedMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class EntryMemTableWithParallelFlusher
extends EntryMemTable {
    private static final Logger log = LoggerFactory.getLogger(EntryMemTableWithParallelFlusher.class);
    final OrderedExecutor flushExecutor;

    public EntryMemTableWithParallelFlusher(ServerConfiguration conf, CheckpointSource source, StatsLogger statsLogger) {
        super(conf, source, statsLogger);
        this.flushExecutor = OrderedExecutor.newBuilder().numThreads(conf.getNumOfMemtableFlushThreads()).name("MemtableFlushThreads").build();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    long flushSnapshot(SkipListFlusher flusher, CheckpointSource.Checkpoint checkpoint) throws IOException {
        AtomicLong flushedSize = new AtomicLong();
        if (this.snapshot.compareTo(checkpoint) < 0) {
            EntryMemTableWithParallelFlusher entryMemTableWithParallelFlusher = this;
            synchronized (entryMemTableWithParallelFlusher) {
                EntryMemTable.EntrySkipList keyValues = this.snapshot;
                Phaser pendingNumOfLedgerFlushes = new Phaser(1);
                AtomicReference exceptionWhileFlushingParallelly = new AtomicReference();
                if (keyValues.compareTo(checkpoint) < 0) {
                    Map.Entry thisLedgerFirstMapEntry = keyValues.firstEntry();
                    while (thisLedgerFirstMapEntry != null) {
                        EntryKeyValue thisLedgerFirstEntry = (EntryKeyValue)thisLedgerFirstMapEntry.getValue();
                        long thisLedgerId = thisLedgerFirstEntry.getLedgerId();
                        EntryKey thisLedgerCeilingKeyMarker = new EntryKey(thisLedgerId, 0x7FFFFFFFFFFFFFFEL);
                        SortedMap thisLedgerEntries = keyValues.subMap(thisLedgerFirstEntry, thisLedgerCeilingKeyMarker);
                        pendingNumOfLedgerFlushes.register();
                        this.flushExecutor.executeOrdered(thisLedgerId, (dlshade.org.apache.bookkeeper.common.util.SafeRunnable)new SafeRunnable((ConcurrentNavigableMap)thisLedgerEntries, flushedSize, flusher, pendingNumOfLedgerFlushes, exceptionWhileFlushingParallelly){
                            final /* synthetic */ ConcurrentNavigableMap val$thisLedgerEntries;
                            final /* synthetic */ AtomicLong val$flushedSize;
                            final /* synthetic */ SkipListFlusher val$flusher;
                            final /* synthetic */ Phaser val$pendingNumOfLedgerFlushes;
                            final /* synthetic */ AtomicReference val$exceptionWhileFlushingParallelly;
                            {
                                this.val$thisLedgerEntries = concurrentNavigableMap;
                                this.val$flushedSize = atomicLong;
                                this.val$flusher = skipListFlusher;
                                this.val$pendingNumOfLedgerFlushes = phaser;
                                this.val$exceptionWhileFlushingParallelly = atomicReference;
                            }

                            @Override
                            public void safeRun() {
                                try {
                                    boolean ledgerDeleted = false;
                                    for (EntryKey key : this.val$thisLedgerEntries.keySet()) {
                                        EntryKeyValue kv = (EntryKeyValue)key;
                                        this.val$flushedSize.addAndGet(kv.getLength());
                                        long ledger = kv.getLedgerId();
                                        if (ledgerDeleted) continue;
                                        try {
                                            this.val$flusher.process(ledger, kv.getEntryId(), kv.getValueAsByteBuffer());
                                        }
                                        catch (Bookie.NoLedgerException exception) {
                                            ledgerDeleted = true;
                                        }
                                    }
                                    this.val$pendingNumOfLedgerFlushes.arriveAndDeregister();
                                }
                                catch (Exception exc) {
                                    log.error("Got Exception while trying to flush process entryies: ", (Throwable)exc);
                                    this.val$exceptionWhileFlushingParallelly.set(exc);
                                    this.val$pendingNumOfLedgerFlushes.forceTermination();
                                }
                            }
                        });
                        thisLedgerFirstMapEntry = keyValues.ceilingEntry(thisLedgerCeilingKeyMarker);
                    }
                    boolean phaserTerminatedAbruptly = false;
                    try {
                        phaserTerminatedAbruptly = pendingNumOfLedgerFlushes.arriveAndAwaitAdvance() < 0;
                    }
                    catch (IllegalStateException ise) {
                        log.error("Got IllegalStateException while awaiting on Phaser", (Throwable)ise);
                        throw new IOException("Got IllegalStateException while awaiting on Phaser", ise);
                    }
                    if (phaserTerminatedAbruptly) {
                        log.error("Phaser is terminated while awaiting flushExecutor to complete the entry flushes", (Throwable)exceptionWhileFlushingParallelly.get());
                        throw new IOException("Failed to complete the flushSnapshotByParallelizing", (Throwable)exceptionWhileFlushingParallelly.get());
                    }
                    this.memTableStats.getFlushBytesCounter().add(flushedSize.get());
                    this.clearSnapshot(keyValues);
                }
            }
        }
        this.skipListSemaphore.release(flushedSize.intValue());
        return flushedSize.longValue();
    }

    @Override
    public void close() throws Exception {
        this.flushExecutor.shutdown();
    }
}

