/*
 * Decompiled with CFR 0.152.
 */
package org.rx.io;

import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.StreamCorruptedException;
import java.nio.charset.StandardCharsets;
import java.nio.file.StandardOpenOption;
import java.util.AbstractSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.NonNull;
import org.apache.commons.collections4.IteratorUtils;
import org.rx.bean.$;
import org.rx.bean.AbstractMap;
import org.rx.bean.DateTime;
import org.rx.codec.CodecUtil;
import org.rx.core.Disposable;
import org.rx.core.Extends;
import org.rx.core.Linq;
import org.rx.core.Reflects;
import org.rx.core.Strings;
import org.rx.core.Sys;
import org.rx.exception.ExceptionLevel;
import org.rx.exception.InvalidException;
import org.rx.io.Compressible;
import org.rx.io.ExternalSortingIndexer;
import org.rx.io.Files;
import org.rx.io.IOStream;
import org.rx.io.KeyIndexer;
import org.rx.io.KeyValueStoreConfig;
import org.rx.io.Serializer;
import org.rx.io.WALFileStream;
import org.rx.net.http.HttpServer;
import org.rx.net.http.ServerRequest;
import org.rx.third.guava.AbstractSequentialIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KeyValueStore<TK, TV>
extends Disposable
implements AbstractMap<TK, TV> {
    private static final Logger log = LoggerFactory.getLogger(KeyValueStore.class);
    // Status byte of a deleted record. The same value (1) is also stored in a key's
    // logPosition by fastRemove to mark the key as removed.
    static final byte TOMB_MARK = 1;
    // Page size used by the no-arg entrySet().
    static final int DEFAULT_ITERATOR_SIZE = 50;
    // JSON field names through which the HTTP API exchanges the class names of keys and values.
    static final String KEY_TYPE_FIELD = "_KEY_TYPE";
    static final String VALUE_TYPE_FIELD = "_VAL_TYPE";
    // Shared stores handed out by getInstance, keyed by the key type only.
    static final Map<Class<?>, KeyValueStore> instances = new ConcurrentHashMap();
    final KeyValueStoreConfig config;
    final File parentDirectory;
    // File name of the write-ahead log, derived from a hash of getTypeId().
    final String logName;
    // Write-ahead log holding the serialized entries.
    final WALFileStream wal;
    // Maps each key to the log position of its latest record.
    final KeyIndexer<TK> indexer;
    final Serializer serializer;
    // HTTP API server; only assigned by startApiServer.
    transient HttpServer apiServer;

    /**
     * Returns the shared store for {@code keyType}, creating it with a default configuration on
     * first use.
     *
     * <p>NOTE(review): the cache is keyed by {@code keyType} alone, so a second call with the same
     * key type but a different {@code valueType} receives the store created for the first value
     * type. Confirm whether callers rely on this before changing it.
     *
     * @param keyType   class of the keys
     * @param valueType class of the values; only used when the store is first created
     * @return the cached or newly created store
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public static <TK, TV> KeyValueStore<TK, TV> getInstance(Class<TK> keyType, Class<TV> valueType) {
        KeyValueStore shared = instances.computeIfAbsent(
                keyType,
                type -> new KeyValueStore(KeyValueStoreConfig.newConfig(keyType, valueType)));
        return shared;
    }

    /**
     * Builds the identifier of this store from its configured types.
     *
     * @return {@code "<key class name>:<value class name>"}
     */
    String getTypeId() {
        String keyTypeName = this.config.getKeyType().getName();
        String valueTypeName = this.config.getValueType().getName();
        return keyTypeName + ":" + valueTypeName;
    }

    /**
     * Creates a store that uses {@link Serializer#DEFAULT} for its entries.
     *
     * @param config store configuration; must not be null
     */
    public KeyValueStore(KeyValueStoreConfig config) {
        this(config, Serializer.DEFAULT);
    }

    /**
     * Opens (or creates) the write-ahead log and key index under the configured directory, replays
     * any log records found after the persisted log position into the index, and optionally starts
     * the HTTP API.
     *
     * @param config     store configuration; key type and value type must be set
     * @param serializer serializer used for log records
     * @throws NullPointerException if {@code config} or {@code serializer} is null
     */
    public KeyValueStore(@NonNull KeyValueStoreConfig config, @NonNull Serializer serializer) {
        if (config == null) {
            throw new NullPointerException("config is marked non-null but is null");
        }
        if (serializer == null) {
            throw new NullPointerException("serializer is marked non-null but is null");
        }
        Extends.require(config.getKeyType());
        Extends.require(config.getValueType());
        this.config = config;
        this.parentDirectory = new File(Files.createDirectory(config.getDirectoryPath()));
        // The log file name is a hash of "<keyType>:<valueType>", so each type pair gets its own files.
        String typeId = this.getTypeId();
        this.logName = String.format("%s.log", CodecUtil.hashUnsigned64(typeId));
        File logFile = new File(String.format("%s/%s", config.getDirectoryPath(), this.logName));
        this.wal = new WALFileStream(logFile, config.getLogGrowSize(), config.getLogReaderCount(), serializer);
        this.wal.setFlushDelayMillis(config.getFlushDelayMillis());
        this.wal.file.setAttribute("typeId", typeId);
        this.serializer = serializer;
        // The index file sits next to the log and shares its base name with an "idx" extension.
        String idxName = Files.changeExtension(this.logName, "idx");
        this.indexer = new ExternalSortingIndexer(new File(String.format("%s/%s", config.getDirectoryPath(), idxName)), config.getIndexBufferSize(), config.getIndexReaderCount());
        // Recovery: starting at the persisted log position, read records one by one and point the
        // index at each of them. The loop stops at the first position unsafeRead cannot turn into an
        // entry (it returns null for a tombstoned record and for a StreamCorruptedException).
        this.wal.lock.writeInvoke(() -> {
            Entry<Object, TV> val;
            long pos = this.wal.meta.getLogPosition();
            $<Long> endPos = $.$();
            while ((val = this.unsafeRead(pos, null, endPos)) != null) {
                boolean incr = false;
                Object k = val.key;
                KeyIndexer.KeyEntity<TK> key = this.indexer.find(k);
                if (key == null) {
                    key = this.indexer.newKey(k);
                    incr = true;
                }
                // logPosition 1 marks a key that was removed; re-adding it counts as a new entry.
                if (key.logPosition == 1L) {
                    incr = true;
                }
                key.logPosition = pos;
                // Advance the persisted log position past the record that was just recovered.
                this.wal.meta.setLogPosition((Long)endPos.v);
                this.indexer.save(key);
                if (incr) {
                    this.wal.meta.incrementSize();
                }
                log.debug("recover {}", key);
                pos = (Long)endPos.v;
            }
        });
        // meta.extra holds the counter that unsafeRead increments on a key mismatch.
        if (this.wal.meta.extra == null) {
            this.wal.meta.extra = new AtomicInteger();
        }
        if (config.getApiPort() > 0) {
            this.startApiServer(config.getApiPort());
        }
    }

    /**
     * Releases everything this store owns: the HTTP API server (when one was started), the key
     * index and the write-ahead log.
     *
     * <p>Each resource is closed even if closing an earlier one fails, so a failing index close
     * cannot leave the log open.
     *
     * @throws Throwable whatever one of the underlying {@code close()} calls throws
     */
    @Override
    protected void freeObjects() throws Throwable {
        // Stop the API first so no request is served against closed storage. The server is held
        // as Object so the instanceof test compiles whatever HttpServer's type hierarchy is; it is
        // closed only if it turns out to be AutoCloseable.
        Object server = this.apiServer;
        try {
            if (server instanceof AutoCloseable) {
                ((AutoCloseable) server).close();
            }
        } finally {
            try {
                this.indexer.close();
            } finally {
                this.wal.close();
            }
        }
    }

    /**
     * Appends a record for {@code k} to the log and points the index at it, without reading the
     * previous value.
     *
     * <p>If the key already has a record at a position of 256 or above, that record's status byte
     * is overwritten with the tombstone mark before the new record is appended.
     *
     * @param k key; must not be null
     * @param v value to store
     * @throws NullPointerException if {@code k} is null
     */
    public void fastPut(@NonNull TK k, TV v) {
        if (k == null) {
            throw new NullPointerException("k is marked non-null but is null");
        }
        this.checkNotClosed();
        final Entry<TK, TV> record = new Entry<>(k, v);
        this.wal.lock.writeInvoke(() -> {
            KeyIndexer.KeyEntity<TK> entity = this.indexer.find(k);
            final boolean unseen = entity == null;
            if (unseen) {
                entity = this.indexer.newKey(k);
            }
            final long previousPos = entity.logPosition;
            // A brand-new key and a previously removed key both add one entry to the store.
            final boolean grows = unseen || previousPos == TOMB_MARK;

            final long appendPos = this.wal.meta.getLogPosition();
            if (previousPos >= 256L) {
                // Jump back to the superseded record, flag it as deleted, then return to the tail.
                this.wal.meta.setLogPosition(previousPos);
                this.wal.write(TOMB_MARK);
                this.wal.meta.setLogPosition(appendPos);
                log.debug("fastPut mark TOMB {} <- {}", previousPos, appendPos);
            }

            // Record layout: status byte (0 = live), serialized entry, then the length of both as
            // an int, which readBackwards uses to step to the start of the record.
            entity.logPosition = appendPos;
            this.wal.write(0);
            this.serializer.serialize(record, this.wal);
            final int recordLength = (int) (this.wal.meta.getLogPosition() - entity.logPosition);
            this.wal.writeInt(recordLength);

            this.indexer.save(entity);
            if (grows) {
                this.wal.meta.incrementSize();
            }
        }, 256L);
    }

    /**
     * Removes {@code k} without reading its value: the key's log record is flagged with the
     * tombstone mark and the index entry is kept with its position set to the tombstone value.
     * Does nothing when the key is unknown or already removed.
     *
     * @param k key; must not be null
     * @throws NullPointerException if {@code k} is null
     */
    public void fastRemove(@NonNull TK k) {
        if (k == null) {
            throw new NullPointerException("k is marked non-null but is null");
        }
        this.checkNotClosed();
        this.wal.lock.writeInvoke(() -> {
            KeyIndexer.KeyEntity<TK> entity = this.indexer.find(k);
            final boolean live = entity != null && entity.logPosition != TOMB_MARK;
            if (!live) {
                return;
            }
            // Overwrite the record's status byte in place, then restore the append position.
            final long appendPos = this.wal.meta.getLogPosition();
            this.wal.meta.setLogPosition(entity.logPosition);
            this.wal.write(TOMB_MARK);
            this.wal.meta.setLogPosition(appendPos);
            log.debug("fastRemove {}", entity);

            entity.logPosition = TOMB_MARK;
            this.indexer.save(entity);
            this.wal.meta.decrementSize();
        }, 256L);
    }

    /**
     * Looks up {@code k} in the index and, if it is present and not removed, loads its value from
     * the log under the read lock.
     *
     * @param k key; must not be null
     * @return the stored value, or {@code null} when the key is absent, removed, or its record
     *         could not be read
     * @throws NullPointerException if {@code k} is null
     */
    protected TV read(@NonNull TK k) {
        if (k == null) {
            throw new NullPointerException("k is marked non-null but is null");
        }
        Entry<TK, TV> found = this.wal.lock.readInvoke(() -> {
            KeyIndexer.KeyEntity<TK> entity = this.indexer.find(k);
            if (entity != null && entity.logPosition != TOMB_MARK) {
                return this.unsafeRead(entity.logPosition, entity.key, null);
            }
            return null;
        }, 256L);
        if (found == null) {
            return null;
        }
        return found.value;
    }

    /**
     * Reads the record that starts at {@code logPosition}. Callers are expected to hold the
     * appropriate WAL lock; this method takes none itself.
     *
     * @param logPosition start of the record (its status byte)
     * @param k           expected key, or {@code null} to skip the key check (used during recovery)
     * @param position    optional holder that receives the reader position after the read attempt
     * @return the entry, or {@code null} when the record is tombstoned, its key differs from
     *         {@code k}, or the stream is corrupted
     */
    @SuppressWarnings("unchecked")
    private Entry<TK, TV> unsafeRead(long logPosition, TK k, $<Long> position) {
        this.wal.setReaderPosition(logPosition);
        try {
            if (this.wal.read() == TOMB_MARK) {
                return null;
            }
            Entry<TK, TV> loaded = (Entry<TK, TV>) this.serializer.deserialize(this.wal, true);
            if (k == null || k.equals(loaded.key)) {
                return loaded;
            }
            // The index pointed at a record that belongs to another key: count it and log it to a file.
            AtomicInteger mismatches = (AtomicInteger) this.wal.meta.extra;
            int total = mismatches == null ? -1 : mismatches.incrementAndGet();
            log.warn("LogPosError hash collision {} total={}", k, total);
            Files.writeLines("./hc_err.log", Linq.from(String.format("%s %s hc=%s total=%s", DateTime.now(), this.logName, k, total)), StandardCharsets.UTF_8, StandardOpenOption.CREATE, StandardOpenOption.APPEND);
            return null;
        } catch (Exception e) {
            if (e instanceof StreamCorruptedException) {
                log.warn("readValue {} {} {}", k == null ? "[INIT]" : k, logPosition, e.getMessage());
                return null;
            }
            throw e;
        } finally {
            // The reader position is fetched on every path, even when the caller passed no holder.
            long readerPosition = this.wal.getReaderPosition(true);
            if (position != null) {
                position.v = readerPosition;
            }
        }
    }

    /**
     * Fills {@code ctx.buf} from index 0 with up to {@code prefetchCount} live entries, walking the
     * log backwards from {@code ctx.logPos}. Each step reads the 4-byte length stored at the end of
     * a record, jumps back by that length to the record's status byte, and skips tombstoned records.
     *
     * <p>{@code ctx.readPos} and {@code ctx.writePos} are reset to 0 first; on return
     * {@code ctx.writePos} is the number of entries placed in the buffer and {@code ctx.logPos}
     * is the start of the last record visited. Slots at or beyond {@code ctx.writePos} are not
     * cleared.
     *
     * @param ctx           iteration state to fill
     * @param prefetchCount number of live entries to read; must not exceed {@code ctx.buf.length}
     * @return {@code true} if {@code prefetchCount} entries were read; {@code false} if positions
     *         below 256 were reached or the stream was corrupted/ended first, in which case
     *         {@code ctx.remaining} is set to 0
     */
    private boolean readBackwards(IteratorContext ctx, int prefetchCount) {
        this.wal.setReaderPosition(ctx.logPos);
        return this.wal.readObjectBackwards(reader -> {
            ctx.readPos = 0;
            ctx.writePos = 0;
            int i = 0;
            while (i < prefetchCount) {
                int status;
                int size;
                long p2;
                long p1;
                long logPos;
                block8: {
                    // The int just before ctx.logPos is the length of the preceding record.
                    logPos = ctx.logPos - 4L;
                    // NOTE(review): 256 is presumably the first valid record position (WAL header size) — confirm.
                    if (logPos < 256L) {
                        ctx.remaining = 0;
                        return false;
                    }
                    p1 = 0L;
                    p2 = 0L;
                    size = 0;
                    status = 0;
                    p1 = logPos;
                    reader.setPosition(logPos);
                    size = reader.readInt();
                    // Step back to the record's status byte.
                    p2 = logPos -= (long)size;
                    reader.setPosition(logPos);
                    status = reader.read();
                    if (status != 1) break block8;
                    // Tombstoned record: move past it without counting it towards prefetchCount.
                    ctx.logPos = logPos;
                    log.debug("readBackwards prev[{}] status[{}]={} len[{}]={}", new Object[]{logPos, p2, status, p1, size});
                    continue;
                }
                try {
                    Entry val = (Entry)this.serializer.deserialize((IOStream)reader, true);
                    ctx.buf[ctx.writePos++] = val;
                    ++i;
                    ctx.logPos = logPos;
                }
                catch (Exception e) {
                    block9: {
                        Boolean bl;
                        try {
                            // Corruption or end of stream ends the walk; any other exception is rethrown below.
                            if (!(e instanceof StreamCorruptedException | e instanceof EOFException)) break block9;
                            log.warn("readBackwards error {}", (Object)e.getMessage());
                            ctx.remaining = 0;
                            bl = false;
                            ctx.logPos = logPos;
                        }
                        catch (Throwable throwable) {
                            ctx.logPos = logPos;
                            log.debug("readBackwards prev[{}] status[{}]={} len[{}]={}", new Object[]{logPos, p2, status, p1, size});
                            throw throwable;
                        }
                        log.debug("readBackwards prev[{}] status[{}]={} len[{}]={}", new Object[]{logPos, p2, status, p1, size});
                        return bl;
                    }
                    throw e;
                }
                log.debug("readBackwards prev[{}] status[{}]={} len[{}]={}", new Object[]{logPos, p2, status, p1, size});
            }
            return true;
        });
    }

    /**
     * Starts the HTTP API on {@code port} with two JSON endpoints, each guarded by
     * {@link #apiCheck}.
     *
     * <ul>
     *   <li>{@code /get}: with {@code "key"}, returns that key's value. Without it, returns
     *       {@code "size"} plus either the values of the requested {@code "keys"} (code 0) or
     *       the default {@code entrySet()} page (code 1).</li>
     *   <li>{@code /set}: requires {@code "key"} (otherwise code 1). A missing {@code "value"}
     *       removes the key, conditionally when {@code "concurrentValue"} is given. Otherwise
     *       {@code "flag"} selects putIfAbsent (1), replace (2) or put (any other value), and
     *       {@code "concurrentValue"} turns the call into a compare-and-replace.</li>
     * </ul>
     *
     * @param port port passed to the {@code HttpServer} constructor
     */
    void startApiServer(int port) {
        this.apiServer = new HttpServer(port, this.config.isApiSsl()).requestMapping("/get", (request, response) -> {
            this.apiCheck(request);
            JSONObject reqJson = Sys.toJsonObject(request.jsonBody());
            JSONObject resJson = new JSONObject();
            Object key = reqJson.get("key");
            if (key == null) {
                // No single key requested: report the store size and a set of entries.
                resJson.put((Object)"size", (Object)this.size());
                JSONArray keys = reqJson.getJSONArray("keys");
                if (keys != null) {
                    // Preserve the order in which the caller listed the keys.
                    LinkedHashMap map = new LinkedHashMap();
                    for (int i = 0; i < keys.size(); ++i) {
                        Object k = this.apiDeserialize(reqJson, KEY_TYPE_FIELD, keys.get(i));
                        map.put(k, this.get(k));
                    }
                    resJson.put((Object)"code", (Object)0);
                    resJson.put((Object)"entrySet", map);
                } else {
                    resJson.put((Object)"code", (Object)1);
                    resJson.put((Object)"entrySet", this.entrySet());
                }
                response.jsonBody(resJson);
                return;
            }
            Object k = this.apiDeserialize(reqJson, KEY_TYPE_FIELD, key);
            this.apiSerialize(resJson, VALUE_TYPE_FIELD, this.get(k));
            response.jsonBody(resJson);
        }).requestMapping("/set", (request, response) -> {
            this.apiCheck(request);
            JSONObject reqJson = Sys.toJsonObject(request.jsonBody());
            JSONObject resJson = new JSONObject();
            Object key = reqJson.get("key");
            if (key == null) {
                // A key is mandatory for /set.
                resJson.put((Object)"code", (Object)1);
                response.jsonBody(resJson);
                return;
            }
            Object k = this.apiDeserialize(reqJson, KEY_TYPE_FIELD, key);
            Object value = reqJson.get("value");
            Object concurrentValue = reqJson.get("concurrentValue");
            if (value == null) {
                // No value means removal; with concurrentValue, only if the current value matches it.
                if (concurrentValue == null) {
                    this.apiSerialize(resJson, VALUE_TYPE_FIELD, this.remove(k));
                } else {
                    this.apiSerialize(resJson, VALUE_TYPE_FIELD, this.remove(k, this.apiDeserialize(reqJson, VALUE_TYPE_FIELD, concurrentValue)));
                }
                response.jsonBody(resJson);
                return;
            }
            Object v = this.apiDeserialize(reqJson, VALUE_TYPE_FIELD, value);
            if (concurrentValue == null) {
                // flag: 1 = putIfAbsent, 2 = replace, anything else (including absent) = put.
                byte flag = Extends.ifNull(reqJson.getByte("flag"), (byte)0);
                switch (flag) {
                    case 1: {
                        this.apiSerialize(resJson, VALUE_TYPE_FIELD, this.putIfAbsent(k, v));
                        break;
                    }
                    case 2: {
                        this.apiSerialize(resJson, VALUE_TYPE_FIELD, this.replace(k, v));
                        break;
                    }
                    default: {
                        this.apiSerialize(resJson, VALUE_TYPE_FIELD, this.put((TK)k, (TV)v));
                        break;
                    }
                }
            } else {
                // Compare-and-replace: only swap in v when the current value equals concurrentValue.
                this.apiSerialize(resJson, VALUE_TYPE_FIELD, this.replace(k, this.apiDeserialize(reqJson, VALUE_TYPE_FIELD, concurrentValue), v));
            }
            response.jsonBody(resJson);
        });
    }

    /**
     * Converts a request field into a key or value object.
     *
     * <p>A {@code byte[]} is passed through the store's serializer. Anything else is converted from
     * JSON to the class named in {@code reqJson[typeField]}, or returned unchanged when no class
     * name is given.
     *
     * <p>NOTE(review): the class name comes from the request and is loaded via
     * {@code Reflects.loadClass}; confirm the API is only reachable by trusted callers.
     *
     * @param reqJson   request body
     * @param typeField name of the field that carries the target class name
     * @param obj       raw field value
     * @return the converted object
     */
    @SuppressWarnings("unchecked")
    private <T> T apiDeserialize(JSONObject reqJson, String typeField, Object obj) {
        if (!(obj instanceof byte[])) {
            String typeName = reqJson.getString(typeField);
            if (typeName == null) {
                return (T) obj;
            }
            return (T) Sys.fromJson(obj, Reflects.loadClass(typeName, false));
        }
        byte[] payload = (byte[]) obj;
        return this.serializer.deserialize(IOStream.wrap(null, payload));
    }

    /**
     * Writes a success response into {@code resJson}: {@code "code"} is always set to 0, and a
     * non-null {@code obj} is added as {@code "value"}. Depending on the configuration the value
     * goes out as JSON together with its class name under {@code typeField}, or as serialized bytes.
     *
     * @param resJson   response body to fill
     * @param typeField name of the field that receives the value's class name (JSON mode only)
     * @param obj       value to return; may be null
     */
    private void apiSerialize(JSONObject resJson, String typeField, Object obj) {
        resJson.put("code", 0);
        if (obj == null) {
            return;
        }
        if (!this.config.isApiReturnJson()) {
            resJson.put("value", this.serializer.serializeToBytes(obj));
            return;
        }
        resJson.put(typeField, obj.getClass().getName());
        resJson.put("value", obj);
    }

    /**
     * Rejects the request unless its {@code apiPassword} header equals the configured API
     * password. No check is made when no password is configured.
     *
     * @param req incoming request
     * @throws InvalidException if a password is configured and the header does not match
     */
    private void apiCheck(ServerRequest req) {
        boolean passwordConfigured = !Strings.isEmpty((CharSequence) this.config.getApiPassword());
        if (passwordConfigured
                && !Extends.eq(this.config.getApiPassword(), req.getHeaders().get("apiPassword"))) {
            throw new InvalidException(ExceptionLevel.USER_OPERATION, "{} auth fail", req.getRemoteEndpoint());
        }
    }

    /**
     * @return the entry count tracked in the WAL metadata
     */
    @Override
    public int size() {
        return this.wal.meta.getSize();
    }

    /**
     * Tells whether {@code key} currently has a value in the store.
     *
     * <p>{@code fastRemove} keeps the index entry of a removed key and only sets its
     * {@code logPosition} to the tombstone value, so the mere presence of an index entry is not
     * enough; this applies the same test as {@code read}.
     *
     * @param key key to look up
     * @return {@code true} if the key is indexed and not marked as removed
     */
    @Override
    @SuppressWarnings("unchecked")
    public boolean containsKey(Object key) {
        KeyIndexer.KeyEntity<TK> entity = this.indexer.find((TK) key);
        return entity != null && entity.logPosition != TOMB_MARK;
    }

    /**
     * @param key key to look up; must not be null
     * @return the value stored for {@code key}, or {@code null} if there is none
     */
    @Override
    @SuppressWarnings("unchecked")
    public TV get(Object key) {
        return this.read((TK) key);
    }

    /**
     * Stores {@code value} under {@code key}. Nothing is written when the value read for the key
     * already equals {@code value}.
     *
     * @return the value that was stored before the call, or {@code null}
     */
    @Override
    public TV put(TK key, TV value) {
        final TV previous = this.read(key);
        if (Extends.eq(previous, value)) {
            return previous;
        }
        this.fastPut(key, value);
        return previous;
    }

    /**
     * Removes {@code key} from the store.
     *
     * @return the value that was stored before the call, or {@code null}
     */
    @Override
    @SuppressWarnings("unchecked")
    public TV remove(Object key) {
        final TK typedKey = (TK) key;
        final TV previous = this.read(typedKey);
        this.fastRemove(typedKey);
        return previous;
    }

    /**
     * Clears the key index and the write-ahead log while holding the WAL write lock.
     */
    @Override
    public void clear() {
        this.wal.lock.writeInvoke(() -> {
            this.indexer.clear();
            this.wal.clear();
        });
    }

    /**
     * @return a view over the first {@link #DEFAULT_ITERATOR_SIZE} entries, read from the end of
     *         the log backwards
     */
    @Override
    public Set<Map.Entry<TK, TV>> entrySet() {
        return this.entrySet(0, DEFAULT_ITERATOR_SIZE);
    }

    /**
     * Returns a paged view of the entries, read from the end of the log backwards.
     *
     * @param offset number of live entries to skip; must be &gt;= 0
     * @param size   page size handed to the view; must be &gt;= 0
     * @return a view whose iterator starts after {@code offset} entries
     */
    public Set<Map.Entry<TK, TV>> entrySet(int offset, int size) {
        // offset is validated before size, so an invalid offset is reported first.
        Extends.require(offset, offset >= 0);
        Extends.require(size, size >= 0);
        return new EntrySetView(offset, size);
    }

    /**
     * Paged view over the store's entries, newest record first.
     *
     * <p>The iterator skips {@code offset} live entries and then yields at most {@code size}
     * entries. {@link #size()} and {@link #clear()} delegate to the whole store, not to the page.
     */
    class EntrySetView
    extends AbstractSet<Map.Entry<TK, TV>> {
        /** Number of live entries to skip before the first returned entry. */
        final int offset;
        /** Maximum number of entries the iterator yields. */
        final int size;

        public EntrySetView(int offset, int size) {
            this.offset = offset;
            this.size = size;
        }

        /**
         * @return the size of the whole store (not of this page)
         */
        @Override
        public int size() {
            return KeyValueStore.this.size();
        }

        /** Clears the whole store. */
        @Override
        public void clear() {
            KeyValueStore.this.clear();
        }

        /**
         * Creates an iterator over this page.
         *
         * <p>Entries are fetched from the log in batches of at most the configured prefetch
         * count. The iterator keeps its own count of entries still to return and only looks at
         * the slots the latest batch actually filled, so a short final batch is returned in full
         * and slots left over from an earlier batch are never returned.
         *
         * @return an iterator yielding at most {@code size} entries after skipping {@code offset}
         */
        @Override
        public Iterator<Map.Entry<TK, TV>> iterator() {
            final int limit = this.size;
            // An empty page yields nothing; the first buffered entry must not leak out.
            if (limit <= 0 || KeyValueStore.this.size() <= this.offset) {
                return IteratorUtils.emptyIterator();
            }
            final IteratorContext ctx = new IteratorContext(new Entry[KeyValueStore.this.config.getIteratorPrefetchCount()]);
            final int capacity = ctx.buf.length;
            if (capacity == 0) {
                return IteratorUtils.emptyIterator();
            }
            // Skip the offset in buffer-sized chunks: readBackwards stores every entry it reads
            // in ctx.buf, so asking for more than the buffer holds would overflow it.
            int toSkip = this.offset;
            while (toSkip > 0) {
                int chunk = Math.min(toSkip, capacity);
                if (!KeyValueStore.this.readBackwards(ctx, chunk)) {
                    return IteratorUtils.emptyIterator();
                }
                toSkip -= chunk;
            }
            final boolean firstBatchComplete = KeyValueStore.this.readBackwards(ctx, Math.min(capacity, limit));
            if (ctx.writePos == 0) {
                return IteratorUtils.emptyIterator();
            }
            return new AbstractSequentialIterator<Map.Entry<TK, TV>>(ctx.buf[ctx.readPos]){
                /** Entry most recently returned by next(); target of remove(). */
                Map.Entry<TK, TV> current;
                /** Entries still to return after the seed entry. */
                int left = limit - 1;
                /** Set once readBackwards reports that it could not read any further. */
                boolean logExhausted = !firstBatchComplete;

                @Override
                protected Map.Entry<TK, TV> computeNext(Map.Entry<TK, TV> current) {
                    this.current = current;
                    while (this.left > 0) {
                        if (++ctx.readPos >= ctx.writePos) {
                            // The current batch is used up; fetch the next one unless the log is done.
                            if (this.logExhausted) {
                                return null;
                            }
                            // readBackwards resets ctx.readPos to 0 and refills from slot 0.
                            this.logExhausted = !KeyValueStore.this.readBackwards(ctx, Math.min(capacity, this.left));
                            if (ctx.writePos == 0) {
                                return null;
                            }
                        }
                        Entry<TK, TV> entry = ctx.buf[ctx.readPos];
                        if (entry != null) {
                            --this.left;
                            return entry;
                        }
                    }
                    return null;
                }

                @Override
                public void remove() {
                    if (this.current == null) {
                        throw new IllegalStateException("next() has not been called");
                    }
                    KeyValueStore.this.fastRemove(this.current.getKey());
                }
            };
        }
    }

    /**
     * Mutable state shared between an entry iterator and {@code readBackwards}.
     */
    class IteratorContext {
        /** Batch buffer that readBackwards fills from index 0. */
        final Entry<TK, TV>[] buf;
        /** Log position from which the next backwards read continues. */
        long logPos;
        /** Number of entries the latest readBackwards call placed in {@code buf}. */
        int writePos;
        /** Index in {@code buf} of the entry the iterator is currently at. */
        int readPos;
        /** Entries still to hand out; readBackwards sets it to 0 when it cannot read further. */
        int remaining;

        /**
         * Starts the walk at the current log position of the enclosing store.
         *
         * @param buf batch buffer; its length bounds how many entries one read may fetch
         */
        public IteratorContext(Entry<TK, TV>[] buf) {
            this.buf = buf;
            this.logPos = KeyValueStore.this.wal.meta.getLogPosition();
        }
    }

    static class Entry<TK, TV>
    implements Map.Entry<TK, TV>,
    Compressible {
        private static final long serialVersionUID = -2218602651671401557L;
        TK key;
        TV value;

        private void writeObject(ObjectOutputStream out) throws IOException {
            out.writeObject(this.key);
            out.writeObject(this.value);
        }

        private void readObject(ObjectInputStream in) throws IOException {
            try {
                this.key = in.readObject();
                this.value = in.readObject();
            }
            catch (ClassNotFoundException e) {
                log.error("readObject {}", (Object)e.getMessage());
            }
        }

        @Override
        public TV setValue(TV value) {
            throw new UnsupportedOperationException();
        }

        @Override
        public boolean enableCompress() {
            Compressible v = Extends.as(this.value, Compressible.class);
            return v != null && v.enableCompress();
        }

        public Entry(TK key, TV value) {
            this.key = key;
            this.value = value;
        }

        @Override
        public TK getKey() {
            return this.key;
        }

        @Override
        public TV getValue() {
            return this.value;
        }

        public String toString() {
            return "KeyValueStore.Entry(key=" + this.getKey() + ", value=" + this.getValue() + ")";
        }

        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof Entry)) {
                return false;
            }
            Entry other = (Entry)o;
            if (!other.canEqual(this)) {
                return false;
            }
            TK this$key = this.getKey();
            TK other$key = other.getKey();
            if (this$key == null ? other$key != null : !this$key.equals(other$key)) {
                return false;
            }
            TV this$value = this.getValue();
            TV other$value = other.getValue();
            return !(this$value == null ? other$value != null : !this$value.equals(other$value));
        }

        protected boolean canEqual(Object other) {
            return other instanceof Entry;
        }

        @Override
        public int hashCode() {
            int PRIME = 59;
            int result = 1;
            TK $key = this.getKey();
            result = result * 59 + ($key == null ? 43 : $key.hashCode());
            TV $value = this.getValue();
            result = result * 59 + ($value == null ? 43 : $value.hashCode());
            return result;
        }
    }
}

