package io.activej.crdt.storage.cluster;

import io.activej.async.function.AsyncFunction;
import io.activej.async.process.AsyncCloseable;
import io.activej.async.service.EventloopService;
import io.activej.common.Checks;
import io.activej.common.collection.Try;
import io.activej.common.initializer.WithInitializer;
import io.activej.common.ref.RefInt;
import io.activej.crdt.CrdtData;
import io.activej.crdt.CrdtException;
import io.activej.crdt.function.CrdtFilter;
import io.activej.crdt.function.CrdtFunction;
import io.activej.crdt.primitives.CrdtType;
import io.activej.crdt.storage.CrdtStorage;
import io.activej.crdt.util.RendezvousHashSharder;
import io.activej.datastream.StreamConsumer;
import io.activej.datastream.StreamSupplier;
import io.activej.datastream.processor.StreamReducer;
import io.activej.datastream.processor.StreamReducers;
import io.activej.datastream.processor.StreamSplitter;
import io.activej.datastream.stats.StreamStats;
import io.activej.datastream.stats.StreamStatsBasic;
import io.activej.datastream.stats.StreamStatsDetailed;
import io.activej.eventloop.Eventloop;
import io.activej.eventloop.jmx.EventloopJmxBeanWithStats;
import io.activej.jmx.api.attribute.JmxAttribute;
import io.activej.jmx.api.attribute.JmxOperation;
import io.activej.promise.Promise;
import io.activej.promise.Promises;
import java.lang.Comparable;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import org.jetbrains.annotations.NotNull;

/* loaded from: input_file:io/activej/crdt/storage/cluster/CrdtStorageCluster.class */
public final class CrdtStorageCluster<K extends Comparable<K>, S, P extends Comparable<P>> implements CrdtStorage<K, S>, WithInitializer<CrdtStorageCluster<K, S, P>>, EventloopService, EventloopJmxBeanWithStats {
    private final CrdtPartitions<K, S, P> partitions;
    private final CrdtFunction<S> function;
    private CrdtFilter<S> filter;
    private boolean detailedStats;
    private int deadPartitionsThreshold = 0;
    private int replicationCount = 1;
    private final StreamStatsBasic<CrdtData<K, S>> uploadStats = StreamStats.basic();
    private final StreamStatsDetailed<CrdtData<K, S>> uploadStatsDetailed = StreamStats.detailed();
    private final StreamStatsBasic<CrdtData<K, S>> downloadStats = StreamStats.basic();
    private final StreamStatsDetailed<CrdtData<K, S>> downloadStatsDetailed = StreamStats.detailed();
    private final StreamStatsBasic<K> removeStats = StreamStats.basic();
    private final StreamStatsDetailed<K> removeStatsDetailed = StreamStats.detailed();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/activej/crdt/storage/cluster/CrdtStorageCluster$Container.class */
    public final class Container<T extends AsyncCloseable> {
        final P id;
        final T value;

        Container(P p, T t) {
            this.id = p;
            this.value = t;
        }
    }

    private CrdtStorageCluster(CrdtPartitions<K, S, P> crdtPartitions, CrdtFunction<S> crdtFunction) {
        this.partitions = crdtPartitions;
        this.function = crdtFunction;
    }

    public static <K extends Comparable<K>, S, P extends Comparable<P>> CrdtStorageCluster<K, S, P> create(CrdtPartitions<K, S, P> crdtPartitions, CrdtFunction<S> crdtFunction) {
        return new CrdtStorageCluster<>(crdtPartitions, crdtFunction);
    }

    public static <K extends Comparable<K>, S extends CrdtType<S>, P extends Comparable<P>> CrdtStorageCluster<K, S, P> create(CrdtPartitions<K, S, P> crdtPartitions) {
        return new CrdtStorageCluster<>(crdtPartitions, CrdtFunction.ofCrdtType());
    }

    public CrdtStorageCluster<K, S, P> withReplicationCount(int i) {
        Checks.checkArgument(1 <= i, "Replication count cannot be less than one");
        this.deadPartitionsThreshold = i - 1;
        this.replicationCount = i;
        this.partitions.setTopShards(i);
        return this;
    }

    public CrdtStorageCluster<K, S, P> withFilter(CrdtFilter<S> crdtFilter) {
        this.filter = crdtFilter;
        return this;
    }

    public CrdtStorageCluster<K, S, P> withPersistenceOptions(int i, int i2) {
        Checks.checkArgument(0 <= i && i < this.partitions.getPartitions().size(), "Dead partitions threshold cannot be less than zero or greater than number of partitions");
        Checks.checkArgument(0 <= i2, "Number of upload targets should not be less than zero");
        Checks.checkArgument(i2 <= this.partitions.getPartitions().size(), "Number of upload targets should not exceed total number of partitions");
        this.deadPartitionsThreshold = i;
        this.replicationCount = i2;
        this.partitions.setTopShards(i2);
        return this;
    }

    @NotNull
    public Eventloop getEventloop() {
        return this.partitions.getEventloop();
    }

    public CrdtPartitions<K, S, P> getPartitions() {
        return this.partitions;
    }

    private <T extends AsyncCloseable> Promise<List<CrdtStorageCluster<K, S, P>.Container<T>>> connect(AsyncFunction<CrdtStorage<K, S>, T> asyncFunction) {
        return Promises.toList(this.partitions.getAlivePartitions().entrySet().stream().map(entry -> {
            return asyncFunction.apply((CrdtStorage) entry.getValue()).map(asyncCloseable -> {
                return new Container((Comparable) entry.getKey(), asyncCloseable);
            }).whenException(exc -> {
                this.partitions.markDead((Comparable) entry.getKey(), exc);
            }).toTry();
        })).map(this::checkStillNotDead).map(list -> {
            List list = (List) list.stream().filter((v0) -> {
                return v0.isSuccess();
            }).map((v0) -> {
                return v0.get();
            }).collect(Collectors.toList());
            if (list.isEmpty()) {
                throw new CrdtException("No successful connections");
            }
            return list;
        });
    }

    @Override // io.activej.crdt.storage.CrdtStorage
    public Promise<StreamConsumer<CrdtData<K, S>>> upload() {
        return connect((v0) -> {
            return v0.upload();
        }).then(list -> {
            RendezvousHashSharder<P> sharder = this.partitions.getSharder();
            StreamSplitter<T, T> create = StreamSplitter.create((crdtData, streamDataAcceptorArr) -> {
                for (int i : sharder.shard(crdtData.getKey())) {
                    streamDataAcceptorArr[i].accept(crdtData);
                }
            });
            RefInt refInt = tolerantSplit(list, sharder, create);
            return Promise.of(((StreamConsumer) create.getInput().transformWith(this.detailedStats ? this.uploadStatsDetailed : this.uploadStats)).withAcknowledgement(promise -> {
                return promise.mapException(exc -> {
                    return new CrdtException("Cluster 'upload' failed", exc);
                }).whenResult(r6 -> {
                    return list.size() - refInt.value < this.replicationCount;
                }, () -> {
                    throw new CrdtException("Failed to upload data to the required number of partitions");
                });
            }));
        });
    }

    @Override // io.activej.crdt.storage.CrdtStorage
    public Promise<StreamSupplier<CrdtData<K, S>>> download(long j) {
        return connect(crdtStorage -> {
            return crdtStorage.download(j);
        }).map(list -> {
            StreamReducer<K, CrdtData<K, S>, CrdtData<K, S>> create = StreamReducer.create();
            RefInt refInt = tolerantReduce(list, create);
            return ((StreamSupplier) create.getOutput().transformWith(this.detailedStats ? this.downloadStatsDetailed : this.downloadStats)).withEndOfStream(promise -> {
                return promise.mapException(exc -> {
                    return new CrdtException("Cluster 'download' failed", exc);
                }).whenResult(() -> {
                    if ((this.partitions.getPartitions().size() - list.size()) + refInt.get() >= this.replicationCount) {
                        throw new CrdtException("Failed to download from the required number of partitions");
                    }
                });
            });
        });
    }

    @Override // io.activej.crdt.storage.CrdtStorage
    public Promise<StreamConsumer<K>> remove() {
        return connect((v0) -> {
            return v0.remove();
        }).map(list -> {
            RendezvousHashSharder<P> create = RendezvousHashSharder.create((Set) list.stream().map(container -> {
                return container.id;
            }).collect(Collectors.toSet()), list.size());
            StreamSplitter<T, T> create2 = StreamSplitter.create((comparable, streamDataAcceptorArr) -> {
                for (int i : create.shard(comparable)) {
                    streamDataAcceptorArr[i].accept(comparable);
                }
            });
            RefInt refInt = tolerantSplit(list, create, create2);
            return ((StreamConsumer) create2.getInput().transformWith(this.detailedStats ? this.removeStatsDetailed : this.removeStats)).withAcknowledgement(promise -> {
                return promise.mapException(exc -> {
                    return new CrdtException("Cluster 'remove' failed", exc);
                }).whenResult(() -> {
                    if ((this.partitions.getPartitions().size() - list.size()) + refInt.get() != 0) {
                        throw new CrdtException("Failed to remove items from all partitions");
                    }
                });
            });
        });
    }

    @Override // io.activej.crdt.storage.CrdtStorage
    public Promise<Void> ping() {
        return Promise.complete();
    }

    @NotNull
    public Promise<Void> start() {
        return Promise.complete();
    }

    @NotNull
    public Promise<Void> stop() {
        return Promise.complete();
    }

    private <T extends AsyncCloseable> List<Try<CrdtStorageCluster<K, S, P>.Container<T>>> checkStillNotDead(List<Try<CrdtStorageCluster<K, S, P>.Container<T>>> list) throws CrdtException {
        Map<P, CrdtStorage<K, S>> deadPartitions = this.partitions.getDeadPartitions();
        if (deadPartitions.size() <= this.deadPartitionsThreshold) {
            return list;
        }
        CrdtException crdtException = new CrdtException("There are more dead partitions than allowed(" + deadPartitions.size() + " dead, threshold is " + this.deadPartitionsThreshold + "), aborting");
        list.stream().filter((v0) -> {
            return v0.isSuccess();
        }).map((v0) -> {
            return v0.get();
        }).forEach(container -> {
            container.value.closeEx(crdtException);
        });
        throw crdtException;
    }

    private <T> RefInt tolerantSplit(List<CrdtStorageCluster<K, S, P>.Container<StreamConsumer<T>>> list, RendezvousHashSharder<P> rendezvousHashSharder, StreamSplitter<T, T> streamSplitter) {
        RefInt refInt = new RefInt(0);
        for (CrdtStorageCluster<K, S, P>.Container<StreamConsumer<T>> container : list) {
            Objects.requireNonNull(streamSplitter);
            streamSplitter.addOutput(new StreamSplitter<T, T>.Output(streamSplitter, container, rendezvousHashSharder, refInt, list, streamSplitter) { // from class: io.activej.crdt.storage.cluster.CrdtStorageCluster.1
                final /* synthetic */ Container val$container;
                final /* synthetic */ RendezvousHashSharder val$sharder;
                final /* synthetic */ RefInt val$failed;
                final /* synthetic */ List val$containers;
                final /* synthetic */ StreamSplitter val$splitter;

                /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
                {
                    super(streamSplitter);
                    this.val$container = container;
                    this.val$sharder = rendezvousHashSharder;
                    this.val$failed = refInt;
                    this.val$containers = list;
                    this.val$splitter = streamSplitter;
                    Objects.requireNonNull(streamSplitter);
                }

                protected void onError(Exception exc) {
                    CrdtStorageCluster.this.partitions.markDead(this.val$container.id, exc);
                    this.val$sharder.recompute(CrdtStorageCluster.this.partitions.getAlivePartitions().keySet());
                    RefInt refInt2 = this.val$failed;
                    int i = refInt2.value + 1;
                    refInt2.value = i;
                    if (i == this.val$containers.size()) {
                        this.val$splitter.getInput().closeEx(exc);
                    } else {
                        complete();
                        sync();
                    }
                }

                protected boolean canProceed() {
                    return isReady() || isComplete();
                }
            }).streamTo(container.value);
        }
        return refInt;
    }

    private RefInt tolerantReduce(List<CrdtStorageCluster<K, S, P>.Container<StreamSupplier<CrdtData<K, S>>>> list, StreamReducer<K, CrdtData<K, S>, CrdtData<K, S>> streamReducer) {
        RefInt refInt = new RefInt(0);
        for (CrdtStorageCluster<K, S, P>.Container<StreamSupplier<CrdtData<K, S>>> container : list) {
            StreamReducers.Reducer binaryAccumulatorReducer = this.filter == null ? new StreamReducers.BinaryAccumulatorReducer((crdtData, crdtData2) -> {
                return new CrdtData(crdtData.getKey(), this.function.merge(crdtData.getState(), crdtData2.getState()));
            }) : new StreamReducers.BinaryAccumulatorReducer<K, CrdtData<K, S>>((crdtData3, crdtData4) -> {
                return new CrdtData(crdtData3.getKey(), this.function.merge(crdtData3.getState(), crdtData4.getState()));
            }) { // from class: io.activej.crdt.storage.cluster.CrdtStorageCluster.2
                /* JADX INFO: Access modifiers changed from: protected */
                public boolean filter(CrdtData<K, S> crdtData5) {
                    return CrdtStorageCluster.this.filter.test(crdtData5.getState());
                }
            };
            StreamSupplier<CrdtData<K, S>> streamSupplier = container.value;
            Objects.requireNonNull(streamReducer);
            streamSupplier.streamTo(streamReducer.addInput(new StreamReducer<K, CrdtData<K, S>, CrdtData<K, S>>.SimpleInput<CrdtData<K, S>>(streamReducer, (v0) -> {
                return v0.getKey();
            }, binaryAccumulatorReducer, container, refInt, list) { // from class: io.activej.crdt.storage.cluster.CrdtStorageCluster.3
                boolean awaiting;
                static final /* synthetic */ boolean $assertionsDisabled;
                final /* synthetic */ Container val$container;
                final /* synthetic */ RefInt val$failed;
                final /* synthetic */ List val$containers;

                /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
                {
                    super(streamReducer, r8, binaryAccumulatorReducer);
                    this.val$container = container;
                    this.val$failed = refInt;
                    this.val$containers = list;
                    Objects.requireNonNull(streamReducer);
                }

                protected void onError(Exception exc) {
                    if (this.awaiting) {
                        advance();
                    }
                    closeInput();
                    CrdtStorageCluster.this.partitions.markDead(this.val$container.id, exc);
                    RefInt refInt2 = this.val$failed;
                    int i = refInt2.value + 1;
                    refInt2.value = i;
                    if (i == this.val$containers.size()) {
                        super.onError(exc);
                    } else {
                        continueReduce();
                    }
                }

                protected int await() {
                    if (!$assertionsDisabled && this.awaiting) {
                        throw new AssertionError();
                    }
                    this.awaiting = true;
                    return super.await();
                }

                protected int advance() {
                    if (!$assertionsDisabled && !this.awaiting) {
                        throw new AssertionError();
                    }
                    this.awaiting = false;
                    return super.advance();
                }

                static {
                    $assertionsDisabled = !CrdtStorageCluster.class.desiredAssertionStatus();
                }
            }));
        }
        return refInt;
    }

    @JmxAttribute
    public int getDeadPartitionsThreshold() {
        return this.deadPartitionsThreshold;
    }

    @JmxAttribute
    public int getReplicationCount() {
        return this.replicationCount;
    }

    @JmxOperation
    public void setPersistenceOptions(int i, int i2) {
        withPersistenceOptions(i, i2);
    }

    @JmxAttribute
    public void setReplicationCount(int i) {
        withReplicationCount(i);
    }

    @JmxAttribute(name = "")
    public CrdtPartitions getPartitionsJmx() {
        return this.partitions;
    }

    @JmxAttribute
    public boolean isDetailedStats() {
        return this.detailedStats;
    }

    @JmxOperation
    public void startDetailedMonitoring() {
        this.detailedStats = true;
    }

    @JmxOperation
    public void stopDetailedMonitoring() {
        this.detailedStats = false;
    }

    @JmxAttribute
    public StreamStatsBasic getUploadStats() {
        return this.uploadStats;
    }

    @JmxAttribute
    public StreamStatsDetailed getUploadStatsDetailed() {
        return this.uploadStatsDetailed;
    }

    @JmxAttribute
    public StreamStatsBasic getDownloadStats() {
        return this.downloadStats;
    }

    @JmxAttribute
    public StreamStatsDetailed getDownloadStatsDetailed() {
        return this.downloadStatsDetailed;
    }

    @JmxAttribute
    public StreamStatsBasic getRemoveStats() {
        return this.removeStats;
    }

    @JmxAttribute
    public StreamStatsDetailed getRemoveStatsDetailed() {
        return this.removeStatsDetailed;
    }
}
