package io.datakernel.crdt;

import io.datakernel.async.Promise;
import io.datakernel.async.Promises;
import io.datakernel.crdt.primitives.CrdtType;
import io.datakernel.eventloop.Eventloop;
import io.datakernel.eventloop.EventloopService;
import io.datakernel.exception.StacklessException;
import io.datakernel.jmx.EventloopJmxMBeanEx;
import io.datakernel.jmx.JmxAttribute;
import io.datakernel.jmx.JmxOperation;
import io.datakernel.stream.StreamConsumer;
import io.datakernel.stream.StreamSupplier;
import io.datakernel.stream.processor.MultiSharder;
import io.datakernel.stream.processor.ShardingStreamSplitter;
import io.datakernel.stream.processor.StreamReducerSimple;
import io.datakernel.stream.processor.StreamReducers;
import io.datakernel.stream.processor.StreamSplitter;
import io.datakernel.stream.stats.StreamStats;
import io.datakernel.stream.stats.StreamStatsBasic;
import io.datakernel.stream.stats.StreamStatsDetailed;
import io.datakernel.util.Initializable;
import io.datakernel.util.LogUtils;
import java.lang.Comparable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/datakernel/crdt/CrdtStorageCluster.class */
/**
 * A {@link CrdtStorage} that fans operations out to a set of partition storages,
 * each identified by an id of type {@code I}.
 *
 * <p>Partitions are tracked in three maps: {@code clients} (every registered partition),
 * {@code aliveClients} (partitions currently used for operations) and {@code deadClients}
 * (partitions that failed a ping or a connection attempt). Whenever the alive set or the
 * replication count changes, the ordered id list is rebuilt from {@code aliveClients}
 * and handed to the {@link RendezvousHashSharder}.
 *
 * <p>NOTE(review): no synchronization is used anywhere in this class; it presumably relies on
 * all calls being made from the owning eventloop thread - confirm against callers.
 *
 * @param <I> partition id type
 * @param <K> CRDT key type
 * @param <S> CRDT state type
 */
public final class CrdtStorageCluster<I extends Comparable<I>, K extends Comparable<K>, S> implements CrdtStorage<K, S>, Initializable<CrdtStorageCluster<I, K, S>>, EventloopService, EventloopJmxMBeanEx {
    private static final Logger logger = LoggerFactory.getLogger(CrdtStorageCluster.class);

    private final Eventloop eventloop;
    /** Every registered partition, regardless of its health. */
    private final Map<I, CrdtStorage<K, S>> clients;
    /** Partitions currently considered reachable; a LinkedHashMap, so iteration order is insertion order. */
    private final Map<I, CrdtStorage<K, S>> aliveClients;
    /** Merge function applied to states of the same key coming from different partitions. */
    private final CrdtFunction<S> function;
    private final RendezvousHashSharder<I, K> shardingFunction;
    /** Snapshot of the alive partition ids, rebuilt by {@link #recompute()}. */
    private List<I> orderedIds;
    /** When {@code true}, streams are instrumented with the detailed stats collectors. */
    private boolean detailedStats;
    private int replicationCount = 1;
    /** Filter applied to merged states on download; accepts everything by default. */
    private CrdtFilter<S> filter = state -> true;

    private final StreamStatsBasic<CrdtData<K, S>> uploadStats = StreamStats.basic();
    private final StreamStatsDetailed<CrdtData<K, S>> uploadStatsDetailed = StreamStats.detailed();
    private final StreamStatsBasic<CrdtData<K, S>> downloadStats = StreamStats.basic();
    private final StreamStatsDetailed<CrdtData<K, S>> downloadStatsDetailed = StreamStats.detailed();
    private final StreamStatsBasic<K> removeStats = StreamStats.basic();
    private final StreamStatsDetailed<K> removeStatsDetailed = StreamStats.detailed();

    /** Partitions that were moved out of {@code aliveClients} by {@link #markDead}. */
    private final Map<I, CrdtStorage<K, S>> deadClients = new HashMap<>();

    private CrdtStorageCluster(Eventloop eventloop, Map<I, CrdtStorage<K, S>> map, CrdtFunction<S> crdtFunction) {
        this.eventloop = eventloop;
        this.clients = map;
        // LinkedHashMap keeps a stable iteration order for the id list given to the sharder.
        this.aliveClients = new LinkedHashMap<>(map);
        this.function = crdtFunction;
        this.orderedIds = new ArrayList<>(this.aliveClients.keySet());
        this.shardingFunction = RendezvousHashSharder.create(this.orderedIds, this.replicationCount);
    }

    /**
     * Creates a cluster over a copy of the given partition map, using an explicit merge function.
     *
     * @param eventloop    eventloop returned by {@link #getEventloop()}
     * @param map          partition id to storage mapping; copied, so later changes to it are not seen
     * @param crdtFunction function used to merge states of the same key on download
     * @return a new cluster with all given partitions initially marked alive
     */
    public static <I extends Comparable<I>, K extends Comparable<K>, S> CrdtStorageCluster<I, K, S> create(Eventloop eventloop, Map<I, ? extends CrdtStorage<K, S>> map, CrdtFunction<S> crdtFunction) {
        return new CrdtStorageCluster<>(eventloop, new HashMap<>(map), crdtFunction);
    }

    /**
     * Creates a cluster over a copy of the given partition map for states that are
     * themselves {@link CrdtType}s, using {@link CrdtFunction#ofCrdtType()} as the merge function.
     *
     * @param eventloop eventloop returned by {@link #getEventloop()}
     * @param map       partition id to storage mapping; copied, so later changes to it are not seen
     * @return a new cluster with all given partitions initially marked alive
     */
    public static <I extends Comparable<I>, K extends Comparable<K>, S extends CrdtType<S>> CrdtStorageCluster<I, K, S> create(Eventloop eventloop, Map<I, ? extends CrdtStorage<K, S>> map) {
        return new CrdtStorageCluster<>(eventloop, new HashMap<>(map), CrdtFunction.<S>ofCrdtType());
    }

    /**
     * Registers a partition (marking it alive) and rebuilds the sharding.
     *
     * @param i           partition id
     * @param crdtStorage storage backing that partition
     * @return this cluster, for chaining
     */
    public CrdtStorageCluster<I, K, S> withPartition(I i, CrdtStorage<K, S> crdtStorage) {
        this.clients.put(i, crdtStorage);
        this.aliveClients.put(i, crdtStorage);
        recompute();
        return this;
    }

    /**
     * Sets the replication count passed to the sharder and rebuilds the sharding.
     * The value is not validated here.
     *
     * @param i new replication count
     * @return this cluster, for chaining
     */
    public CrdtStorageCluster<I, K, S> withReplicationCount(int i) {
        this.replicationCount = i;
        recompute();
        return this;
    }

    /**
     * Sets the filter applied to merged states in {@link #download(long)}.
     *
     * @param crdtFilter new filter
     * @return this cluster, for chaining
     */
    public CrdtStorageCluster<I, K, S> withFilter(CrdtFilter<S> crdtFilter) {
        this.filter = crdtFilter;
        return this;
    }

    /** @return unmodifiable view of every registered partition */
    public Map<I, ? extends CrdtStorage<K, S>> getClients() {
        return Collections.unmodifiableMap(this.clients);
    }

    /** @return unmodifiable view of the partitions currently marked alive */
    public Map<I, CrdtStorage<K, S>> getAliveClients() {
        return Collections.unmodifiableMap(this.aliveClients);
    }

    /** @return unmodifiable view of the partitions currently marked dead */
    public Map<I, CrdtStorage<K, S>> getDeadClients() {
        return Collections.unmodifiableMap(this.deadClients);
    }

    /** @return unmodifiable view of the alive partition ids as of the last recompute */
    public List<I> getOrderedIds() {
        return Collections.unmodifiableList(this.orderedIds);
    }

    /** @return the sharder used by {@link #upload()} */
    public MultiSharder<K> getShardingFunction() {
        return this.shardingFunction;
    }

    @NotNull
    public Eventloop getEventloop() {
        return this.eventloop;
    }

    /**
     * Pings every registered partition and updates its status: a successful ping
     * marks the partition alive, a failed one marks it dead. Individual ping failures
     * are absorbed, so they do not fail the returned promise.
     *
     * @return promise completed once all pings have been processed
     */
    public Promise<Void> checkAllPartitions() {
        return Promises.all(this.clients.entrySet().stream()
                .map(entry -> {
                    I id = entry.getKey();
                    return entry.getValue().ping()
                            .mapEx((result, e) -> {
                                if (e == null) {
                                    markAlive(id);
                                } else {
                                    markDead(id, e);
                                }
                                return null;
                            });
                }))
                .whenComplete(LogUtils.toLogger(logger, "checkAllPartitions"));
    }

    /**
     * Pings only the partitions currently marked dead and marks those that respond as alive.
     * Ping failures are ignored.
     *
     * @return promise completed once all pings have been processed
     */
    public Promise<Void> checkDeadPartitions() {
        // Iterate over a snapshot: markAlive() removes from deadClients, and a ping that
        // completes synchronously would otherwise modify the map while it is being streamed.
        List<Map.Entry<I, CrdtStorage<K, S>>> dead = new ArrayList<>(this.deadClients.entrySet());
        return Promises.all(dead.stream()
                .map(entry -> entry.getValue().ping()
                        .mapEx((result, e) -> {
                            if (e == null) {
                                markAlive(entry.getKey());
                            }
                            return null;
                        })))
                .whenComplete(LogUtils.toLogger(logger, "checkDeadPartitions"));
    }

    /** Moves a partition from the dead set to the alive set; does nothing if it is not marked dead. */
    private void markAlive(I i) {
        CrdtStorage<K, S> revived = this.deadClients.remove(i);
        if (revived != null) {
            this.aliveClients.put(i, revived);
            recompute();
            logger.info("Marked partition {} as alive", i);
        }
    }

    /**
     * Moves a partition from the alive set to the dead set; does nothing if it is not marked alive.
     *
     * @param i  partition id
     * @param th failure that caused the partition to be considered dead (logged)
     */
    public void markDead(I i, Throwable th) {
        CrdtStorage<K, S> failed = this.aliveClients.remove(i);
        if (failed != null) {
            this.deadClients.put(i, failed);
            recompute();
            logger.warn("Marked partition {} as dead", i, th);
        }
    }

    /** Rebuilds the ordered id list from the alive partitions and pushes it to the sharder. */
    private void recompute() {
        this.orderedIds = new ArrayList<>(this.aliveClients.keySet());
        this.shardingFunction.recompute(this.orderedIds, this.replicationCount);
    }

    /**
     * Applies {@code method} to every alive partition and collects the successful results.
     * A partition whose promise fails is marked dead.
     *
     * @param method operation to start on each partition
     * @return promise of the successful results in alive-partition iteration order, or a
     *         failed promise ("No successful connections") when none succeeded
     */
    private <T> Promise<List<T>> connect(Function<CrdtStorage<K, S>, Promise<T>> method) {
        // Iterate over a snapshot: markDead() removes from aliveClients, and a promise that
        // is already failed when returned would otherwise modify the map mid-iteration.
        List<Map.Entry<I, CrdtStorage<K, S>>> alive = new ArrayList<>(this.aliveClients.entrySet());
        return Promises.toList(alive.stream()
                .map(entry -> method.apply(entry.getValue())
                        .whenException(e -> markDead(entry.getKey(), e))
                        .toTry()))
                .then(tries -> {
                    List<T> successes = tries.stream()
                            .filter(t -> t.isSuccess())
                            .map(t -> t.get())
                            .collect(Collectors.toList());
                    if (successes.isEmpty()) {
                        return Promise.ofException(new StacklessException(CrdtStorageCluster.class, "No successful connections"));
                    }
                    return Promise.of(successes);
                });
    }

    /**
     * Opens upload consumers on all alive partitions and returns a single consumer whose
     * items are routed to them by a {@link ShardingStreamSplitter} keyed on {@link CrdtData#getKey()}.
     *
     * <p>NOTE(review): the splitter outputs are created in the order of the successful
     * connections; this presumably has to line up with the indices produced by the sharder - confirm.
     */
    @Override // io.datakernel.crdt.CrdtStorage
    public Promise<StreamConsumer<CrdtData<K, S>>> upload() {
        return connect(CrdtStorage::upload)
                .then(successes -> {
                    ShardingStreamSplitter<CrdtData<K, S>, K> splitter = ShardingStreamSplitter.create(this.shardingFunction, CrdtData::getKey);
                    successes.forEach(consumer -> splitter.newOutput().streamTo(consumer));
                    // Detailed collector only while detailed monitoring is switched on.
                    return Promise.of(splitter.getInput()
                            .transformWith(this.detailedStats ? this.uploadStatsDetailed : this.uploadStats)
                            .withLateBinding());
                });
    }

    /**
     * Opens download suppliers on all alive partitions and returns a single supplier that
     * reduces them by key: states of the same key are combined with the merge function, and
     * the results are passed through the configured filter.
     *
     * @param j value forwarded unchanged to each partition's {@code download}
     */
    @Override // io.datakernel.crdt.CrdtStorage
    public Promise<StreamSupplier<CrdtData<K, S>>> download(long j) {
        return connect(storage -> storage.download(j))
                .then(successes -> {
                    StreamReducerSimple<K, CrdtData<K, S>, CrdtData<K, S>, CrdtData<K, S>> reducer =
                            StreamReducerSimple.create(CrdtData::getKey, Comparator.<K>naturalOrder(),
                                    new StreamReducers.BinaryAccumulatorReducer<K, CrdtData<K, S>>(
                                            (a, b) -> new CrdtData<>(a.getKey(), this.function.merge(a.getState(), b.getState())))
                                            .withFilter(data -> this.filter.test(data.getState())));
                    successes.forEach(supplier -> supplier.streamTo(reducer.newInput()));
                    // Detailed collector only while detailed monitoring is switched on.
                    return Promise.of(reducer.getOutput()
                            .transformWith(this.detailedStats ? this.downloadStatsDetailed : this.downloadStats)
                            .withLateBinding());
                });
    }

    /**
     * Opens remove consumers on all alive partitions and returns a single consumer that
     * feeds them through a {@link StreamSplitter} (no sharding function is involved).
     */
    @Override // io.datakernel.crdt.CrdtStorage
    public Promise<StreamConsumer<K>> remove() {
        return connect(CrdtStorage::remove)
                .then(successes -> {
                    StreamSplitter<K> splitter = StreamSplitter.create();
                    successes.forEach(consumer -> splitter.newOutput().streamTo(consumer));
                    // Detailed collector only while detailed monitoring is switched on.
                    return Promise.of(splitter.getInput()
                            .transformWith(this.detailedStats ? this.removeStatsDetailed : this.removeStats)
                            .withLateBinding());
                });
    }

    /** Always completes immediately; the partitions themselves are not pinged here. */
    @Override // io.datakernel.crdt.CrdtStorage
    public Promise<Void> ping() {
        return Promise.complete();
    }

    /** No start-up work is performed; completes immediately. */
    @NotNull
    public Promise<Void> start() {
        return Promise.complete();
    }

    /** No shutdown work is performed; completes immediately. */
    @NotNull
    public Promise<Void> stop() {
        return Promise.complete();
    }

    @JmxAttribute
    public int getReplicationCount() {
        return this.replicationCount;
    }

    /** JMX setter; delegates to {@link #withReplicationCount(int)}, which also rebuilds the sharding. */
    @JmxAttribute
    public void setReplicationCount(int i) {
        withReplicationCount(i);
    }

    @JmxAttribute
    public int getAlivePartitionCount() {
        return this.aliveClients.size();
    }

    @JmxAttribute
    public int getDeadPartitionCount() {
        return this.deadClients.size();
    }

    /** @return {@code toString()} of every alive partition id */
    @JmxAttribute
    public String[] getAlivePartitions() {
        return this.aliveClients.keySet().stream()
                .map(Object::toString)
                .toArray(String[]::new);
    }

    /** @return {@code toString()} of every dead partition id */
    @JmxAttribute
    public String[] getDeadPartitions() {
        return this.deadClients.keySet().stream()
                .map(Object::toString)
                .toArray(String[]::new);
    }

    /** Streams created after this call are instrumented with the detailed stats collectors. */
    @JmxOperation
    public void startDetailedMonitoring() {
        this.detailedStats = true;
    }

    /** Streams created after this call are instrumented with the basic stats collectors. */
    @JmxOperation
    public void stopDetailedMonitoring() {
        this.detailedStats = false;
    }

    @JmxAttribute
    public StreamStatsBasic getUploadStats() {
        return this.uploadStats;
    }

    @JmxAttribute
    public StreamStatsDetailed getUploadStatsDetailed() {
        return this.uploadStatsDetailed;
    }

    @JmxAttribute
    public StreamStatsBasic getDownloadStats() {
        return this.downloadStats;
    }

    @JmxAttribute
    public StreamStatsDetailed getDownloadStatsDetailed() {
        return this.downloadStatsDetailed;
    }

    @JmxAttribute
    public StreamStatsBasic getRemoveStats() {
        return this.removeStats;
    }

    @JmxAttribute
    public StreamStatsDetailed getRemoveStatsDetailed() {
        return this.removeStatsDetailed;
    }
}
