/*
 * Decompiled with CFR 0.152.
 */
package com.indeed.lsmtree.recordcache;

import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.UnmodifiableIterator;
import com.indeed.lsmtree.core.StorageType;
import com.indeed.lsmtree.core.Store;
import com.indeed.lsmtree.core.StoreBuilder;
import com.indeed.lsmtree.recordcache.CacheStats;
import com.indeed.lsmtree.recordcache.Checkpoint;
import com.indeed.lsmtree.recordcache.Delete;
import com.indeed.lsmtree.recordcache.IndexReadException;
import com.indeed.lsmtree.recordcache.Operation;
import com.indeed.lsmtree.recordcache.Put;
import com.indeed.lsmtree.recordcache.RecordCache;
import com.indeed.lsmtree.recordcache.RecordLogDirectoryPoller;
import com.indeed.lsmtree.recordlog.RecordFile;
import com.indeed.lsmtree.recordlog.RecordLogDirectory;
import com.indeed.util.core.Either;
import com.indeed.util.core.threads.NamedThreadFactory;
import com.indeed.util.serialization.LongSerializer;
import com.indeed.util.serialization.Serializer;
import com.indeed.util.varexport.Export;
import fj.P;
import fj.P2;
import fj.data.Option;
import it.unimi.dsi.fastutil.longs.LongArrayList;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.commons.collections.comparators.ComparableComparator;
import org.apache.log4j.Logger;

public final class PersistentRecordCache<K, V>
implements RecordCache<K, V> {
    private static final Logger log = Logger.getLogger(PersistentRecordCache.class);
    private final Store<K, Long> index;
    private final RecordLogDirectory<Operation> recordLogDirectory;
    private final RecordLogDirectoryPoller.Functions indexUpdateFunctions;
    private final AtomicInteger repairedSegments = new AtomicInteger(0);
    private final Comparator<K> comparator;

    /**
     * Creates a cache that resolves keys through {@code index} into {@code recordLogDirectory}.
     *
     * <p>Also builds the {@link RecordLogDirectoryPoller.Functions} callback returned by
     * {@link #getFunctions()}, which applies record log operations to the index:
     * a {@link Put} stores the operation's position under its key, a {@link Delete} removes
     * every listed key, and a {@link Checkpoint} syncs the index and checkpoints it into
     * {@code checkpointDir} (skipped when that directory is {@code null}).
     *
     * @param index store mapping keys to record log positions
     * @param recordLogDirectory record log the stored positions refer to
     * @param checkpointDir parent directory for index checkpoints, or {@code null} to ignore checkpoints
     * @throws IOException part of the existing constructor signature
     */
    private PersistentRecordCache(final Store<K, Long> index, RecordLogDirectory<Operation> recordLogDirectory, final File checkpointDir) throws IOException {
        this.index = index;
        this.comparator = index.getComparator();
        this.recordLogDirectory = recordLogDirectory;
        this.indexUpdateFunctions = new RecordLogDirectoryPoller.Functions() {
            private final AtomicLong indexPutTime = new AtomicLong(0L);
            private final AtomicLong indexDeleteTime = new AtomicLong(0L);
            private final AtomicInteger indexPuts = new AtomicInteger(0);
            private final AtomicInteger indexDeletes = new AtomicInteger(0);
            private final AtomicInteger count = new AtomicInteger(0);

            @Override
            public void process(long position, Operation op) throws IOException {
                // Test the value returned by the increment itself: a separate get() could observe
                // another caller's increment and skip or repeat the every-1000 statistics dump.
                if (count.incrementAndGet() % 1000 == 0) {
                    final int puts = indexPuts.get();
                    if (puts > 0) {
                        log.debug("avg index put time: " + (double)(indexPutTime.get() / (long)puts) / 1000.0 + " us");
                    }
                    final int deletes = indexDeletes.get();
                    if (deletes > 0) {
                        log.debug("avg index delete time: " + (double)(indexDeleteTime.get() / (long)deletes) / 1000.0 + " us");
                    }
                }
                final Class<?> opClass = op.getClass();
                if (opClass == Put.class) {
                    @SuppressWarnings("unchecked")
                    final Put<K, V> put = (Put<K, V>)op;
                    final long start = System.nanoTime();
                    // index writes are serialized on the index monitor (reindex locks the same object)
                    synchronized (index) {
                        index.put(put.getKey(), Long.valueOf(position));
                    }
                    indexPutTime.addAndGet(System.nanoTime() - start);
                    indexPuts.incrementAndGet();
                } else if (opClass == Delete.class) {
                    final Delete delete = (Delete)op;
                    for (Object rawKey : delete.getKeys()) {
                        @SuppressWarnings("unchecked")
                        final K key = (K)rawKey;
                        final long start = System.nanoTime();
                        synchronized (index) {
                            index.delete(key);
                        }
                        indexDeleteTime.addAndGet(System.nanoTime() - start);
                        indexDeletes.incrementAndGet();
                    }
                } else if (opClass == Checkpoint.class) {
                    final Checkpoint checkpoint = (Checkpoint)op;
                    if (checkpointDir != null) {
                        // flush first so the checkpoint contains everything processed so far
                        sync();
                        index.checkpoint(new File(checkpointDir, String.valueOf(checkpoint.getTimestamp())));
                    }
                } else {
                    log.warn("operation class unknown");
                }
            }

            /** Syncs the index and logs how long the sync took. */
            public void sync() throws IOException {
                final long start = System.nanoTime();
                index.sync();
                log.debug("sync time: " + (double)(System.nanoTime() - start) / 1000.0 + " us");
            }
        };
    }

    /**
     * Reports the repaired-segments counter, which {@code reindex} increments at the end of
     * every repair pass that does not fail with an {@code IndexReadException}.
     *
     * @return current counter value
     */
    @Export(name="repaired-segments")
    public int getRepairedSegments() {
        final int repaired = repairedSegments.get();
        return repaired;
    }

    /**
     * Exposes the index store's active space usage.
     *
     * @return the value reported by {@code index.getActiveSpaceUsage()}
     * @throws IOException if the index store fails to report it
     */
    @Export(name="index-active-space-usage")
    public long getIndexActiveSpaceUsage() throws IOException {
        final long activeUsage = index.getActiveSpaceUsage();
        return activeUsage;
    }

    /**
     * Exposes the index store's total space usage.
     *
     * @return the value reported by {@code index.getTotalSpaceUsage()}
     * @throws IOException if the index store fails to report it
     */
    @Export(name="index-total-space-usage")
    public long getIndexTotalSpaceUsage() throws IOException {
        final long totalUsage = index.getTotalSpaceUsage();
        return totalUsage;
    }

    /**
     * Exposes the index store's reserved space usage.
     *
     * <p>The exported name is misspelled ("reserverd"); it is left untouched because external
     * consumers may already read the variable under that name.
     *
     * @return the value reported by {@code index.getReservedSpaceUsage()}
     */
    @Export(name="index-reserverd-space-usage")
    public long getIndexReservedSpaceUsage() {
        final long reservedUsage = index.getReservedSpaceUsage();
        return reservedUsage;
    }

    /**
     * Exposes the free space reported by the index store.
     *
     * @return the value reported by {@code index.getFreeSpace()}
     * @throws IOException if the index store fails to report it
     */
    @Export(name="index-free-space")
    public long getIndexFreeSpace() throws IOException {
        final long freeSpace = index.getFreeSpace();
        return freeSpace;
    }

    /**
     * Looks up a single key by delegating to {@link #getAll(Collection, CacheStats)} with a
     * one-element collection.
     *
     * @param key key to fetch
     * @param cacheStats statistics holder, updated by {@code getAll}
     * @return the value found for {@code key}, or {@code null} when the lookup produced no result
     */
    @Override
    @Nullable
    public V get(@Nonnull K key, @Nonnull CacheStats cacheStats) {
        final Map<K, V> found = getAll(Collections.singleton(key), cacheStats);
        return found.isEmpty() ? null : found.get(key);
    }

    /**
     * Fetches the values for {@code keys}, resolving each key through the index to a record log
     * position and reading the {@link Put} stored there.
     *
     * <p>If reading the record fails, or the record's key does not match the requested key, the
     * segment containing the position is reindexed and the lookup is retried once. The retried
     * record is verified the same way as the first one, so a value belonging to a different key
     * is never returned.
     *
     * <p>Failures never propagate: a key whose lookup fails is logged, counted in
     * {@code cacheStats.recordLogReadErrors} and left out of the result. Index read failures are
     * additionally counted in {@code cacheStats.indexReadErrors} (so they increment both counters).
     * {@code persistentStoreHits} and {@code misses} are overwritten with this call's totals.
     *
     * @param keys keys to fetch
     * @param cacheStats statistics holder updated during the call
     * @return map containing an entry for every key that was successfully resolved
     */
    @Override
    @Nonnull
    public Map<K, V> getAll(@Nonnull Collection<K> keys, @Nonnull CacheStats cacheStats) {
        final Map<K, V> results = Maps.newHashMap();
        for (K key : keys) {
            try {
                final long start = System.nanoTime();
                Long position;
                try {
                    position = this.index.get(key);
                } catch (Exception e) {
                    log.error("index read error while fetching key " + key, e);
                    ++cacheStats.indexReadErrors;
                    throw e;
                }
                cacheStats.indexTime += System.nanoTime() - start;
                if (position == null) {
                    // not indexed: counted as a miss below
                    continue;
                }
                Put<K, V> put;
                try {
                    put = this.lookupAddress(cacheStats, position);
                    if (this.comparator.compare(put.getKey(), key) != 0) {
                        throw new IOException("keys do not match - expected: " + key + " actual: " + put.getKey());
                    }
                } catch (Exception e) {
                    log.info("exception looking up key: " + key + ", attempting repair ", e);
                    try {
                        this.reindex(position);
                    } catch (IndexReadException e1) {
                        log.error("index read error while fetching key " + key, e1);
                        ++cacheStats.indexReadErrors;
                        throw e1;
                    }
                    try {
                        position = this.index.get(key);
                    } catch (Exception e1) {
                        log.error("index read error while fetching key " + key, e1);
                        ++cacheStats.indexReadErrors;
                        throw e1;
                    }
                    if (position == null) {
                        // The key disappeared from the index in the meantime; fail with a clear
                        // message instead of a NullPointerException inside lookupAddress.
                        throw new IOException("key " + key + " is no longer present in the index after repair");
                    }
                    put = this.lookupAddress(cacheStats, position);
                    if (this.comparator.compare(put.getKey(), key) != 0) {
                        throw new IOException("keys do not match - expected: " + key + " actual: " + put.getKey());
                    }
                    log.info("reindex successful");
                }
                results.put(key, put.getValue());
            } catch (Exception e) {
                log.error("error fetching key: " + key, e);
                ++cacheStats.recordLogReadErrors;
            }
        }
        cacheStats.persistentStoreHits = results.size();
        log.debug("persistent store hits: " + results.size());
        cacheStats.misses = keys.size() - results.size();
        log.debug("misses: " + (keys.size() - results.size()));
        return results;
    }

    /**
     * Reads the operation stored at {@code position} in the record log and returns it as a
     * {@link Put}.
     *
     * @param cacheStats if non-null, the elapsed read time is added to its {@code recordLogTime}
     * @param position record log address to read; must not be {@code null}
     * @return the {@code Put} found at the address
     * @throws IOException if the operation at the address is not a {@code Put}, or the read fails
     */
    private Put<K, V> lookupAddress(@Nullable CacheStats cacheStats, Long position) throws IOException {
        final long readStarted = System.nanoTime();
        final Operation operation = recordLogDirectory.get(position.longValue());
        if (cacheStats != null) {
            final long elapsed = System.nanoTime() - readStarted;
            cacheStats.recordLogTime += elapsed;
        }
        final boolean isPut = operation.getClass() == Put.class;
        if (!isPut) {
            throw new IOException("class is not Put");
        }
        @SuppressWarnings("unchecked")
        final Put<K, V> record = (Put<K, V>)operation;
        // Result intentionally discarded.
        // NOTE(review): presumably this forces the value to be materialized so that a broken
        // record fails here rather than at the caller - confirm against Put.getValue().
        record.getValue();
        return record;
    }

    /**
     * Streams the records for {@code keys} in batches read by a pool of background threads.
     *
     * <p>All keys are first resolved through the index; keys without an index entry are counted
     * and skipped. The resolved addresses are sorted, split into batches of up to 1000 and handed
     * to 10 "store priming" threads by a dedicated submitter thread. The returned iterator yields
     * the results of each batch as it completes, so results are not in key order. Each element is
     * either a key/value pair or the exception raised while reading that address.
     *
     * <p>The thread pool is shut down only once the iterator has been drained, so callers should
     * consume it to the end.
     *
     * @param keys keys to look up; fully consumed before this method returns
     * @param progress if non-null, incremented by the number of keys not found and by the size of
     *                 every completed batch
     * @param skipped if non-null, incremented by the number of keys not found in the index
     * @return iterator over lookup results; if an index read fails, a single-element iterator
     *         holding an {@link IndexReadException}
     */
    public Iterator<Either<Exception, P2<K, V>>> getStreaming(@Nonnull final Iterator<K> keys, @Nullable final AtomicInteger progress, @Nullable final AtomicInteger skipped) {
        log.info("starting store lookups");
        final LongArrayList addressList = new LongArrayList();
        int notFound = 0;
        while (keys.hasNext()) {
            final K key = keys.next();
            final Long address;
            try {
                address = this.index.get(key);
            } catch (IOException e) {
                log.error("error", e);
                final Exception failure = new IndexReadException(e);
                final Either<Exception, P2<K, V>> left = Either.Left.of(failure);
                return Iterators.singletonIterator(left);
            }
            if (address != null) {
                addressList.add(address.longValue());
            } else {
                ++notFound;
            }
        }
        if (progress != null) {
            progress.addAndGet(notFound);
        }
        if (skipped != null) {
            skipped.addAndGet(notFound);
        }
        log.info("store lookups complete, sorting addresses");
        // elements() is the list's backing array, so sorting its populated prefix sorts the list
        // itself; the batches below are therefore in ascending address order.
        Arrays.sort(addressList.elements(), 0, addressList.size());
        log.info("initializing store lookup iterator");
        final BlockingQueue<Runnable> taskQueue = new ArrayBlockingQueue<Runnable>(100);
        final Iterator<List<Long>> batches = Iterators.partition(addressList.iterator(), 1000);
        final ThreadPoolExecutor primerThreads = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, taskQueue, new NamedThreadFactory("store priming thread", true, log), new RejectedExecutionHandler() {

            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                // Back-pressure: block the submitter until the bounded task queue has room.
                try {
                    taskQueue.put(r);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    log.error("error", e);
                    throw new RuntimeException(e);
                }
            }
        });
        final BlockingQueue<List<Either<Exception, P2<K, V>>>> completionQueue = new ArrayBlockingQueue<List<Either<Exception, P2<K, V>>>>(10);
        // Batches submitted whose results have not yet been taken by the consumer.
        final AtomicLong runningTasks = new AtomicLong(0L);
        final AtomicBoolean taskSubmitterRunning = new AtomicBoolean(true);
        new Thread(new Runnable() {

            @Override
            public void run() {
                try {
                    while (batches.hasNext()) {
                        final List<Long> batch = batches.next();
                        runningTasks.incrementAndGet();
                        boolean submitted = false;
                        try {
                            primerThreads.submit(newLookupTask(batch));
                            submitted = true;
                        } finally {
                            if (!submitted) {
                                // never handed over, so nobody else will account for it
                                runningTasks.decrementAndGet();
                            }
                        }
                    }
                } finally {
                    // Always signal completion, even on failure, so the consumer cannot wait forever.
                    taskSubmitterRunning.set(false);
                }
            }

            /** Wraps one batch in a task that publishes its results to the completion queue. */
            private FutureTask<List<Either<Exception, P2<K, V>>>> newLookupTask(List<Long> batch) {
                return new FutureTask<List<Either<Exception, P2<K, V>>>>(new RecordLookupTask(batch)) {

                    @Override
                    protected void done() {
                        boolean delivered = false;
                        try {
                            final List<Either<Exception, P2<K, V>>> results = this.get();
                            if (progress != null) {
                                progress.addAndGet(results.size());
                            }
                            completionQueue.put(results);
                            delivered = true;
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            log.error("error", e);
                            throw new RuntimeException(e);
                        } catch (ExecutionException e) {
                            log.error("error", e);
                            throw new RuntimeException(e);
                        } finally {
                            if (!delivered) {
                                // The consumer only decrements when it receives a batch; release
                                // the slot here so a lost batch does not stall hasNext().
                                runningTasks.decrementAndGet();
                            }
                        }
                    }
                };
            }
        }, "RecordLookupTaskSubmitterThread").start();
        return new Iterator<Either<Exception, P2<K, V>>>() {
            private Iterator<Either<Exception, P2<K, V>>> currentIterator;

            @Override
            public boolean hasNext() {
                if (this.currentIterator != null && this.currentIterator.hasNext()) {
                    return true;
                }
                while (taskSubmitterRunning.get() || runningTasks.get() > 0L) {
                    final List<Either<Exception, P2<K, V>>> batch;
                    try {
                        batch = completionQueue.poll(1L, TimeUnit.SECONDS);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        log.error("error", e);
                        throw new RuntimeException(e);
                    }
                    if (batch == null) {
                        // timed out; re-check whether any work is still outstanding
                        continue;
                    }
                    log.debug("remaining: " + runningTasks.decrementAndGet());
                    this.currentIterator = batch.iterator();
                    if (this.currentIterator.hasNext()) {
                        return true;
                    }
                }
                primerThreads.shutdown();
                return false;
            }

            @Override
            public Either<Exception, P2<K, V>> next() {
                // Honour the Iterator contract when next() is called without a preceding hasNext().
                if (!this.hasNext()) {
                    throw new java.util.NoSuchElementException();
                }
                return this.currentIterator.next();
            }

            @Override
            public void remove() {
                throw new UnsupportedOperationException();
            }
        };
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void reindex(long address) throws IndexReadException {
        int segmentNum = this.recordLogDirectory.getSegmentNum(address);
        try {
            Option option = this.recordLogDirectory.getFileReader((long)segmentNum);
            block11: for (RecordFile.Reader reader : option) {
                block12: while (true) {
                    while (reader.next()) {
                        Operation op = (Operation)reader.get();
                        if (op.getClass() != Put.class) continue;
                        Put put = (Put)op;
                        Object key = put.getKey();
                        long position = reader.getPosition();
                        Store<K, Long> store = this.index;
                        synchronized (store) {
                            Long currentAddress;
                            try {
                                currentAddress = (Long)this.index.get(key);
                            }
                            catch (Exception e) {
                                throw new IndexReadException(e);
                            }
                            if (currentAddress == null || currentAddress == position) {
                                continue;
                            }
                            int currentSegment = this.recordLogDirectory.getSegmentNum(currentAddress.longValue());
                            if (currentSegment == segmentNum) {
                                this.index.put(key, (Object)position);
                            }
                            continue block12;
                        }
                    }
                    continue block11;
                    {
                        continue block12;
                        break;
                    }
                    break;
                }
                finally {
                    reader.close();
                }
            }
        }
        catch (IndexReadException e) {
            log.error((Object)"error", (Throwable)e);
            throw e;
        }
        catch (Exception e) {
            log.error((Object)("error reindexing segment number " + segmentNum), (Throwable)e);
        }
        this.repairedSegments.incrementAndGet();
    }

    /**
     * Returns the callback object created in the constructor that applies record log
     * operations to the index.
     *
     * @return the index update functions of this cache
     */
    @Override
    public RecordLogDirectoryPoller.Functions getFunctions() {
        final RecordLogDirectoryPoller.Functions functions = indexUpdateFunctions;
        return functions;
    }

    /**
     * Closes the underlying index store. The record log directory is not touched here.
     *
     * @throws IOException if closing the index fails
     */
    @Override
    public void close() throws IOException {
        final Store<K, Long> store = index;
        store.close();
    }

    /**
     * Delegates to {@code index.waitForCompactions()}.
     *
     * @throws InterruptedException if the index store's wait is interrupted
     */
    public void waitForCompactions() throws InterruptedException {
        final Store<K, Long> store = index;
        store.waitForCompactions();
    }

    /**
     * Fluent builder for {@link PersistentRecordCache}.
     *
     * <p>{@code indexDir}, {@code recordLogDirectory} and {@code keySerializer} are mandatory;
     * everything else has a default. Setters return the builder so calls can be chained.
     *
     * @param <K> key type
     * @param <V> value type
     */
    public static class Builder<K, V> {
        private File indexDir;
        private File checkpointDir;
        private Serializer<K> keySerializer;
        private boolean dedicatedIndexPartition;
        // Default comparator; the assignment from the raw ComparableComparator is unchecked.
        @SuppressWarnings("unchecked")
        private Comparator<K> comparator = new ComparableComparator();
        private RecordLogDirectory<Operation> recordLogDirectory;
        private boolean mlockIndex = false;
        private boolean mlockBloomFilters = false;
        // Negative means "not configured": build() then leaves the store builder's own default.
        private long bloomFilterMemory = -1L;

        /**
         * Builds the index store under {@code indexDir} and wraps it in a cache.
         *
         * @return a new cache backed by the configured index and record log directory
         * @throws IllegalArgumentException if {@code indexDir}, {@code recordLogDirectory} or
         *                                  {@code keySerializer} has not been set
         * @throws IOException if building the index store fails
         */
        public PersistentRecordCache<K, V> build() throws IOException {
            if (this.indexDir == null) {
                throw new IllegalArgumentException("indexDir must be set");
            }
            if (this.recordLogDirectory == null) {
                // The message names the actual builder property (it used to say "fileCache").
                throw new IllegalArgumentException("recordLogDirectory must be set");
            }
            if (this.keySerializer == null) {
                throw new IllegalArgumentException("keySerializer must be set");
            }
            final StoreBuilder<K, Long> indexBuilder = new StoreBuilder<K, Long>(this.indexDir, this.keySerializer, new LongSerializer());
            indexBuilder.setMaxVolatileGenerationSize(0x800000L);
            indexBuilder.setStorageType(StorageType.INLINE);
            indexBuilder.setComparator(this.comparator);
            indexBuilder.setDedicatedPartition(this.dedicatedIndexPartition);
            indexBuilder.setMlockFiles(this.mlockIndex);
            indexBuilder.setMlockBloomFilters(this.mlockBloomFilters);
            if (this.bloomFilterMemory >= 0L) {
                indexBuilder.setBloomFilterMemory(this.bloomFilterMemory);
            }
            final Store<K, Long> index = indexBuilder.build();
            return new PersistentRecordCache<K, V>(index, this.recordLogDirectory, this.checkpointDir);
        }

        /** Sets the directory in which the index store is built (required). */
        public Builder<K, V> setIndexDir(File indexDir) {
            this.indexDir = indexDir;
            return this;
        }

        /** Sets the serializer used for index keys (required). */
        public Builder<K, V> setKeySerializer(Serializer<K> keySerializer) {
            this.keySerializer = keySerializer;
            return this;
        }

        /** Sets the key comparator handed to the index store; defaults to a {@code ComparableComparator}. */
        public Builder<K, V> setComparator(Comparator<K> comparator) {
            this.comparator = comparator;
            return this;
        }

        /** Sets the record log directory the cache reads records from (required). */
        public Builder<K, V> setRecordLogDirectory(RecordLogDirectory<Operation> recordLogDirectory) {
            this.recordLogDirectory = recordLogDirectory;
            return this;
        }

        /** Sets the directory for index checkpoints; when left unset, checkpoint operations are ignored. */
        public Builder<K, V> setCheckpointDir(File checkpointDir) {
            this.checkpointDir = checkpointDir;
            return this;
        }

        /** Returns the value that will be passed to the store builder's {@code setDedicatedPartition}. */
        public boolean isDedicatedIndexPartition() {
            return this.dedicatedIndexPartition;
        }

        /** Sets the value passed to the store builder's {@code setDedicatedPartition}. */
        public Builder<K, V> setDedicatedIndexPartition(boolean dedicatedIndexPartition) {
            this.dedicatedIndexPartition = dedicatedIndexPartition;
            return this;
        }

        /** Returns the value that will be passed to the store builder's {@code setMlockFiles}. */
        public boolean isMlockIndex() {
            return this.mlockIndex;
        }

        /** Sets the value passed to the store builder's {@code setMlockFiles}. */
        public Builder<K, V> setMlockIndex(boolean mlockIndex) {
            this.mlockIndex = mlockIndex;
            return this;
        }

        /** Returns the value that will be passed to the store builder's {@code setMlockBloomFilters}. */
        public boolean isMlockBloomFilters() {
            return this.mlockBloomFilters;
        }

        /** Sets the value passed to the store builder's {@code setMlockBloomFilters}. */
        public Builder<K, V> setMlockBloomFilters(boolean mlockBloomFilters) {
            this.mlockBloomFilters = mlockBloomFilters;
            return this;
        }

        /** Returns the configured bloom filter memory, or a negative value when not configured. */
        public long getBloomFilterMemory() {
            return this.bloomFilterMemory;
        }

        /** Sets the bloom filter memory; negative values are not forwarded to the store builder. */
        public Builder<K, V> setBloomFilterMemory(long bloomFilterMemory) {
            this.bloomFilterMemory = bloomFilterMemory;
            return this;
        }
    }

    /**
     * Reads the records stored at a batch of record log addresses.
     *
     * <p>Every address yields exactly one result element: the key/value pair of the record, or
     * the exception that prevented it from being read. A failed read triggers a reindex of the
     * affected segment, but the address itself is not retried and is reported as a failure.
     */
    private final class RecordLookupTask
    implements Callable<List<Either<Exception, P2<K, V>>>> {
        private final List<Long> addresses;

        private RecordLookupTask(List<Long> addresses) {
            this.addresses = addresses;
        }

        /**
         * Looks up every address of the batch in order.
         *
         * @return one result per address, in the order of the input list
         */
        @Override
        public List<Either<Exception, P2<K, V>>> call() {
            final List<Either<Exception, P2<K, V>>> ret = Lists.newArrayList();
            for (Long address : this.addresses) {
                try {
                    final Put<K, V> put;
                    try {
                        put = PersistentRecordCache.this.lookupAddress(null, address);
                    } catch (Exception lookupFailure) {
                        log.info("exception looking up address: " + address + ", attempting repair", lookupFailure);
                        PersistentRecordCache.this.reindex(address);
                        log.info("reindex successful");
                        // Reindexing fixes the index, not this address, so the record is still
                        // unavailable; report it with the original failure attached as the cause.
                        throw new IOException("record for address " + address + " does not exist for some reason", lookupFailure);
                    }
                    final Either<Exception, P2<K, V>> found = Either.Right.of(P.p(put.getKey(), put.getValue()));
                    ret.add(found);
                } catch (Exception e) {
                    final Either<Exception, P2<K, V>> failed = Either.Left.of(e);
                    ret.add(failed);
                }
            }
            return ret;
        }
    }
}

