package io.activej.crdt.storage.local;

import io.activej.async.service.EventloopService;
import io.activej.bytebuf.ByteBuf;
import io.activej.bytebuf.ByteBufPool;
import io.activej.common.MemSize;
import io.activej.crdt.CrdtData;
import io.activej.crdt.CrdtException;
import io.activej.crdt.function.CrdtFilter;
import io.activej.crdt.function.CrdtFunction;
import io.activej.crdt.primitives.CrdtType;
import io.activej.crdt.storage.CrdtStorage;
import io.activej.crdt.util.CrdtDataSerializer;
import io.activej.csp.ChannelConsumer;
import io.activej.csp.ChannelSupplier;
import io.activej.datastream.StreamConsumer;
import io.activej.datastream.StreamSupplier;
import io.activej.datastream.stats.StreamStats;
import io.activej.datastream.stats.StreamStatsBasic;
import io.activej.datastream.stats.StreamStatsDetailed;
import io.activej.eventloop.Eventloop;
import io.activej.eventloop.jmx.EventloopJmxBeanWithStats;
import io.activej.jmx.api.attribute.JmxAttribute;
import io.activej.jmx.api.attribute.JmxOperation;
import io.activej.jmx.stats.EventStats;
import io.activej.promise.Promise;
import io.activej.serializer.BinaryInput;
import io.activej.serializer.BinarySerializer;
import java.lang.Comparable;
import java.time.Duration;
import java.util.concurrent.Executor;
import org.jetbrains.annotations.NotNull;
import org.rocksdb.Comparator;
import org.rocksdb.ComparatorOptions;
import org.rocksdb.FlushOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.Slice;
import org.rocksdb.WriteOptions;

/* loaded from: input_file:io/activej/crdt/storage/local/CrdtStorageRocksDB.class */
public final class CrdtStorageRocksDB<K extends Comparable<K>, S> implements CrdtStorage<K, S>, EventloopService, EventloopJmxBeanWithStats {
    private final Eventloop eventloop;
    private final Executor executor;
    private final RocksDB db;
    private final CrdtFunction<S> function;
    private final BinarySerializer<K> keySerializer;
    private final BinarySerializer<S> stateSerializer;
    private boolean detailedStats;
    private MemSize bufferSize = MemSize.kilobytes(16);
    private CrdtFilter<S> filter = obj -> {
        return true;
    };
    private final StreamStatsBasic<CrdtData<K, S>> uploadStats = StreamStats.basic();
    private final StreamStatsDetailed<CrdtData<K, S>> uploadStatsDetailed = StreamStats.detailed();
    private final StreamStatsBasic<CrdtData<K, S>> downloadStats = StreamStats.basic();
    private final StreamStatsDetailed<CrdtData<K, S>> downloadStatsDetailed = StreamStats.detailed();
    private final StreamStatsBasic<K> removeStats = StreamStats.basic();
    private final StreamStatsDetailed<K> removeStatsDetailed = StreamStats.detailed();
    private final EventStats singlePuts = EventStats.create(Duration.ofMinutes(5));
    private final EventStats singleGets = EventStats.create(Duration.ofMinutes(5));
    private final EventStats singleRemoves = EventStats.create(Duration.ofMinutes(5));
    private final FlushOptions flushOptions = new FlushOptions();
    private final WriteOptions writeOptions = new WriteOptions().setDisableWAL(true);

    /* loaded from: input_file:io/activej/crdt/storage/local/CrdtStorageRocksDB$KeyComparator.class */
    public static class KeyComparator<K extends Comparable<K>> extends Comparator {
        private final ComparatorOptions copt;
        private final BinarySerializer<K> keySerializer;

        public KeyComparator(ComparatorOptions comparatorOptions, BinarySerializer<K> binarySerializer) {
            super(comparatorOptions);
            this.copt = comparatorOptions;
            this.keySerializer = binarySerializer;
        }

        public KeyComparator(BinarySerializer<K> binarySerializer) {
            this(new ComparatorOptions(), binarySerializer);
        }

        public String name() {
            return "CRDT key comparator";
        }

        public int compare(Slice slice, Slice slice2) {
            return ((Comparable) this.keySerializer.decode((byte[]) slice.data(), 0)).compareTo((Comparable) this.keySerializer.decode((byte[]) slice2.data(), 0));
        }

        public void close() {
            super.close();
            this.copt.close();
        }
    }

    private CrdtStorageRocksDB(Eventloop eventloop, Executor executor, RocksDB rocksDB, BinarySerializer<K> binarySerializer, BinarySerializer<S> binarySerializer2, CrdtFunction<S> crdtFunction) {
        this.eventloop = eventloop;
        this.executor = executor;
        this.db = rocksDB;
        this.function = crdtFunction;
        this.keySerializer = binarySerializer;
        this.stateSerializer = binarySerializer2;
    }

    public static <K extends Comparable<K>, S> CrdtStorageRocksDB<K, S> create(Eventloop eventloop, Executor executor, RocksDB rocksDB, CrdtDataSerializer<K, S> crdtDataSerializer, CrdtFunction<S> crdtFunction) {
        return new CrdtStorageRocksDB<>(eventloop, executor, rocksDB, crdtDataSerializer.getKeySerializer(), crdtDataSerializer.getStateSerializer(), crdtFunction);
    }

    public static <K extends Comparable<K>, S extends CrdtType<S>> CrdtStorageRocksDB<K, S> create(Eventloop eventloop, Executor executor, RocksDB rocksDB, CrdtDataSerializer<K, S> crdtDataSerializer) {
        return new CrdtStorageRocksDB<>(eventloop, executor, rocksDB, crdtDataSerializer.getKeySerializer(), crdtDataSerializer.getStateSerializer(), CrdtFunction.ofCrdtType());
    }

    public CrdtStorageRocksDB<K, S> withBufferSize(MemSize memSize) {
        this.bufferSize = memSize;
        return this;
    }

    public CrdtStorageRocksDB<K, S> withFilter(CrdtFilter<S> crdtFilter) {
        this.filter = crdtFilter;
        return this;
    }

    public RocksDB getDb() {
        return this.db;
    }

    @NotNull
    public Eventloop getEventloop() {
        return this.eventloop;
    }

    public Promise<Void> flush() {
        return Promise.ofBlockingRunnable(this.executor, () -> {
            try {
                this.db.flush(this.flushOptions);
            } catch (RocksDBException e) {
                throw new CrdtException("Flush failed", e);
            }
        });
    }

    /* JADX WARN: Multi-variable type inference failed */
    private void doPut(K k, S s) throws CrdtException {
        ByteBuf allocate = ByteBufPool.allocate(this.bufferSize);
        allocate.tail(this.keySerializer.encode(allocate.array(), allocate.tail(), k));
        byte[] array = allocate.getArray();
        try {
            byte[] bArr = this.db.get(array);
            if (bArr != null) {
                s = this.function.merge(s, this.stateSerializer.decode(bArr, 0));
                if (!this.filter.test(s)) {
                    try {
                        this.db.delete(array);
                        allocate.recycle();
                        return;
                    } catch (RocksDBException e) {
                        throw new CrdtException("Failed to delete key: " + k, e);
                    }
                }
            }
            allocate.rewind();
            allocate.tail(this.stateSerializer.encode(allocate.array(), allocate.tail(), s));
            try {
                this.db.put(this.writeOptions, array, allocate.asArray());
            } catch (RocksDBException e2) {
                throw new CrdtException("Failed to put key: " + k, e2);
            }
        } catch (RocksDBException e3) {
            throw new CrdtException("Failed to get key: " + k, e3);
        }
    }

    private void doRemove(K k) throws CrdtException {
        ByteBuf allocate = ByteBufPool.allocate(this.bufferSize);
        allocate.tail(this.keySerializer.encode(allocate.array(), allocate.tail(), k));
        try {
            this.db.delete(this.writeOptions, allocate.asArray());
        } catch (RocksDBException e) {
            throw new CrdtException("Failed  to remove key: " + k, e);
        }
    }

    @Override // io.activej.crdt.storage.CrdtStorage
    public Promise<StreamConsumer<CrdtData<K, S>>> upload() {
        return Promise.of(StreamConsumer.ofChannelConsumer(((ChannelConsumer) ChannelConsumer.of(crdtData -> {
            return Promise.ofBlockingRunnable(this.executor, () -> {
                doPut(crdtData.getKey(), crdtData.getState());
            });
        }).transformWith(this.detailedStats ? this.uploadStatsDetailed : this.uploadStats)).withAcknowledgement(promise -> {
            return promise.then(this::flush);
        })));
    }

    @Override // io.activej.crdt.storage.CrdtStorage
    public Promise<StreamSupplier<CrdtData<K, S>>> download(long j) {
        return Promise.ofBlockingCallable(this.executor, () -> {
            RocksIterator newIterator = this.db.newIterator();
            newIterator.seekToFirst();
            return newIterator;
        }).map(rocksIterator -> {
            return (StreamSupplier) StreamSupplier.ofChannelSupplier(ChannelSupplier.of(() -> {
                return Promise.ofBlockingCallable(this.executor, () -> {
                    while (rocksIterator.isValid()) {
                        byte[] key = rocksIterator.key();
                        byte[] value = rocksIterator.value();
                        rocksIterator.next();
                        Object extract = this.function.extract(this.stateSerializer.decode(value, 0), j);
                        if (extract != null) {
                            return new CrdtData((Comparable) this.keySerializer.decode(key, 0), extract);
                        }
                    }
                    return null;
                });
            })).transformWith(this.detailedStats ? this.downloadStatsDetailed : this.downloadStats);
        });
    }

    @Override // io.activej.crdt.storage.CrdtStorage
    public Promise<StreamConsumer<K>> remove() {
        return Promise.of(StreamConsumer.ofChannelConsumer(((ChannelConsumer) ChannelConsumer.of(comparable -> {
            return Promise.ofBlockingRunnable(this.executor, () -> {
                doRemove(comparable);
            });
        }).transformWith(this.detailedStats ? this.removeStatsDetailed : this.removeStats)).withAcknowledgement(promise -> {
            return promise.then(this::flush);
        })));
    }

    @Override // io.activej.crdt.storage.CrdtStorage
    public Promise<Void> ping() {
        return Promise.complete();
    }

    @NotNull
    public Promise<Void> start() {
        return Promise.complete();
    }

    @NotNull
    public Promise<Void> stop() {
        return Promise.complete();
    }

    public Promise<S> get(K k) {
        return Promise.ofBlockingCallable(this.executor, () -> {
            ByteBuf allocate = ByteBufPool.allocate(this.bufferSize);
            this.keySerializer.encode(allocate.array(), allocate.head(), k);
            try {
                byte[] bArr = this.db.get(allocate.asArray());
                if (bArr == null) {
                    return null;
                }
                this.singleGets.recordEvent();
                return this.stateSerializer.decode(new BinaryInput(bArr));
            } catch (RocksDBException e) {
                throw new CrdtException("Failed to get value at key: " + k, e);
            }
        });
    }

    public Promise<Void> put(K k, S s) {
        return Promise.ofBlockingRunnable(this.executor, () -> {
            doPut(k, s);
            this.singlePuts.recordEvent();
        });
    }

    public Promise<Void> remove(K k) {
        return Promise.ofBlockingRunnable(this.executor, () -> {
            doRemove(k);
            this.singleRemoves.recordEvent();
        });
    }

    @JmxOperation
    public void startDetailedMonitoring() {
        this.detailedStats = true;
    }

    @JmxOperation
    public void stopDetailedMonitoring() {
        this.detailedStats = false;
    }

    @JmxAttribute
    public StreamStatsBasic getUploadStats() {
        return this.uploadStats;
    }

    @JmxAttribute
    public StreamStatsDetailed getUploadStatsDetailed() {
        return this.uploadStatsDetailed;
    }

    @JmxAttribute
    public StreamStatsBasic getDownloadStats() {
        return this.downloadStats;
    }

    @JmxAttribute
    public StreamStatsDetailed getDownloadStatsDetailed() {
        return this.downloadStatsDetailed;
    }

    @JmxAttribute
    public StreamStatsBasic getRemoveStats() {
        return this.removeStats;
    }

    @JmxAttribute
    public StreamStatsDetailed getRemoveStatsDetailed() {
        return this.removeStatsDetailed;
    }

    @JmxAttribute
    public EventStats getSinglePuts() {
        return this.singlePuts;
    }

    @JmxAttribute
    public EventStats getSingleGets() {
        return this.singleGets;
    }

    @JmxAttribute
    public EventStats getSingleRemoves() {
        return this.singleRemoves;
    }
}
