package io.activej.crdt.storage.local;

import io.activej.async.service.ReactiveService;
import io.activej.common.ApplicationSettings;
import io.activej.common.Checks;
import io.activej.common.builder.AbstractBuilder;
import io.activej.common.time.CurrentTimeProvider;
import io.activej.crdt.CrdtData;
import io.activej.crdt.CrdtException;
import io.activej.crdt.CrdtTombstone;
import io.activej.crdt.function.CrdtFilter;
import io.activej.crdt.function.CrdtFunction;
import io.activej.crdt.primitives.CrdtType;
import io.activej.crdt.storage.ICrdtStorage;
import io.activej.crdt.util.Utils;
import io.activej.datastream.consumer.StreamConsumer;
import io.activej.datastream.consumer.ToListStreamConsumer;
import io.activej.datastream.stats.BasicStreamStats;
import io.activej.datastream.stats.DetailedStreamStats;
import io.activej.datastream.stats.StreamStats;
import io.activej.datastream.supplier.StreamSupplier;
import io.activej.datastream.supplier.StreamSuppliers;
import io.activej.jmx.api.attribute.JmxAttribute;
import io.activej.jmx.api.attribute.JmxOperation;
import io.activej.jmx.stats.EventStats;
import io.activej.promise.Promise;
import io.activej.reactor.AbstractReactive;
import io.activej.reactor.Reactive;
import io.activej.reactor.Reactor;
import io.activej.reactor.jmx.ReactiveJmxBeanWithStats;
import java.lang.Comparable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.stream.Stream;
import org.jetbrains.annotations.Nullable;

/* loaded from: input_file:io/activej/crdt/storage/local/MapCrdtStorage.class */
public final class MapCrdtStorage<K extends Comparable<K>, S> extends AbstractReactive implements ICrdtStorage<K, S>, ReactiveService, ReactiveJmxBeanWithStats {
    // Gates the reactor-thread checks in the public operations; assigned in the static initializer
    // from Checks.isEnabled(MapCrdtStorage.class).
    private static final boolean CHECKS;
    // Smoothing window passed to every EventStats created by the constructor; assigned in the static
    // initializer from ApplicationSettings (key "smoothingWindow", default one minute).
    public static final Duration DEFAULT_SMOOTHING_WINDOW;
    // Merge function applied by doPut() when a key already holds a value.
    private final CrdtFunction<S> function;
    // Live data and live tombstones, both ordered by key.
    private final NavigableMap<K, CrdtData<K, S>> map;
    private final NavigableMap<K, CrdtTombstone<K>> tombstones;
    // Copies handed out by take(); both are non-null only while a take is in progress.
    private NavigableMap<K, CrdtData<K, S>> takenMap;
    private NavigableMap<K, CrdtTombstone<K>> takenTombstones;
    // Predicate tested against merged states in doPut(); the constructor installs an accept-all default.
    private CrdtFilter<S> filter;
    // Time source used to stamp put(K, S) and remove(K); the constructor installs the system provider.
    private CurrentTimeProvider now;
    // When true, the streaming operations use the *Detailed stats objects instead of the basic ones.
    private boolean detailedStats;
    // Stream-level statistics, one basic/detailed pair per streaming operation.
    private final BasicStreamStats<CrdtData<K, S>> uploadStats;
    private final DetailedStreamStats<CrdtData<K, S>> uploadStatsDetailed;
    private final BasicStreamStats<CrdtData<K, S>> downloadStats;
    private final DetailedStreamStats<CrdtData<K, S>> downloadStatsDetailed;
    private final BasicStreamStats<CrdtData<K, S>> takeStats;
    private final DetailedStreamStats<CrdtData<K, S>> takeStatsDetailed;
    private final BasicStreamStats<CrdtTombstone<K>> removeStats;
    private final DetailedStreamStats<CrdtTombstone<K>> removeStatsDetailed;
    // Event counters hooked into the streaming operations through Utils.onItem
    // (presumably one event per streamed item - confirm against Utils.onItem).
    private final EventStats uploadedItems;
    private final EventStats downloadedItems;
    private final EventStats takenItems;
    private final EventStats removedItems;
    // Event counters recorded by the direct (non-streaming) put/get/remove calls.
    private final EventStats singlePuts;
    private final EventStats singleGets;
    private final EventStats singleRemoves;
    // Decompiler-exposed flag: true when assertions are disabled for this class (see static initializer).
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:io/activej/crdt/storage/local/MapCrdtStorage$Builder.class */
    public final class Builder extends AbstractBuilder<MapCrdtStorage<K, S>.Builder, MapCrdtStorage<K, S>> {
        private Builder() {
        }

        public MapCrdtStorage<K, S>.Builder withFilter(CrdtFilter<S> crdtFilter) {
            checkNotBuilt(this);
            MapCrdtStorage.this.filter = crdtFilter;
            return this;
        }

        public MapCrdtStorage<K, S>.Builder withCurrentTimeProvide(CurrentTimeProvider currentTimeProvider) {
            checkNotBuilt(this);
            MapCrdtStorage.this.now = currentTimeProvider;
            return this;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /* renamed from: doBuild, reason: merged with bridge method [inline-methods] */
        public MapCrdtStorage<K, S> m28doBuild() {
            return MapCrdtStorage.this;
        }
    }

    /**
     * Creates an empty storage bound to the given reactor.
     *
     * <p>Defaults: an accept-all filter, the system time provider, and fresh statistics objects
     * that all use {@link #DEFAULT_SMOOTHING_WINDOW}.
     *
     * @param reactor      reactor passed to the {@link AbstractReactive} superclass
     * @param crdtFunction function used to merge two states stored under the same key
     */
    private MapCrdtStorage(Reactor reactor, CrdtFunction<S> crdtFunction) {
        super(reactor);
        this.function = crdtFunction;

        // Storage state.
        this.map = new TreeMap<>();
        this.tombstones = new TreeMap<>();

        // Replaceable collaborators (see Builder).
        this.filter = state -> true;
        this.now = CurrentTimeProvider.ofSystem();

        // Stream statistics: one basic/detailed pair per streaming operation.
        this.uploadStats = StreamStats.basic();
        this.uploadStatsDetailed = StreamStats.detailed();
        this.downloadStats = StreamStats.basic();
        this.downloadStatsDetailed = StreamStats.detailed();
        this.takeStats = StreamStats.basic();
        this.takeStatsDetailed = StreamStats.detailed();
        this.removeStats = StreamStats.basic();
        this.removeStatsDetailed = StreamStats.detailed();

        // Event counters.
        this.uploadedItems = EventStats.create(DEFAULT_SMOOTHING_WINDOW);
        this.downloadedItems = EventStats.create(DEFAULT_SMOOTHING_WINDOW);
        this.takenItems = EventStats.create(DEFAULT_SMOOTHING_WINDOW);
        this.removedItems = EventStats.create(DEFAULT_SMOOTHING_WINDOW);
        this.singlePuts = EventStats.create(DEFAULT_SMOOTHING_WINDOW);
        this.singleGets = EventStats.create(DEFAULT_SMOOTHING_WINDOW);
        this.singleRemoves = EventStats.create(DEFAULT_SMOOTHING_WINDOW);
    }

    /**
     * Creates a storage with default settings that merges states with the given function.
     *
     * @param reactor      reactor the storage is bound to
     * @param crdtFunction function used to merge states
     * @return the built storage
     */
    public static <K extends Comparable<K>, S> MapCrdtStorage<K, S> create(Reactor reactor, CrdtFunction<S> crdtFunction) {
        MapCrdtStorage<K, S>.Builder storageBuilder = MapCrdtStorage.<K, S>builder(reactor, crdtFunction);
        return storageBuilder.build();
    }

    /**
     * Creates a storage with default settings for states that are themselves {@link CrdtType}s.
     *
     * @param reactor reactor the storage is bound to
     * @return the built storage
     */
    public static <K extends Comparable<K>, S extends CrdtType<S>> MapCrdtStorage<K, S> create(Reactor reactor) {
        MapCrdtStorage<K, S>.Builder storageBuilder = MapCrdtStorage.<K, S>builder(reactor);
        return storageBuilder.build();
    }

    /**
     * Starts building a storage that merges states with the given function.
     *
     * <p>{@code Builder} is an inner (non-static) class, so it has to be created from a concrete
     * storage instance; that instance is constructed here from the supplied arguments and is the
     * one the builder eventually returns.
     *
     * @param reactor      reactor the storage is bound to
     * @param crdtFunction function used to merge states
     * @return a builder bound to a freshly constructed storage
     */
    public static <K extends Comparable<K>, S> MapCrdtStorage<K, S>.Builder builder(Reactor reactor, CrdtFunction<S> crdtFunction) {
        MapCrdtStorage<K, S> storage = new MapCrdtStorage<>(reactor, crdtFunction);
        return storage.new Builder();
    }

    /**
     * Starts building a storage for states that are themselves {@link CrdtType}s.
     *
     * <p>{@code Builder} is an inner (non-static) class, so it has to be created from a concrete
     * storage instance. The storage is constructed with {@code CrdtFunction.ofCrdtType()} as its
     * merge function (presumably delegating merging to the state type itself - confirm against
     * the CrdtFunction API).
     *
     * @param reactor reactor the storage is bound to
     * @return a builder bound to a freshly constructed storage
     */
    public static <K extends Comparable<K>, S extends CrdtType<S>> MapCrdtStorage<K, S>.Builder builder(Reactor reactor) {
        MapCrdtStorage<K, S> storage = new MapCrdtStorage<>(reactor, CrdtFunction.<S>ofCrdtType());
        return storage.new Builder();
    }

    /**
     * Returns a consumer that buffers incoming items into a list and applies them with
     * {@code doPut} once the acknowledgement promise completes with a result.
     *
     * <p>A failed acknowledgement is mapped to a {@link CrdtException}. The stream passes through
     * the basic or detailed upload stats (depending on {@code detailedStats}) and then through
     * the {@code uploadedItems} event counter.
     *
     * @return an already completed promise of the consumer
     */
    @Override // io.activej.crdt.storage.ICrdtStorage
    public Promise<StreamConsumer<CrdtData<K, S>>> upload() {
        if (CHECKS) {
            Reactive.checkInReactorThread(this);
        }
        ToListStreamConsumer<CrdtData<K, S>> buffer = ToListStreamConsumer.create();
        StreamConsumer<CrdtData<K, S>> measured = buffer
                .withAcknowledgement(ack -> ack
                        .whenResult(() -> buffer.getList().forEach(this::doPut))
                        .mapException(e -> new CrdtException("Error while uploading CRDT data", e)))
                .transformWith(this.detailedStats ? this.uploadStatsDetailed : this.uploadStats);
        StreamConsumer<CrdtData<K, S>> counted = measured.transformWith(Utils.onItem(this.uploadedItems::recordEvent));
        return Promise.of(counted);
    }

    /**
     * Returns a supplier over the entries selected by {@code extract(j)}.
     *
     * <p>The stream passes through the basic or detailed download stats (depending on
     * {@code detailedStats}) and the {@code downloadedItems} event counter; an end-of-stream
     * failure is mapped to a {@link CrdtException}.
     *
     * @param j lower timestamp bound (inclusive) forwarded to {@code extract}
     * @return an already completed promise of the supplier
     */
    @Override // io.activej.crdt.storage.ICrdtStorage
    public Promise<StreamSupplier<CrdtData<K, S>>> download(long j) {
        if (CHECKS) {
            Reactive.checkInReactorThread(this);
        }
        StreamSupplier<CrdtData<K, S>> measured = StreamSuppliers.ofStream(extract(j))
                .transformWith(this.detailedStats ? this.downloadStatsDetailed : this.downloadStats);
        StreamSupplier<CrdtData<K, S>> counted = measured.transformWith(Utils.onItem(this.downloadedItems::recordEvent));
        StreamSupplier<CrdtData<K, S>> guarded = counted.withEndOfStream(eos ->
                eos.mapException(e -> new CrdtException("Error while downloading CRDT data", e)));
        return Promise.of(guarded);
    }

    /**
     * Moves the current data and tombstones out of the storage and streams the data.
     *
     * <p>While a take is in progress ({@code takenMap != null}) another call fails with a
     * {@link CrdtException}. Otherwise the live maps are copied into {@code takenMap} /
     * {@code takenTombstones} and cleared. When the supplier's acknowledgement completes with a
     * result the taken copies are dropped; when it fails they are re-applied to the live maps via
     * {@code doPut} / {@code doRemove} and then dropped.
     *
     * @return a promise of the supplier, or a failed promise if a take is already in progress
     */
    @Override // io.activej.crdt.storage.ICrdtStorage
    public Promise<StreamSupplier<CrdtData<K, S>>> take() {
        if (CHECKS) {
            Reactive.checkInReactorThread(this);
        }
        if (this.takenMap != null) {
            // Both taken copies are set and cleared together.
            if (!$assertionsDisabled && this.takenTombstones == null) {
                throw new AssertionError();
            }
            return Promise.ofException(new CrdtException("Data is already being taken"));
        }

        // Move the live state aside; new writes go into the now-empty live maps.
        this.takenMap = new TreeMap<>(this.map);
        this.takenTombstones = new TreeMap<>(this.tombstones);
        this.map.clear();
        this.tombstones.clear();

        StreamSupplier<CrdtData<K, S>> measured = StreamSuppliers.ofIterable(this.takenMap.values())
                .transformWith(this.detailedStats ? this.takeStatsDetailed : this.takeStats);
        StreamSupplier<CrdtData<K, S>> supplier = measured.transformWith(Utils.onItem(this.takenItems::recordEvent));

        // The promise returned by this chain is not used; only the callbacks' side effects matter.
        supplier.getAcknowledgement()
                .whenResult(() -> {
                    this.takenMap = null;
                    this.takenTombstones = null;
                })
                .mapException(e -> {
                    this.takenMap = io.activej.common.Utils.nullify(this.takenMap,
                            taken -> taken.values().forEach(this::doPut));
                    this.takenTombstones = io.activej.common.Utils.nullify(this.takenTombstones,
                            taken -> taken.values().forEach(this::doRemove));
                    return new CrdtException("Error while downloading CRDT data", e);
                });
        return Promise.of(supplier);
    }

    /**
     * Returns a consumer that buffers incoming tombstones into a list and applies them with
     * {@code doRemove} once the acknowledgement promise completes with a result.
     *
     * <p>A failed acknowledgement is mapped to a {@link CrdtException}. The stream passes through
     * the basic or detailed remove stats (depending on {@code detailedStats}) and then through
     * the {@code removedItems} event counter.
     *
     * @return an already completed promise of the consumer
     */
    @Override // io.activej.crdt.storage.ICrdtStorage
    public Promise<StreamConsumer<CrdtTombstone<K>>> remove() {
        if (CHECKS) {
            Reactive.checkInReactorThread(this);
        }
        ToListStreamConsumer<CrdtTombstone<K>> buffer = ToListStreamConsumer.create();
        StreamConsumer<CrdtTombstone<K>> measured = buffer
                .withAcknowledgement(ack -> ack
                        .whenResult(() -> buffer.getList().forEach(this::doRemove))
                        .mapException(e -> new CrdtException("Error while removing CRDT data", e)))
                .transformWith(this.detailedStats ? this.removeStatsDetailed : this.removeStats);
        StreamConsumer<CrdtTombstone<K>> counted = measured.transformWith(Utils.onItem(this.removedItems::recordEvent));
        return Promise.of(counted);
    }

    /**
     * Liveness probe; this storage has nothing to contact, so the returned promise is
     * already complete.
     *
     * @return a completed promise
     */
    @Override // io.activej.crdt.storage.ICrdtStorage
    public Promise<Void> ping() {
        if (CHECKS) {
            Reactive.checkInReactorThread(this);
        }
        Promise<Void> alive = Promise.complete();
        return alive;
    }

    /**
     * Service start hook; there is nothing to initialise.
     * The reactor-thread check here is unconditional (not gated by {@code CHECKS}).
     *
     * @return a completed promise
     */
    @Override
    public Promise<?> start() {
        Reactive.checkInReactorThread(this);
        Promise<?> started = Promise.complete();
        return started;
    }

    /**
     * Service stop hook; there is nothing to release.
     * The reactor-thread check here is unconditional (not gated by {@code CHECKS}).
     *
     * @return a completed promise
     */
    @Override
    public Promise<?> stop() {
        Reactive.checkInReactorThread(this);
        Promise<?> stopped = Promise.complete();
        return stopped;
    }

    /**
     * Streams the stored entries whose timestamp is at least {@code j}.
     *
     * <p>When no take is in progress the stream runs directly over the live map. Otherwise a
     * temporary map is built from the live map and the taken map (see {@code doMerge}) so that
     * taken-but-unacknowledged data stays visible.
     *
     * @param j lower timestamp bound (inclusive)
     * @return a stream of matching entries in key order
     */
    private Stream<CrdtData<K, S>> extract(long j) {
        NavigableMap<K, CrdtData<K, S>> source = this.map;
        if (this.takenMap != null) {
            source = new TreeMap<>();
            doMerge(source, this.map);
            doMerge(source, this.takenMap);
        }
        return source.values()
                .stream()
                .filter(data -> j <= data.getTimestamp());
    }

    /**
     * Copies entries of {@code map2} into {@code map}, skipping entries that are covered by a
     * live or taken tombstone with a timestamp not older than the entry.
     *
     * <p>When both maps hold the same key, the entry with the strictly greater timestamp wins
     * and the incoming one wins ties. The CRDT merge function is not used here.
     *
     * @param map  destination map, modified in place
     * @param map2 source map, only read
     */
    private void doMerge(Map<K, CrdtData<K, S>> map, Map<K, CrdtData<K, S>> map2) {
        // Only called while a take is in progress, so the taken tombstones must exist.
        if (!$assertionsDisabled && this.takenTombstones == null) {
            throw new AssertionError();
        }
        for (Map.Entry<K, CrdtData<K, S>> entry : map2.entrySet()) {
            K key = entry.getKey();
            CrdtData<K, S> candidate = entry.getValue();

            CrdtTombstone<K> liveTombstone = this.tombstones.get(key);
            if (liveTombstone != null && liveTombstone.getTimestamp() >= candidate.getTimestamp()) {
                continue;
            }
            CrdtTombstone<K> takenTombstone = this.takenTombstones.get(key);
            if (takenTombstone != null && takenTombstone.getTimestamp() >= candidate.getTimestamp()) {
                continue;
            }
            map.merge(key, candidate, (present, incoming) ->
                    present.getTimestamp() > incoming.getTimestamp() ? present : incoming);
        }
    }

    /**
     * Applies one data item to the live map.
     *
     * <p>The item is ignored if a tombstone with an equal or newer timestamp exists; an older
     * tombstone is discarded. If the key is already present, the states are merged with the
     * CRDT function and stamped with the larger of the two timestamps; when the filter rejects
     * the merged state the remapping function returns {@code null}, which makes
     * {@code Map.merge} remove the key. A key that is not yet present is stored as is.
     *
     * @param crdtData item to apply
     */
    private void doPut(CrdtData<K, S> crdtData) {
        K key = crdtData.getKey();
        CrdtTombstone<K> tombstone = this.tombstones.get(key);
        if (tombstone != null && tombstone.getTimestamp() >= crdtData.getTimestamp()) {
            return;
        }
        if (tombstone != null) {
            // The tombstone is older than the incoming data, so it no longer applies.
            this.tombstones.remove(key);
        }
        this.map.merge(key, crdtData, (existing, incoming) -> {
            S merged = this.function.merge(
                    existing.getState(), existing.getTimestamp(),
                    incoming.getState(), incoming.getTimestamp());
            long newest = Math.max(existing.getTimestamp(), incoming.getTimestamp());
            return this.filter.test(merged) ? new CrdtData<>(key, newest, merged) : null;
        });
    }

    /**
     * Applies one tombstone to the live maps.
     *
     * <p>If the key holds data strictly newer than the tombstone, nothing changes. Otherwise any
     * data for the key is removed and the tombstone is recorded; an existing tombstone for the
     * key is replaced by one carrying the larger timestamp.
     *
     * @param crdtTombstone tombstone to apply
     * @return {@code false} if newer data blocked the removal, {@code true} otherwise
     */
    private boolean doRemove(CrdtTombstone<K> crdtTombstone) {
        K key = crdtTombstone.getKey();
        CrdtData<K, S> existing = this.map.get(key);
        if (existing != null && existing.getTimestamp() > crdtTombstone.getTimestamp()) {
            return false;
        }
        if (existing != null) {
            this.map.remove(key);
        }
        this.tombstones.merge(key, crdtTombstone, (present, incoming) -> {
            long newest = Math.max(present.getTimestamp(), incoming.getTimestamp());
            return new CrdtTombstone<>(key, newest);
        });
        return true;
    }

    /**
     * Stores a state under a key, stamped with the current time of the configured time provider.
     *
     * @param k key
     * @param s state
     */
    public void put(K k, S s) {
        if (CHECKS) {
            Reactive.checkInReactorThread(this);
        }
        long timestamp = this.now.currentTimeMillis();
        CrdtData<K, S> stamped = new CrdtData<>(k, timestamp, s);
        put(stamped);
    }

    /**
     * Records a single-put event and applies the item to the storage (see {@code doPut}).
     *
     * @param crdtData item to store
     */
    public void put(CrdtData<K, S> crdtData) {
        if (CHECKS) {
            Reactive.checkInReactorThread(this);
        }
        EventStats putCounter = this.singlePuts;
        putCounter.recordEvent();
        this.doPut(crdtData);
    }

    /**
     * Records a single-get event and looks a key up in the live map.
     * Data that is currently held by an unfinished {@code take()} is not consulted.
     *
     * @param k key
     * @return the stored state, or {@code null} if the live map has no entry for the key
     */
    @Nullable
    public S get(K k) {
        if (CHECKS) {
            Reactive.checkInReactorThread(this);
        }
        this.singleGets.recordEvent();
        CrdtData<K, S> stored = this.map.get(k);
        return stored == null ? null : stored.getState();
    }

    /**
     * Removes a key using a tombstone stamped with the current time of the configured
     * time provider.
     *
     * @param k key
     * @return the result of {@code remove(CrdtTombstone)}
     */
    public boolean remove(K k) {
        if (CHECKS) {
            Reactive.checkInReactorThread(this);
        }
        long timestamp = this.now.currentTimeMillis();
        CrdtTombstone<K> tombstone = new CrdtTombstone<>(k, timestamp);
        return remove(tombstone);
    }

    /**
     * Records a single-remove event and applies the tombstone (see {@code doRemove}).
     *
     * @param crdtTombstone tombstone to apply
     * @return {@code false} if newer data blocked the removal, {@code true} otherwise
     */
    public boolean remove(CrdtTombstone<K> crdtTombstone) {
        if (CHECKS) {
            Reactive.checkInReactorThread(this);
        }
        this.singleRemoves.recordEvent();
        boolean applied = this.doRemove(crdtTombstone);
        return applied;
    }

    /**
     * Returns an iterator over the entries whose timestamp is at least {@code j}.
     *
     * <p>The matching entries are copied into a list up front. {@code extract} may stream
     * straight over the live map, and {@link Iterator#remove()} below mutates that map, which
     * would otherwise risk a {@code ConcurrentModificationException} on the next step.
     *
     * <p>{@code remove()} goes through {@link #remove(Comparable)} so that a tombstone is recorded
     * and statistics are updated; the internal collections are never modified directly. It does
     * not delegate to the underlying iterator: an iterator obtained from a stream does not
     * support removal and always throws {@code UnsupportedOperationException}.
     *
     * @param j lower timestamp bound (inclusive)
     * @return an iterator over a snapshot of the matching entries, in key order
     */
    public Iterator<CrdtData<K, S>> iterator(long j) {
        final List<CrdtData<K, S>> snapshot = new ArrayList<>();
        extract(j).forEach(snapshot::add);
        final Iterator<CrdtData<K, S>> it = snapshot.iterator();
        return new Iterator<CrdtData<K, S>>() {
            // Last element handed out by next() and not yet removed; null otherwise.
            private CrdtData<K, S> current;

            @Override
            public boolean hasNext() {
                return it.hasNext();
            }

            @Override
            public CrdtData<K, S> next() {
                CrdtData<K, S> crdtData = it.next();
                this.current = crdtData;
                return crdtData;
            }

            /**
             * Removes the last returned entry from the storage.
             *
             * @throws IllegalStateException if {@code next()} has not been called yet, or
             *                               {@code remove()} was already called for the last element
             */
            @Override
            public void remove() {
                if (this.current == null) {
                    throw new IllegalStateException("next() must be called before remove()");
                }
                MapCrdtStorage.this.remove(this.current.getKey());
                // Guard against a second remove() for the same element.
                this.current = null;
            }
        };
    }

    /**
     * Returns an iterator obtained from {@code iterator(long)} with a lower timestamp bound of zero.
     *
     * @return the iterator produced by {@code iterator(0L)}
     */
    public Iterator<CrdtData<K, S>> iterator() {
        final long noLowerBound = 0L;
        return this.iterator(noLowerBound);
    }

    /** Switches the streaming operations to the detailed stats objects. */
    @JmxOperation
    public void startDetailedMonitoring() {
        this.detailedStats = true;
    }

    /** Switches the streaming operations back to the basic stats objects. */
    @JmxOperation
    public void stopDetailedMonitoring() {
        this.detailedStats = false;
    }

    /** Returns whether the detailed stats objects are currently in use. */
    @JmxAttribute
    public boolean isDetailedStats() {
        return this.detailedStats;
    }

    /** Returns the basic stream stats used by {@code upload()} when detailed monitoring is off. */
    @JmxAttribute
    public BasicStreamStats getUploadStats() {
        return this.uploadStats;
    }

    /** Returns the detailed stream stats used by {@code upload()} when detailed monitoring is on. */
    @JmxAttribute
    public DetailedStreamStats getUploadStatsDetailed() {
        return this.uploadStatsDetailed;
    }

    /** Returns the basic stream stats used by {@code download(long)} when detailed monitoring is off. */
    @JmxAttribute
    public BasicStreamStats getDownloadStats() {
        return this.downloadStats;
    }

    /** Returns the detailed stream stats used by {@code download(long)} when detailed monitoring is on. */
    @JmxAttribute
    public DetailedStreamStats getDownloadStatsDetailed() {
        return this.downloadStatsDetailed;
    }

    /** Returns the basic stream stats used by {@code take()} when detailed monitoring is off. */
    @JmxAttribute
    public BasicStreamStats getTakeStats() {
        return this.takeStats;
    }

    /** Returns the detailed stream stats used by {@code take()} when detailed monitoring is on. */
    @JmxAttribute
    public DetailedStreamStats getTakeStatsDetailed() {
        return this.takeStatsDetailed;
    }

    /** Returns the basic stream stats used by the streaming {@code remove()} when detailed monitoring is off. */
    @JmxAttribute
    public BasicStreamStats getRemoveStats() {
        return this.removeStats;
    }

    /** Returns the detailed stream stats used by the streaming {@code remove()} when detailed monitoring is on. */
    @JmxAttribute
    public DetailedStreamStats getRemoveStatsDetailed() {
        return this.removeStatsDetailed;
    }

    /** Returns the counter recorded by each {@code put(CrdtData)} call. */
    @JmxAttribute
    public EventStats getSinglePuts() {
        return this.singlePuts;
    }

    /** Returns the counter recorded by each {@code get(K)} call. */
    @JmxAttribute
    public EventStats getSingleGets() {
        return this.singleGets;
    }

    /** Returns the counter recorded by each {@code remove(CrdtTombstone)} call. */
    @JmxAttribute
    public EventStats getSingleRemoves() {
        return this.singleRemoves;
    }

    /** Returns the event counter attached to the {@code upload()} stream. */
    @JmxAttribute
    public EventStats getUploadedItems() {
        return this.uploadedItems;
    }

    /** Returns the event counter attached to the {@code download(long)} stream. */
    @JmxAttribute
    public EventStats getDownloadedItems() {
        return this.downloadedItems;
    }

    /** Returns the event counter attached to the {@code take()} stream. */
    @JmxAttribute
    public EventStats getTakenItems() {
        return this.takenItems;
    }

    /** Returns the event counter attached to the streaming {@code remove()}. */
    @JmxAttribute
    public EventStats getRemovedItems() {
        return this.removedItems;
    }

    // Class initialisation: resolves the assertion flag, the CHECKS switch and the default
    // smoothing window (application setting "smoothingWindow", falling back to one minute).
    static {
        // Mirrors what the compiler generates for `assert`: true when assertions are off for this class.
        $assertionsDisabled = !MapCrdtStorage.class.desiredAssertionStatus();
        CHECKS = Checks.isEnabled(MapCrdtStorage.class);
        DEFAULT_SMOOTHING_WINDOW = ApplicationSettings.getDuration(MapCrdtStorage.class, "smoothingWindow", Duration.ofMinutes(1L));
    }
}
