/*
 * Decompiled with CFR 0.152.
 */
package dlshade.org.apache.bookkeeper.bookie.storage.directentrylogger;

import dlshade.com.google.common.annotations.VisibleForTesting;
import dlshade.com.google.common.base.Preconditions;
import dlshade.com.google.common.cache.Cache;
import dlshade.com.google.common.cache.CacheBuilder;
import dlshade.com.google.common.cache.RemovalListener;
import dlshade.org.apache.bookkeeper.bookie.AbstractLogCompactor;
import dlshade.org.apache.bookkeeper.bookie.Bookie;
import dlshade.org.apache.bookkeeper.bookie.EntryLogMetadata;
import dlshade.org.apache.bookkeeper.bookie.storage.CompactionEntryLog;
import dlshade.org.apache.bookkeeper.bookie.storage.EntryLogIds;
import dlshade.org.apache.bookkeeper.bookie.storage.EntryLogScanner;
import dlshade.org.apache.bookkeeper.bookie.storage.EntryLogger;
import dlshade.org.apache.bookkeeper.bookie.storage.directentrylogger.Buffer;
import dlshade.org.apache.bookkeeper.bookie.storage.directentrylogger.BufferPool;
import dlshade.org.apache.bookkeeper.bookie.storage.directentrylogger.DirectCompactionEntryLog;
import dlshade.org.apache.bookkeeper.bookie.storage.directentrylogger.DirectEntryLoggerStats;
import dlshade.org.apache.bookkeeper.bookie.storage.directentrylogger.DirectReader;
import dlshade.org.apache.bookkeeper.bookie.storage.directentrylogger.DirectWriter;
import dlshade.org.apache.bookkeeper.bookie.storage.directentrylogger.Events;
import dlshade.org.apache.bookkeeper.bookie.storage.directentrylogger.Header;
import dlshade.org.apache.bookkeeper.bookie.storage.directentrylogger.LogMetadata;
import dlshade.org.apache.bookkeeper.bookie.storage.directentrylogger.LogReader;
import dlshade.org.apache.bookkeeper.bookie.storage.directentrylogger.LogReaderScan;
import dlshade.org.apache.bookkeeper.bookie.storage.directentrylogger.LogWriter;
import dlshade.org.apache.bookkeeper.bookie.storage.directentrylogger.WriterWithMetadata;
import dlshade.org.apache.bookkeeper.common.util.ExceptionMessageHelper;
import dlshade.org.apache.bookkeeper.common.util.nativeio.NativeIO;
import dlshade.org.apache.bookkeeper.slogger.Slogger;
import dlshade.org.apache.bookkeeper.stats.StatsLogger;
import dlshade.org.apache.bookkeeper.util.LedgerDirUtil;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.ReferenceCountUtil;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.stream.Collectors;

public class DirectEntryLogger
implements EntryLogger {
    private final Slogger slog;
    private final File ledgerDir;
    private final EntryLogIds ids;
    private final ExecutorService writeExecutor;
    private final ExecutorService flushExecutor;
    private final long maxFileSize;
    private final DirectEntryLoggerStats stats;
    private final ByteBufAllocator allocator;
    private final BufferPool writeBuffers;
    private final int readBufferSize;
    private final int maxSaneEntrySize;
    private final Set<Integer> unflushedLogs;
    private WriterWithMetadata curWriter;
    private List<Future<?>> pendingFlushes;
    private final NativeIO nativeIO;
    private final List<Cache<?, ?>> allCaches = new CopyOnWriteArrayList();
    private final ThreadLocal<Cache<Integer, LogReader>> caches;
    private static final int NUMBER_OF_WRITE_BUFFERS = 8;

    public DirectEntryLogger(File ledgerDir, EntryLogIds ids, NativeIO nativeIO, ByteBufAllocator allocator, ExecutorService writeExecutor, ExecutorService flushExecutor, long maxFileSize, int maxSaneEntrySize, long totalWriteBufferSize, long totalReadBufferSize, int readBufferSize, int numReadThreads, int maxFdCacheTimeSeconds, Slogger slogParent, StatsLogger stats) throws IOException {
        this.ledgerDir = ledgerDir;
        this.flushExecutor = flushExecutor;
        this.writeExecutor = writeExecutor;
        this.pendingFlushes = new ArrayList();
        this.nativeIO = nativeIO;
        this.unflushedLogs = ConcurrentHashMap.newKeySet();
        this.maxFileSize = maxFileSize;
        this.maxSaneEntrySize = maxSaneEntrySize;
        this.readBufferSize = Buffer.nextAlignment(readBufferSize);
        this.ids = ids;
        this.slog = slogParent.kv((Object)"directory", (Object)ledgerDir).ctx(DirectEntryLogger.class);
        this.stats = new DirectEntryLoggerStats(stats);
        this.allocator = allocator;
        int singleWriteBufferSize = Buffer.nextAlignment((int)(totalWriteBufferSize / 8L));
        this.writeBuffers = new BufferPool(nativeIO, allocator, singleWriteBufferSize, 8);
        long perThreadBufferSize = totalReadBufferSize / (long)numReadThreads;
        if (perThreadBufferSize < (long)readBufferSize) {
            this.slog.kv((Object)"reason", (Object)"perThreadBufferSize lower than readBufferSize (causes immediate reader cache eviction)").kv((Object)"totalReadBufferSize", (Object)totalReadBufferSize).kv((Object)"totalNumReadThreads", (Object)numReadThreads).kv((Object)"readBufferSize", (Object)readBufferSize).kv((Object)"perThreadBufferSize", (Object)perThreadBufferSize).error((Enum)Events.ENTRYLOGGER_MISCONFIGURED);
        }
        long maxCachedReadersPerThread = perThreadBufferSize / (long)readBufferSize;
        long maxCachedReaders = maxCachedReadersPerThread * (long)numReadThreads;
        this.slog.kv((Object)"maxFileSize", (Object)maxFileSize).kv((Object)"maxSaneEntrySize", (Object)maxSaneEntrySize).kv((Object)"totalWriteBufferSize", (Object)totalWriteBufferSize).kv((Object)"singleWriteBufferSize", (Object)singleWriteBufferSize).kv((Object)"totalReadBufferSize", (Object)totalReadBufferSize).kv((Object)"readBufferSize", (Object)readBufferSize).kv((Object)"perThreadBufferSize", (Object)perThreadBufferSize).kv((Object)"maxCachedReadersPerThread", (Object)maxCachedReadersPerThread).kv((Object)"maxCachedReaders", (Object)maxCachedReaders).info((Enum)Events.ENTRYLOGGER_CREATED);
        this.caches = ThreadLocal.withInitial(() -> {
            RemovalListener rl = notification -> {
                try {
                    ((LogReader)notification.getValue()).close();
                    this.stats.getCloseReaderCounter().inc();
                }
                catch (IOException ioe) {
                    this.slog.kv((Object)"logID", notification.getKey()).error((Enum)Events.READER_CLOSE_ERROR);
                }
            };
            Cache cache = CacheBuilder.newBuilder().maximumWeight(perThreadBufferSize).weigher((key, value) -> readBufferSize).removalListener(rl).expireAfterAccess(maxFdCacheTimeSeconds, TimeUnit.SECONDS).concurrencyLevel(1).build();
            this.allCaches.add(cache);
            return cache;
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public long addEntry(long ledgerId, ByteBuf buf) throws IOException {
        long offset;
        long start = System.nanoTime();
        DirectEntryLogger directEntryLogger = this;
        synchronized (directEntryLogger) {
            if (this.curWriter != null && this.curWriter.shouldRoll(buf, this.maxFileSize)) {
                this.flushAndCloseCurrent();
                this.curWriter = null;
            }
            if (this.curWriter == null) {
                int newId = this.ids.nextId();
                this.curWriter = new WriterWithMetadata(this.newDirectWriter(newId), new EntryLogMetadata(newId), this.allocator);
                this.slog.kv((Object)"newLogId", (Object)newId).info((Enum)Events.LOG_ROLL);
            }
            offset = this.curWriter.addEntry(ledgerId, buf);
        }
        this.stats.getAddEntryStats().registerSuccessfulEvent(System.nanoTime() - start, TimeUnit.NANOSECONDS);
        return offset;
    }

    @Override
    public ByteBuf readEntry(long entryLocation) throws IOException, Bookie.NoEntryException {
        return this.internalReadEntry(-1L, -1L, entryLocation, false);
    }

    @Override
    public ByteBuf readEntry(long ledgerId, long entryId, long entryLocation) throws IOException, Bookie.NoEntryException {
        return this.internalReadEntry(ledgerId, entryId, entryLocation, true);
    }

    private LogReader getReader(int logId) throws IOException {
        Cache<Integer, LogReader> cache = this.caches.get();
        try {
            LogReader reader = cache.get(logId, () -> {
                this.stats.getOpenReaderCounter().inc();
                return this.newDirectReader(logId);
            });
            if (reader.isClosed()) {
                this.stats.getCachedReadersServedClosedCounter().inc();
                throw new IOException(ExceptionMessageHelper.exMsg("Cached reader already closed").kv("logId", logId).toString());
            }
            return reader;
        }
        catch (ExecutionException ee) {
            if (ee.getCause() instanceof IOException) {
                throw (IOException)ee.getCause();
            }
            throw new IOException(ExceptionMessageHelper.exMsg("Error loading reader in cache").kv("logId", logId).toString(), ee);
        }
    }

    private ByteBuf internalReadEntry(long ledgerId, long entryId, long location, boolean validateEntry) throws IOException, Bookie.NoEntryException {
        int logId = (int)(location >> 32);
        int pos = (int)(location & 0xFFFFFFFFFFFFFFFFL);
        long start = System.nanoTime();
        LogReader reader = this.getReader(logId);
        try {
            ByteBuf buf = reader.readEntryAt(pos);
            if (validateEntry) {
                long thisLedgerId = buf.getLong(0);
                long thisEntryId = buf.getLong(8);
                if (thisLedgerId != ledgerId || thisEntryId != entryId) {
                    ReferenceCountUtil.release((Object)buf);
                    throw new IOException(ExceptionMessageHelper.exMsg("Bad location").kv("location", location).kv("expectedLedger", ledgerId).kv("expectedEntry", entryId).kv("foundLedger", thisLedgerId).kv("foundEntry", thisEntryId).toString());
                }
            }
            this.stats.getReadEntryStats().registerSuccessfulEvent(System.nanoTime() - start, TimeUnit.NANOSECONDS);
            return buf;
        }
        catch (EOFException eof) {
            this.stats.getReadEntryStats().registerFailedEvent(System.nanoTime() - start, TimeUnit.NANOSECONDS);
            throw new Bookie.NoEntryException(ExceptionMessageHelper.exMsg("Entry location doesn't exist").kv("location", location).toString(), ledgerId, entryId);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void flush() throws IOException {
        List<Future<?>> outstandingFlushes;
        long start = System.nanoTime();
        Future<?> currentFuture = this.flushCurrent();
        DirectEntryLogger directEntryLogger = this;
        synchronized (directEntryLogger) {
            outstandingFlushes = this.pendingFlushes;
            this.pendingFlushes = new ArrayList();
        }
        outstandingFlushes.add(currentFuture);
        for (Future future : outstandingFlushes) {
            try {
                future.get();
            }
            catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw new IOException("Interruped while flushing", ie);
            }
            catch (ExecutionException ee) {
                if (ee.getCause() instanceof IOException) {
                    throw (IOException)ee.getCause();
                }
                throw new IOException("Exception flushing writer", ee);
            }
        }
        this.stats.getFlushStats().registerSuccessfulEvent(System.nanoTime() - start, TimeUnit.NANOSECONDS);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Future<?> flushCurrent() throws IOException {
        WriterWithMetadata flushWriter;
        DirectEntryLogger directEntryLogger = this;
        synchronized (directEntryLogger) {
            flushWriter = this.curWriter;
        }
        if (flushWriter != null) {
            return this.flushExecutor.submit(() -> {
                long start = System.nanoTime();
                try {
                    flushWriter.flush();
                    this.stats.getWriterFlushStats().registerSuccessfulEvent(System.nanoTime() - start, TimeUnit.NANOSECONDS);
                }
                catch (Throwable t) {
                    this.stats.getWriterFlushStats().registerFailedEvent(System.nanoTime() - start, TimeUnit.NANOSECONDS);
                    throw t;
                }
                return null;
            });
        }
        return CompletableFuture.completedFuture(null);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void flushAndCloseCurrent() throws IOException {
        WriterWithMetadata flushWriter;
        CompletableFuture<Object> flushPromise = new CompletableFuture<Object>();
        DirectEntryLogger directEntryLogger = this;
        synchronized (directEntryLogger) {
            flushWriter = this.curWriter;
            this.curWriter = null;
            this.pendingFlushes.add(flushPromise);
        }
        if (flushWriter != null) {
            this.flushExecutor.execute(() -> {
                long start = System.nanoTime();
                try {
                    flushWriter.finalizeAndClose();
                    this.stats.getWriterFlushStats().registerSuccessfulEvent(System.nanoTime() - start, TimeUnit.NANOSECONDS);
                    this.unflushedLogs.remove(flushWriter.logId());
                    flushPromise.complete(null);
                }
                catch (Throwable t) {
                    this.stats.getWriterFlushStats().registerFailedEvent(System.nanoTime() - start, TimeUnit.NANOSECONDS);
                    flushPromise.completeExceptionally(t);
                }
            });
        } else {
            flushPromise.complete(null);
        }
    }

    @Override
    public void close() throws IOException {
        this.flushAndCloseCurrent();
        this.flush();
        for (Cache<?, ?> c : this.allCaches) {
            c.invalidateAll();
        }
        this.writeBuffers.close();
    }

    @Override
    public Collection<Long> getFlushedLogIds() {
        return LedgerDirUtil.logIdsInDirectory(this.ledgerDir).stream().filter(logId -> !this.unflushedLogs.contains(logId)).map(i -> (long)i).collect(Collectors.toList());
    }

    @Override
    public boolean removeEntryLog(long entryLogId) {
        Preconditions.checkArgument(entryLogId < Integer.MAX_VALUE, "Entry log id must be an int [%d]", entryLogId);
        File file = DirectEntryLogger.logFile(this.ledgerDir, (int)entryLogId);
        boolean result = file.delete();
        this.slog.kv((Object)"file", (Object)file).kv((Object)"logId", (Object)entryLogId).kv((Object)"result", (Object)result).info((Enum)Events.LOG_DELETED);
        return result;
    }

    @Override
    public void scanEntryLog(long entryLogId, EntryLogScanner scanner) throws IOException {
        Preconditions.checkArgument(entryLogId < Integer.MAX_VALUE, "Entry log id must be an int [%d]", entryLogId);
        try (LogReader reader = this.newDirectReader((int)entryLogId);){
            LogReaderScan.scan(this.allocator, reader, scanner);
        }
    }

    @Override
    public boolean logExists(long logId) {
        Preconditions.checkArgument(logId < Integer.MAX_VALUE, "Entry log id must be an int [%d]", logId);
        return DirectEntryLogger.logFile(this.ledgerDir, (int)logId).exists();
    }

    @Override
    public EntryLogMetadata getEntryLogMetadata(long entryLogId, AbstractLogCompactor.Throttler throttler) throws IOException {
        try {
            return this.readEntryLogIndex(entryLogId);
        }
        catch (IOException e) {
            this.slog.kv((Object)"entryLogId", (Object)entryLogId).kv((Object)"reason", (Object)e.getMessage()).info((Enum)Events.READ_METADATA_FALLBACK);
            return this.scanEntryLogMetadata(entryLogId, throttler);
        }
    }

    @VisibleForTesting
    EntryLogMetadata readEntryLogIndex(long logId) throws IOException {
        Preconditions.checkArgument(logId < Integer.MAX_VALUE, "Entry log id must be an int [%d]", logId);
        try (LogReader reader = this.newDirectReader((int)logId);){
            EntryLogMetadata entryLogMetadata = LogMetadata.read(reader);
            return entryLogMetadata;
        }
    }

    @VisibleForTesting
    EntryLogMetadata scanEntryLogMetadata(long logId, final AbstractLogCompactor.Throttler throttler) throws IOException {
        final EntryLogMetadata meta = new EntryLogMetadata(logId);
        this.scanEntryLog(logId, new EntryLogScanner(){

            @Override
            public void process(long ledgerId, long offset, ByteBuf entry) throws IOException {
                if (throttler != null) {
                    throttler.acquire(entry.readableBytes());
                }
                meta.addLedgerSize(ledgerId, entry.readableBytes() + 4);
            }

            @Override
            public boolean accept(long ledgerId) {
                return ledgerId >= 0L;
            }
        });
        return meta;
    }

    @VisibleForTesting
    LogReader newDirectReader(int logId) throws IOException {
        return new DirectReader(logId, DirectEntryLogger.logFilename(this.ledgerDir, logId), this.allocator, this.nativeIO, this.readBufferSize, this.maxSaneEntrySize, this.stats.getReadBlockStats());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private LogWriter newDirectWriter(int newId) throws IOException {
        this.unflushedLogs.add(newId);
        DirectWriter writer = new DirectWriter(newId, DirectEntryLogger.logFilename(this.ledgerDir, newId), this.maxFileSize, this.writeExecutor, this.writeBuffers, this.nativeIO, this.slog);
        ByteBuf buf = this.allocator.buffer(4096);
        try {
            Header.writeEmptyHeader(buf);
            writer.writeAt(0L, buf);
            writer.position(buf.capacity());
        }
        finally {
            ReferenceCountUtil.release((Object)buf);
        }
        return writer;
    }

    public static File logFile(File directory, int logId) {
        return new File(directory, Long.toHexString(logId) + ".log");
    }

    public static String logFilename(File directory, int logId) {
        return DirectEntryLogger.logFile(directory, logId).toString();
    }

    @Override
    public CompactionEntryLog newCompactionLog(long srcLogId) throws IOException {
        int dstLogId = this.ids.nextId();
        return DirectCompactionEntryLog.newLog((int)srcLogId, dstLogId, this.ledgerDir, this.maxFileSize, this.writeExecutor, this.writeBuffers, this.nativeIO, this.allocator, this.slog);
    }

    @Override
    public Collection<CompactionEntryLog> incompleteCompactionLogs() {
        File[] files;
        ArrayList<CompactionEntryLog> logs = new ArrayList<CompactionEntryLog>();
        if (this.ledgerDir.exists() && this.ledgerDir.isDirectory() && (files = this.ledgerDir.listFiles()) != null && files.length > 0) {
            for (File f : files) {
                Matcher m;
                if (f.getName().endsWith(".log.compacting")) {
                    try {
                        Files.deleteIfExists(f.toPath());
                    }
                    catch (IOException ioe) {
                        this.slog.kv((Object)"file", (Object)f).warn((Enum)Events.COMPACTION_DELETE_FAILURE);
                    }
                }
                if (!(m = LedgerDirUtil.COMPACTED_FILE_PATTERN.matcher(f.getName())).matches()) continue;
                int dstLogId = Integer.parseUnsignedInt(m.group(1), 16);
                int srcLogId = Integer.parseUnsignedInt(m.group(2), 16);
                logs.add(DirectCompactionEntryLog.recoverLog(srcLogId, dstLogId, this.ledgerDir, this.readBufferSize, this.maxSaneEntrySize, this.nativeIO, this.allocator, this.stats.getReadBlockStats(), this.slog));
            }
        }
        return logs;
    }
}

