package io.activej.crdt.storage.cluster;

import io.activej.async.function.AsyncFunction;
import io.activej.async.function.AsyncSupplier;
import io.activej.async.service.ReactiveService;
import io.activej.common.ApplicationSettings;
import io.activej.common.Checks;
import io.activej.common.builder.AbstractBuilder;
import io.activej.common.collection.Try;
import io.activej.crdt.CrdtData;
import io.activej.crdt.CrdtException;
import io.activej.crdt.CrdtTombstone;
import io.activej.crdt.RemoteCrdtStorage;
import io.activej.crdt.function.CrdtFunction;
import io.activej.crdt.storage.ICrdtStorage;
import io.activej.crdt.storage.cluster.IDiscoveryService;
import io.activej.crdt.util.Utils;
import io.activej.datastream.consumer.StreamConsumer;
import io.activej.datastream.processor.StreamSplitter;
import io.activej.datastream.processor.reducer.BinaryAccumulatorReducer;
import io.activej.datastream.processor.reducer.StreamReducer;
import io.activej.datastream.stats.BasicStreamStats;
import io.activej.datastream.stats.DetailedStreamStats;
import io.activej.datastream.stats.StreamStats;
import io.activej.datastream.supplier.StreamSupplier;
import io.activej.jmx.api.attribute.JmxAttribute;
import io.activej.jmx.api.attribute.JmxOperation;
import io.activej.jmx.stats.EventStats;
import io.activej.promise.Promise;
import io.activej.promise.Promises;
import io.activej.reactor.AbstractReactive;
import io.activej.reactor.Reactive;
import io.activej.reactor.Reactor;
import io.activej.reactor.jmx.ReactiveJmxBeanWithStats;
import java.lang.Comparable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import org.jetbrains.annotations.VisibleForTesting;

/* loaded from: input_file:io/activej/crdt/storage/cluster/ClusterCrdtStorage.class */
public final class ClusterCrdtStorage<K extends Comparable<K>, S, P> extends AbstractReactive implements ICrdtStorage<K, S>, ReactiveService, ReactiveJmxBeanWithStats {
    // Gates the reactor-thread assertions performed at the top of the storage operations.
    private static final boolean CHECKS = Checks.isEnabled(ClusterCrdtStorage.class);
    // Window handed to every EventStats counter of this class; read from the
    // "smoothingWindow" application setting, one minute when the setting is absent.
    public static final Duration DEFAULT_SMOOTHING_WINDOW = ApplicationSettings.getDuration(ClusterCrdtStorage.class, "smoothingWindow", Duration.ofMinutes(1));
    // Supplies partition schemes; queried once in start() and then repeatedly until stopped.
    private final IDiscoveryService<P> discoveryService;
    // Merges two states of the same key when data from several partitions is combined.
    private final CrdtFunction<S> crdtFunction;
    // Storage connection per partition of the current scheme; maintained by updatePartitionScheme().
    private final Map<P, ICrdtStorage<K, S>> crdtStorages;
    // Most recently discovered scheme; operations snapshot it into a local before going async.
    private IDiscoveryService.PartitionScheme<P> currentPartitionScheme;
    // When set (via Builder), start() tolerates a CrdtException from the initial ping.
    private boolean forceStart;
    // Set by stop(); makes the rediscovery loop started in start() terminate.
    private boolean stopped;
    // Selects the *Detailed stats objects instead of the basic ones for new streams.
    private boolean detailedStats;
    // Stream statistics, one basic/detailed pair per operation.
    private final BasicStreamStats<CrdtData<K, S>> uploadStats;
    private final DetailedStreamStats<CrdtData<K, S>> uploadStatsDetailed;
    private final BasicStreamStats<CrdtData<K, S>> downloadStats;
    private final DetailedStreamStats<CrdtData<K, S>> downloadStatsDetailed;
    private final BasicStreamStats<CrdtData<K, S>> takeStats;
    private final DetailedStreamStats<CrdtData<K, S>> takeStatsDetailed;
    private final BasicStreamStats<CrdtTombstone<K>> removeStats;
    private final DetailedStreamStats<CrdtTombstone<K>> removeStatsDetailed;
    private final BasicStreamStats<CrdtData<K, S>> repartitionUploadStats;
    private final DetailedStreamStats<CrdtData<K, S>> repartitionUploadStatsDetailed;
    // Event counters attached to the streams of the corresponding operation through Utils.onItem.
    private final EventStats uploadedItems;
    private final EventStats downloadedItems;
    private final EventStats takenItems;
    private final EventStats removedItems;
    private final EventStats repartitionedItems;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: io.activej.crdt.storage.cluster.ClusterCrdtStorage$1Tuple, reason: invalid class name */
    /* loaded from: input_file:io/activej/crdt/storage/cluster/ClusterCrdtStorage$1Tuple.class */
    /**
     * Holder used by {@code repartition}: the outcome of taking data from the source
     * partition together with the upload consumers that were opened, so that both
     * can be released in one call.
     */
    public class C1Tuple {
        private final Try<StreamSupplier<CrdtData<K, S>>> downloader;
        private final Map<P, StreamConsumer<CrdtData<K, S>>> uploaders;

        public C1Tuple(Try<StreamSupplier<CrdtData<K, S>>> downloader, Map<P, StreamConsumer<CrdtData<K, S>>> uploaders) {
            this.downloader = downloader;
            this.uploaders = uploaders;
        }

        /** Closes the downloader (only if it was obtained successfully) and every uploader. */
        private void close() {
            downloader.ifSuccess(StreamSupplier::close);
            for (StreamConsumer<CrdtData<K, S>> uploader : uploaders.values()) {
                uploader.close();
            }
        }
    }

    /* loaded from: input_file:io/activej/crdt/storage/cluster/ClusterCrdtStorage$Builder.class */
    public final class Builder extends AbstractBuilder<ClusterCrdtStorage<K, S, P>.Builder, ClusterCrdtStorage<K, S, P>> {
        private Builder() {
        }

        public ClusterCrdtStorage<K, S, P>.Builder withForceStart(boolean z) {
            checkNotBuilt(this);
            ClusterCrdtStorage.this.forceStart = z;
            return this;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /* renamed from: doBuild, reason: merged with bridge method [inline-methods] */
        public ClusterCrdtStorage<K, S, P> m18doBuild() {
            return ClusterCrdtStorage.this;
        }
    }

    /**
     * Creates a storage bound to the given reactor with an empty partition map and
     * fresh statistics objects. Instances are obtained through {@code create} or
     * {@code builder}.
     *
     * @param reactor           reactor passed to the superclass
     * @param iDiscoveryService source of partition schemes
     * @param crdtFunction      function used to merge states of the same key
     */
    private ClusterCrdtStorage(Reactor reactor, IDiscoveryService<P> iDiscoveryService, CrdtFunction<S> crdtFunction) {
        super(reactor);
        // collaborators
        this.discoveryService = iDiscoveryService;
        this.crdtFunction = crdtFunction;
        this.crdtStorages = new LinkedHashMap<>();

        // per-operation stream statistics
        this.uploadStats = StreamStats.basic();
        this.uploadStatsDetailed = StreamStats.detailed();
        this.downloadStats = StreamStats.basic();
        this.downloadStatsDetailed = StreamStats.detailed();
        this.takeStats = StreamStats.basic();
        this.takeStatsDetailed = StreamStats.detailed();
        this.removeStats = StreamStats.basic();
        this.removeStatsDetailed = StreamStats.detailed();
        this.repartitionUploadStats = StreamStats.basic();
        this.repartitionUploadStatsDetailed = StreamStats.detailed();

        // per-operation item counters
        this.uploadedItems = EventStats.create(DEFAULT_SMOOTHING_WINDOW);
        this.downloadedItems = EventStats.create(DEFAULT_SMOOTHING_WINDOW);
        this.takenItems = EventStats.create(DEFAULT_SMOOTHING_WINDOW);
        this.removedItems = EventStats.create(DEFAULT_SMOOTHING_WINDOW);
        this.repartitionedItems = EventStats.create(DEFAULT_SMOOTHING_WINDOW);
    }

    /**
     * Shortcut for {@code builder(reactor, iDiscoveryService, crdtFunction).build()}.
     *
     * @return the built storage
     */
    public static <K extends Comparable<K>, S, P> ClusterCrdtStorage<K, S, P> create(Reactor reactor, IDiscoveryService<P> iDiscoveryService, CrdtFunction<S> crdtFunction) {
        // The typed local gives K an inference target, so no raw cast is needed.
        ClusterCrdtStorage<K, S, P>.Builder storageBuilder = builder(reactor, iDiscoveryService, crdtFunction);
        return storageBuilder.build();
    }

    /**
     * Starts building a cluster storage.
     * <p>
     * {@code Builder} is an inner (non-static) class, so it needs an enclosing storage
     * instance: the storage is constructed here from the supplied arguments and the
     * builder is created from it.
     *
     * @param reactor           reactor the storage is bound to
     * @param iDiscoveryService source of partition schemes
     * @param crdtFunction      function used to merge states of the same key
     * @return a builder that configures and finally returns the new storage
     */
    public static <K extends Comparable<K>, S, P> ClusterCrdtStorage<K, S, P>.Builder builder(Reactor reactor, IDiscoveryService<P> iDiscoveryService, CrdtFunction<S> crdtFunction) {
        ClusterCrdtStorage<K, S, P> storage = new ClusterCrdtStorage<>(reactor, iDiscoveryService, crdtFunction);
        return storage.new Builder();
    }

    /**
     * Discovers the initial partition scheme, applies it and pings the cluster; once
     * that succeeds, launches a background loop that keeps applying newly discovered
     * schemes until {@code stop()} has been called.
     * <p>
     * A {@link CrdtException} from the initial ping is ignored when the force-start
     * flag is set; any other outcome of the ping is propagated as is.
     *
     * @return a promise that completes once the initial discovery and ping are done
     */
    public Promise<?> start() {
        Reactive.checkInReactorThread(this);
        AsyncSupplier<IDiscoveryService.PartitionScheme<P>> discoverySupplier = this.discoveryService.discover();
        return discoverySupplier.get()
            .then(initialScheme -> {
                updatePartitionScheme(initialScheme);
                return ping().thenCallback((result, e, cb) -> {
                    boolean tolerated = e instanceof CrdtException && forceStart;
                    if (tolerated) {
                        cb.set(null);
                    } else {
                        cb.set(result, e);
                    }
                });
            })
            // The loop's promise is intentionally not returned: it runs for the lifetime of the service.
            .whenResult(() -> Promises.repeat(() -> discoverySupplier.get().map((scheme, e) -> {
                if (stopped) {
                    return false;
                }
                // Failed discoveries are skipped; the loop simply asks again.
                if (e == null) {
                    updatePartitionScheme(scheme);
                }
                return true;
            })));
    }

    /**
     * Marks the storage as stopped, which makes the rediscovery loop started by
     * {@code start()} exit on its next iteration.
     *
     * @return an already completed promise
     */
    public Promise<?> stop() {
        Reactive.checkInReactorThread(this);
        stopped = true;
        return Promise.complete();
    }

    /**
     * Opens upload consumers on the partitions of the current scheme and exposes them
     * as a single consumer that routes each item to the partitions selected by the
     * sharder.
     *
     * @return a promise of the combined consumer; it fails with a {@link CrdtException}
     *         ("Incomplete cluster") when no sharder can be created for the partitions
     *         that responded, after those consumers have been closed with that error
     */
    @Override // io.activej.crdt.storage.ICrdtStorage
    public Promise<StreamConsumer<CrdtData<K, S>>> upload() {
        if (CHECKS) {
            Reactive.checkInReactorThread(this);
        }
        // Snapshot: the field may be replaced by discovery while the promise is pending.
        IDiscoveryService.PartitionScheme<P> scheme = this.currentPartitionScheme;
        return execute(scheme, ICrdtStorage::upload).then(consumers -> {
            ArrayList<P> alive = new ArrayList<>(consumers.keySet());
            Sharder<K> sharder = scheme.createSharder(alive);
            if (sharder == null) {
                CrdtException incomplete = new CrdtException("Incomplete cluster");
                consumers.values().forEach(consumer -> consumer.closeEx(incomplete));
                throw incomplete;
            }
            StreamSplitter<CrdtData<K, S>, CrdtData<K, S>> splitter = StreamSplitter.create((item, acceptors) -> {
                for (int target : sharder.shard(item.getKey())) {
                    acceptors[target].accept(item);
                }
            });
            // Outputs are created in the order of 'alive', the same list the sharder was built from.
            for (P partitionId : alive) {
                splitter.newOutput().streamTo(consumers.get(partitionId));
            }
            StreamConsumer<CrdtData<K, S>> input = splitter.getInput();
            return Promise.of(input
                .transformWith(detailedStats ? uploadStatsDetailed : uploadStats)
                .transformWith(Utils.onItem(uploadedItems::recordEvent)));
        });
    }

    /**
     * Downloads data from the partitions of the current scheme and returns it as one
     * merged supplier instrumented with the download statistics.
     *
     * @param j value passed unchanged to {@code download(long)} of every partition
     *          storage (presumably a timestamp bound; confirm against ICrdtStorage)
     * @return a promise of the merged supplier; fails as described for {@code getData}
     */
    @Override // io.activej.crdt.storage.ICrdtStorage
    public Promise<StreamSupplier<CrdtData<K, S>>> download(long j) {
        if (CHECKS) {
            Reactive.checkInReactorThread(this);
        }
        // The lambda parameter and the instrumented stream need distinct names:
        // a local may not redeclare a lambda parameter of the enclosing lambda.
        return getData(storage -> storage.download(j)).map(merged -> {
            StreamSupplier<CrdtData<K, S>> withStats = merged
                .transformWith(detailedStats ? downloadStatsDetailed : downloadStats);
            return withStats.transformWith(Utils.onItem(downloadedItems::recordEvent));
        });
    }

    /**
     * Takes data from the partitions of the current scheme and returns it as one
     * merged supplier instrumented with the take statistics.
     *
     * @return a promise of the merged supplier; fails as described for {@code getData}
     */
    @Override // io.activej.crdt.storage.ICrdtStorage
    public Promise<StreamSupplier<CrdtData<K, S>>> take() {
        if (CHECKS) {
            Reactive.checkInReactorThread(this);
        }
        // The lambda parameter and the instrumented stream need distinct names:
        // a local may not redeclare a lambda parameter of the enclosing lambda.
        return getData(ICrdtStorage::take).map(merged -> {
            StreamSupplier<CrdtData<K, S>> withStats = merged
                .transformWith(detailedStats ? takeStatsDetailed : takeStats);
            return withStats.transformWith(Utils.onItem(takenItems::recordEvent));
        });
    }

    /**
     * Opens remove consumers on the partitions of the current scheme and exposes them
     * as a single tombstone consumer that routes each tombstone to the partitions
     * selected by the sharder.
     *
     * @return a promise of the combined consumer; it fails with a {@link CrdtException}
     *         ("Incomplete cluster") when no sharder can be created for the partitions
     *         that responded, after those consumers have been closed with that error
     */
    @Override // io.activej.crdt.storage.ICrdtStorage
    public Promise<StreamConsumer<CrdtTombstone<K>>> remove() {
        if (CHECKS) {
            Reactive.checkInReactorThread(this);
        }
        // Snapshot: the field may be replaced by discovery while the promise is pending.
        IDiscoveryService.PartitionScheme<P> scheme = this.currentPartitionScheme;
        return execute(scheme, ICrdtStorage::remove).map(consumers -> {
            ArrayList<P> alive = new ArrayList<>(consumers.keySet());
            Sharder<K> sharder = scheme.createSharder(alive);
            if (sharder == null) {
                CrdtException incomplete = new CrdtException("Incomplete cluster");
                consumers.values().forEach(consumer -> consumer.closeEx(incomplete));
                throw incomplete;
            }
            StreamSplitter<CrdtTombstone<K>, CrdtTombstone<K>> splitter = StreamSplitter.create((tombstone, acceptors) -> {
                for (int target : sharder.shard(tombstone.getKey())) {
                    acceptors[target].accept(tombstone);
                }
            });
            // Outputs are created in the order of 'alive', the same list the sharder was built from.
            for (P partitionId : alive) {
                splitter.newOutput().streamTo(consumers.get(partitionId));
            }
            StreamConsumer<CrdtTombstone<K>> input = splitter.getInput();
            return input
                .transformWith(detailedStats ? removeStatsDetailed : removeStats)
                .transformWith(Utils.onItem(removedItems::recordEvent));
        });
    }

    /**
     * Takes all data from the storage of partition {@code p} and re-uploads it to the
     * partitions of the current scheme according to the sharder.
     * <p>
     * The returned promise fails with a {@link CrdtException} when:
     * <ul>
     *   <li>{@code p} has no storage in the current partition map;</li>
     *   <li>no upload consumer could be opened for {@code p} itself;</li>
     *   <li>{@code p} is the only partition an upload consumer was opened for;</li>
     *   <li>taking data from {@code p} failed (the cause is attached);</li>
     *   <li>no sharder can be created for the partitions that responded.</li>
     * </ul>
     * In every failure case after streams were opened, they are closed first.
     *
     * @param p identifier of the source partition
     * @return a promise that completes when the taken data has been streamed out
     */
    public Promise<Void> repartition(P p) {
        if (CHECKS) {
            Reactive.checkInReactorThread(this);
        }
        // Snapshot: the field may be replaced by discovery while the promise is pending.
        IDiscoveryService.PartitionScheme<P> scheme = this.currentPartitionScheme;
        ICrdtStorage<K, S> source = this.crdtStorages.get(p);
        if (source == null) {
            // Report through the promise instead of a synchronous NullPointerException,
            // and before any upload stream has been opened.
            return Promise.ofException(new CrdtException("Unknown partition: " + p));
        }
        return Promises.toTuple(C1Tuple::new, source.take().toTry(), execute(scheme, ICrdtStorage::upload))
            .whenResult(tuple -> {
                if (!tuple.uploaders.containsKey(p)) {
                    tuple.close();
                    throw new CrdtException("Could not upload to local storage");
                }
                if (tuple.uploaders.size() == 1) {
                    tuple.close();
                    throw new CrdtException("Nowhere to upload");
                }
                if (tuple.downloader.isException()) {
                    tuple.close();
                    throw new CrdtException("Could not download local data", tuple.downloader.getException());
                }
            })
            .then(tuple -> {
                ArrayList<P> alive = new ArrayList<>(tuple.uploaders.keySet());
                Sharder<K> sharder = scheme.createSharder(alive);
                if (sharder == null) {
                    tuple.close();
                    return Promise.ofException(new CrdtException("Incomplete cluster"));
                }
                StreamSplitter<CrdtData<K, S>, CrdtData<K, S>> splitter = StreamSplitter.create((item, acceptors) -> {
                    for (int target : sharder.shard(item.getKey())) {
                        acceptors[target].accept(item);
                    }
                });
                StreamConsumer<CrdtData<K, S>> input = splitter.getInput();
                StreamSupplier<CrdtData<K, S>> taken = tuple.downloader.get();
                // Outputs are created in the order of 'alive', the same list the sharder was built from.
                for (P partitionId : alive) {
                    StreamSupplier<CrdtData<K, S>> output = splitter.newOutput()
                        .transformWith(detailedStats ? repartitionUploadStatsDetailed : repartitionUploadStats);
                    output.transformWith(Utils.onItem(repartitionedItems::recordEvent))
                        .streamTo(tuple.uploaders.get(partitionId));
                }
                return taken.streamTo(input);
            });
    }

    /**
     * Pings the partitions of the current scheme and verifies that a sharder can be
     * created for the ones that answered.
     *
     * @return a promise that fails with a {@link CrdtException} ("Incomplete cluster")
     *         when no sharder can be created for the responding partitions
     */
    @Override // io.activej.crdt.storage.ICrdtStorage
    public Promise<Void> ping() {
        if (CHECKS) {
            Reactive.checkInReactorThread(this);
        }
        IDiscoveryService.PartitionScheme<P> scheme = this.currentPartitionScheme;
        return execute(scheme, ICrdtStorage::ping)
            .whenResult(responses -> {
                ArrayList<P> alive = new ArrayList<>(responses.keySet());
                if (scheme.createSharder(alive) == null) {
                    throw new CrdtException("Incomplete cluster");
                }
            })
            .toVoid();
    }

    /**
     * Applies {@code asyncFunction} to the storage of every partition of the given
     * scheme and collects the successful results.
     * <p>
     * Failures of individual partitions are dropped on purpose: the callers decide from
     * the set of keys present in the result whether enough partitions responded.
     *
     * @param partitionScheme scheme whose partitions are queried
     * @param asyncFunction   operation to run against each partition storage
     * @param <T>             result type of the operation
     * @return a promise of a map from partition id to the result obtained from it,
     *         containing only the partitions whose operation succeeded
     */
    private <T> Promise<Map<P, T>> execute(IDiscoveryService.PartitionScheme<P> partitionScheme, AsyncFunction<ICrdtStorage<K, S>, T> asyncFunction) {
        Set<P> partitions = partitionScheme.getPartitions();
        Map<P, T> results = new HashMap<>();
        return Promises.all(partitions.stream().map(partitionId ->
                asyncFunction.apply(this.crdtStorages.get(partitionId)).map((result, e) -> {
                    if (e == null) {
                        // key is the partition, value is what that partition returned
                        results.put(partitionId, result);
                    }
                    // Always complete normally so that one failing partition does not fail the whole batch.
                    return null;
                })))
            .map(ignored -> results);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Obtains a supplier from every partition of the current scheme via
     * {@code asyncFunction} and reduces them into one supplier, merging entries that
     * share a key with the CRDT function.
     *
     * @param asyncFunction operation that opens a supplier on a partition storage
     * @return a promise of the merged supplier; it fails with a {@link CrdtException}
     *         ("Incomplete cluster") when the scheme reports the responding partitions
     *         as not valid for reading, after the opened suppliers have been closed
     *         with that error
     */
    private Promise<StreamSupplier<CrdtData<K, S>>> getData(AsyncFunction<ICrdtStorage<K, S>, StreamSupplier<CrdtData<K, S>>> asyncFunction) {
        IDiscoveryService.PartitionScheme<P> scheme = this.currentPartitionScheme;
        return execute(scheme, asyncFunction).map(suppliers -> {
            if (!scheme.isReadValid(suppliers.keySet())) {
                CrdtException incomplete = new CrdtException("Incomplete cluster");
                suppliers.values().forEach(supplier -> supplier.closeEx(incomplete));
                throw incomplete;
            }
            StreamReducer<K, CrdtData<K, S>, CrdtData<K, S>> reducer = StreamReducer.create();
            for (StreamSupplier<CrdtData<K, S>> supplier : suppliers.values()) {
                supplier.streamTo(reducer.newInput(CrdtData::getKey, new BinaryAccumulatorReducer<K, CrdtData<K, S>>() { // from class: io.activej.crdt.storage.cluster.ClusterCrdtStorage.1
                    @Override
                    public CrdtData<K, S> combine(K key, CrdtData<K, S> first, CrdtData<K, S> second) {
                        // The merged entry carries the newer of the two timestamps.
                        long newest = Math.max(first.getTimestamp(), second.getTimestamp());
                        S merged = ClusterCrdtStorage.this.crdtFunction.merge(second.getState(), second.getTimestamp(), first.getState(), first.getTimestamp());
                        return new CrdtData<>(key, newest, merged);
                    }
                }));
            }
            return reducer.getOutput();
        });
    }

    /**
     * Makes the given scheme current and synchronizes the storage map with it:
     * storages of partitions that are no longer part of the scheme are dropped, and a
     * connection is created for every partition that does not have one yet.
     *
     * @param partitionScheme newly discovered scheme
     */
    private void updatePartitionScheme(IDiscoveryService.PartitionScheme<P> partitionScheme) {
        this.currentPartitionScheme = partitionScheme;
        this.crdtStorages.keySet().retainAll(partitionScheme.getPartitions());
        partitionScheme.getPartitions().forEach(partitionId ->
            this.crdtStorages.computeIfAbsent(partitionId, partitionScheme::provideCrdtConnection));
    }

    /** Exposes the live (not copied) partition-to-storage map for tests. */
    @VisibleForTesting
    Map<P, ICrdtStorage<K, S>> getCrdtStorages() {
        return crdtStorages;
    }

    /** Tells whether new streams are instrumented with the detailed statistics. */
    @JmxAttribute
    public boolean isDetailedStats() {
        return detailedStats;
    }

    /**
     * Switches this storage to detailed statistics and forwards the request to every
     * partition storage that is a {@code RemoteCrdtStorage}.
     */
    @JmxOperation
    public void startDetailedMonitoring() {
        detailedStats = true;
        crdtStorages.values().forEach(storage -> {
            if (storage instanceof RemoteCrdtStorage) {
                ((RemoteCrdtStorage) storage).startDetailedMonitoring();
            }
        });
    }

    /**
     * Switches this storage back to basic statistics and forwards the request to every
     * partition storage that is a {@code RemoteCrdtStorage}.
     */
    @JmxOperation
    public void stopDetailedMonitoring() {
        detailedStats = false;
        crdtStorages.values().forEach(storage -> {
            if (storage instanceof RemoteCrdtStorage) {
                ((RemoteCrdtStorage) storage).stopDetailedMonitoring();
            }
        });
    }

    /** Basic statistics applied to upload streams while detailed monitoring is off. */
    @JmxAttribute
    public BasicStreamStats getUploadStats() {
        return uploadStats;
    }

    /** Detailed statistics applied to upload streams while detailed monitoring is on. */
    @JmxAttribute
    public DetailedStreamStats getUploadStatsDetailed() {
        return uploadStatsDetailed;
    }

    /** Basic statistics applied to download streams while detailed monitoring is off. */
    @JmxAttribute
    public BasicStreamStats getDownloadStats() {
        return downloadStats;
    }

    /** Detailed statistics applied to download streams while detailed monitoring is on. */
    @JmxAttribute
    public DetailedStreamStats getDownloadStatsDetailed() {
        return downloadStatsDetailed;
    }

    /** Basic statistics applied to take streams while detailed monitoring is off. */
    @JmxAttribute
    public BasicStreamStats getTakeStats() {
        return takeStats;
    }

    /** Detailed statistics applied to take streams while detailed monitoring is on. */
    @JmxAttribute
    public DetailedStreamStats getTakeStatsDetailed() {
        return takeStatsDetailed;
    }

    /** Basic statistics applied to remove streams while detailed monitoring is off. */
    @JmxAttribute
    public BasicStreamStats getRemoveStats() {
        return removeStats;
    }

    /** Detailed statistics applied to remove streams while detailed monitoring is on. */
    @JmxAttribute
    public DetailedStreamStats getRemoveStatsDetailed() {
        return removeStatsDetailed;
    }

    /** Basic statistics applied to repartition output streams while detailed monitoring is off. */
    @JmxAttribute
    public BasicStreamStats getRepartitionUploadStats() {
        return repartitionUploadStats;
    }

    /** Detailed statistics applied to repartition output streams while detailed monitoring is on. */
    @JmxAttribute
    public DetailedStreamStats getRepartitionUploadStatsDetailed() {
        return repartitionUploadStatsDetailed;
    }

    /**
     * Returns a new map holding only those partition storages that are
     * {@code RemoteCrdtStorage} instances, keyed by partition id.
     */
    @JmxAttribute
    public Map<P, RemoteCrdtStorage> getCrdtStorageClients() {
        Map<P, RemoteCrdtStorage> clients = new HashMap<>();
        for (Map.Entry<P, ICrdtStorage<K, S>> entry : crdtStorages.entrySet()) {
            ICrdtStorage<K, S> storage = entry.getValue();
            if (storage instanceof RemoteCrdtStorage) {
                clients.put(entry.getKey(), (RemoteCrdtStorage) storage);
            }
        }
        return clients;
    }

    /** Event counter attached to upload streams. */
    @JmxAttribute
    public EventStats getUploadedItems() {
        return uploadedItems;
    }

    /** Event counter attached to download streams. */
    @JmxAttribute
    public EventStats getDownloadedItems() {
        return downloadedItems;
    }

    /** Event counter attached to take streams. */
    @JmxAttribute
    public EventStats getTakenItems() {
        return takenItems;
    }

    /** Event counter attached to remove streams. */
    @JmxAttribute
    public EventStats getRemovedItems() {
        return removedItems;
    }

    /** Event counter attached to the output streams of repartitioning. */
    @JmxAttribute
    public EventStats getRepartitionedItems() {
        return repartitionedItems;
    }
}
