package io.activej.crdt.wal;

import io.activej.async.function.AsyncRunnable;
import io.activej.async.function.AsyncRunnables;
import io.activej.async.service.ReactiveService;
import io.activej.async.util.LogUtils;
import io.activej.common.Checks;
import io.activej.common.builder.AbstractBuilder;
import io.activej.common.time.CurrentTimeProvider;
import io.activej.crdt.CrdtData;
import io.activej.crdt.function.CrdtFunction;
import io.activej.crdt.primitives.CrdtType;
import io.activej.crdt.storage.ICrdtStorage;
import io.activej.datastream.supplier.StreamSuppliers;
import io.activej.promise.Promise;
import io.activej.reactor.AbstractReactive;
import io.activej.reactor.Reactive;
import io.activej.reactor.Reactor;
import java.lang.Comparable;
import java.util.Map;
import java.util.TreeMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/activej/crdt/wal/InMemoryWriteAheadLog.class */
public final class InMemoryWriteAheadLog<K extends Comparable<K>, S> extends AbstractReactive implements IWriteAheadLog<K, S>, ReactiveService {
    private static final Logger logger = LoggerFactory.getLogger(InMemoryWriteAheadLog.class);
    private static final boolean CHECKS = Checks.isEnabled(InMemoryWriteAheadLog.class);
    private Map<K, CrdtData<K, S>> map;
    private final CrdtFunction<S> function;
    private final ICrdtStorage<K, S> storage;
    private final AsyncRunnable flush;
    private CurrentTimeProvider now;

    /* loaded from: input_file:io/activej/crdt/wal/InMemoryWriteAheadLog$Builder.class */
    public final class Builder extends AbstractBuilder<InMemoryWriteAheadLog<K, S>.Builder, InMemoryWriteAheadLog<K, S>> {
        private Builder() {
        }

        public InMemoryWriteAheadLog<K, S>.Builder withCurrentTimeProvider(CurrentTimeProvider currentTimeProvider) {
            checkNotBuilt(this);
            InMemoryWriteAheadLog.this.now = currentTimeProvider;
            return this;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /* renamed from: doBuild, reason: merged with bridge method [inline-methods] */
        public InMemoryWriteAheadLog<K, S> m37doBuild() {
            return InMemoryWriteAheadLog.this;
        }
    }

    private InMemoryWriteAheadLog(Reactor reactor, CrdtFunction<S> crdtFunction, ICrdtStorage<K, S> iCrdtStorage) {
        super(reactor);
        this.map = new TreeMap();
        this.flush = AsyncRunnables.coalesce(this::doFlush);
        this.now = CurrentTimeProvider.ofSystem();
        this.function = crdtFunction;
        this.storage = iCrdtStorage;
    }

    public static <K extends Comparable<K>, S> InMemoryWriteAheadLog<K, S> create(Reactor reactor, CrdtFunction<S> crdtFunction, ICrdtStorage<K, S> iCrdtStorage) {
        return (InMemoryWriteAheadLog) builder(reactor, crdtFunction, iCrdtStorage).build();
    }

    public static <K extends Comparable<K>, S extends CrdtType<S>> InMemoryWriteAheadLog<K, S> create(Reactor reactor, ICrdtStorage<K, S> iCrdtStorage) {
        return (InMemoryWriteAheadLog) builder(reactor, CrdtFunction.ofCrdtType(), iCrdtStorage).build();
    }

    public static <K extends Comparable<K>, S> InMemoryWriteAheadLog<K, S>.Builder builder(Reactor reactor, CrdtFunction<S> crdtFunction, ICrdtStorage<K, S> iCrdtStorage) {
        return new Builder();
    }

    public static <K extends Comparable<K>, S extends CrdtType<S>> InMemoryWriteAheadLog<K, S>.Builder builder(Reactor reactor, ICrdtStorage<K, S> iCrdtStorage) {
        return new Builder();
    }

    @Override // io.activej.crdt.wal.IWriteAheadLog
    public Promise<Void> put(K k, S s) {
        if (CHECKS) {
            Reactive.checkInReactorThread(this);
        }
        if (logger.isTraceEnabled()) {
            logger.trace("{} value for key {}", this.map.containsKey(k) ? "Merging" : "Putting new", k);
        }
        doPut(k, new CrdtData<>(k, this.now.currentTimeMillis(), s));
        return Promise.complete();
    }

    @Override // io.activej.crdt.wal.IWriteAheadLog
    public Promise<Void> flush() {
        if (CHECKS) {
            Reactive.checkInReactorThread(this);
        }
        return this.flush.run();
    }

    private Promise<Void> doFlush() {
        if (this.map.isEmpty()) {
            logger.info("Nothing to flush");
            return Promise.complete();
        }
        Map<K, CrdtData<K, S>> map = this.map;
        this.map = new TreeMap();
        return this.storage.upload().then(streamConsumer -> {
            return StreamSuppliers.ofIterable(map.values()).streamTo(streamConsumer);
        }).whenException(exc -> {
            map.forEach(this::doPut);
        }).whenComplete(LogUtils.toLogger(logger, LogUtils.Level.INFO, LogUtils.Level.INFO, "flush", new Object[]{Integer.valueOf(map.size())}));
    }

    public Promise<?> start() {
        Reactive.checkInReactorThread(this);
        return Promise.complete();
    }

    public Promise<?> stop() {
        Reactive.checkInReactorThread(this);
        return flush();
    }

    private void doPut(K k, CrdtData<K, S> crdtData) {
        this.map.merge(k, crdtData, (crdtData2, crdtData3) -> {
            return new CrdtData(k, Math.max(crdtData2.getTimestamp(), crdtData3.getTimestamp()), this.function.merge(crdtData2.getState(), crdtData2.getTimestamp(), crdtData3.getState(), crdtData3.getTimestamp()));
        });
    }
}
