/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.processors.cache.distributed.near.consistency;

import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.CacheEntryVersion;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.CacheConsistencyViolationEvent;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.EntryGetResult;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedGetFuture;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.future.GridFutureAdapter;

/**
 * Base future that reads the same set of keys from every node returned by the cache affinity for
 * those keys and hands the collected per-node results to {@link #reduce()}.
 *
 * <p>One {@link GridPartitionedGetFuture} is created per node. When every per-node future has
 * finished without an error, {@link #reduce()} is invoked. When a per-node future fails with
 * {@link ClusterTopologyServerNotFoundException}, the keys are mapped again, at most
 * {@link #MAX_REMAP_CNT} times; any other error completes this future with that error.
 *
 * <p>{@link #map(AffinityTopologyVersion)} and {@link #onResult(IgniteInternalFuture)} are
 * synchronized on this instance, which is what guards {@code topVer}.
 */
public abstract class GridNearReadRepairAbstractFuture
extends GridFutureAdapter<Map<KeyCacheObject, EntryGetResult>> {
    /** Maximum number of remap attempts, read from the {@code IGNITE_NEAR_GET_MAX_REMAPS} system property (default 3). */
    protected static final int MAX_REMAP_CNT = IgniteSystemProperties.getInteger("IGNITE_NEAR_GET_MAX_REMAPS", 3);

    /** Atomic updater for {@link #remapCnt}. */
    protected static final AtomicIntegerFieldUpdater<GridNearReadRepairAbstractFuture> REMAP_CNT_UPD = AtomicIntegerFieldUpdater.newUpdater(GridNearReadRepairAbstractFuture.class, "remapCnt");

    /** Number of remap attempts performed so far; updated only through {@link #REMAP_CNT_UPD}. */
    protected volatile int remapCnt;

    /** Per-node get futures of the current mapping, keyed by the node they were created for. */
    protected final Map<ClusterNode, GridPartitionedGetFuture<KeyCacheObject, EntryGetResult>> futs = new ConcurrentHashMap<ClusterNode, GridPartitionedGetFuture<KeyCacheObject, EntryGetResult>>();

    /** Cache context. */
    protected final GridCacheContext<KeyCacheObject, EntryGetResult> ctx;

    /** Keys to read. */
    protected final Collection<KeyCacheObject> keys;

    /** Read-through flag, passed to every per-node get future. */
    protected final boolean readThrough;

    /** Task name, passed to every per-node get future. */
    protected final String taskName;

    /** Deserialize-binary flag, passed to every per-node get future and used when unwrapping event values. */
    protected final boolean deserializeBinary;

    /** Recovery flag, passed to every per-node get future. */
    protected final boolean recovery;

    /** Expiry policy, passed to every per-node get future. */
    protected final IgniteCacheExpiryPolicy expiryPlc;

    /** Transaction this read belongs to, may be {@code null}. */
    protected final IgniteInternalTx tx;

    /** For each key, the first node returned by the affinity during the latest successful mapping. */
    private volatile Map<KeyCacheObject, ClusterNode> primaries;

    /** {@code true} when no topology version was supplied to the constructor. */
    private final boolean canRemap;

    /**
     * Topology version of the current mapping. Set to {@code null} while a remap is waiting for
     * the affinity-ready future, so that {@link #onResult(IgniteInternalFuture)} ignores results
     * arriving in between. Guarded by {@code this}.
     */
    private AffinityTopologyVersion topVer;

    /**
     * Stores the arguments and immediately starts the first mapping.
     *
     * @param topVer Topology version to map on, or {@code null} to use the current affinity
     *      topology version of {@code ctx} (which also enables waiting for a newer topology on remap).
     * @param ctx Cache context.
     * @param keys Keys to read.
     * @param readThrough Read-through flag.
     * @param taskName Task name.
     * @param deserializeBinary Deserialize-binary flag.
     * @param recovery Recovery flag.
     * @param expiryPlc Expiry policy.
     * @param tx Transaction, may be {@code null}.
     */
    protected GridNearReadRepairAbstractFuture(AffinityTopologyVersion topVer, GridCacheContext<KeyCacheObject, EntryGetResult> ctx, Collection<KeyCacheObject> keys, boolean readThrough, String taskName, boolean deserializeBinary, boolean recovery, IgniteCacheExpiryPolicy expiryPlc, IgniteInternalTx tx) {
        this.ctx = ctx;
        this.keys = keys;
        this.readThrough = readThrough;
        this.taskName = taskName;
        this.deserializeBinary = deserializeBinary;
        this.recovery = recovery;
        this.expiryPlc = expiryPlc;
        this.tx = tx;
        this.canRemap = topVer == null;
        this.map(this.canRemap ? ctx.affinity().affinityTopologyVersion() : topVer);
    }

    /**
     * Maps every key to all nodes the affinity returns for it on {@code topVer}, creates one get
     * future per node and initializes them.
     *
     * <p>If any key has no nodes, or no per-node future could be created at all, this future is
     * completed with {@link ClusterTopologyServerNotFoundException} instead of starting any reads.
     *
     * @param topVer Topology version to map on.
     */
    protected synchronized void map(AffinityTopologyVersion topVer) {
        this.topVer = topVer;
        assert (this.futs.isEmpty()) : "Remapping started without the clean-up.";

        Map<KeyCacheObject, ClusterNode> primaryNodes = new HashMap<KeyCacheObject, ClusterNode>();

        // The transaction is set on the transaction manager for the duration of the mapping and
        // the previously returned value is put back in the finally block.
        IgniteInternalTx prevTx = this.ctx.tm().tx(this.tx);
        try {
            Map<ClusterNode, Collection<KeyCacheObject>> mappings = new HashMap<ClusterNode, Collection<KeyCacheObject>>();

            for (KeyCacheObject key : this.keys) {
                List<ClusterNode> nodes = this.ctx.affinity().nodesByKey(key, topVer);

                // Without this guard nodes.get(0) would throw IndexOutOfBoundsException and the
                // future would never be completed with the topology error below.
                if (nodes.isEmpty()) {
                    this.onDone(this.serverNotFoundError(topVer));
                    return;
                }

                // The first node in the affinity list is recorded as the key's primary.
                primaryNodes.put(key, nodes.get(0));

                for (ClusterNode node : nodes) {
                    mappings.computeIfAbsent(node, k -> new HashSet<KeyCacheObject>()).add(key);
                }
            }

            this.primaries = primaryNodes;

            String txLbl = this.tx != null ? this.tx.label() : null;

            for (Map.Entry<ClusterNode, Collection<KeyCacheObject>> mapping : mappings.entrySet()) {
                ClusterNode node = mapping.getKey();

                // Positional boolean flags are kept exactly as in the original call; their meaning
                // is defined by the GridPartitionedGetFuture constructor.
                GridPartitionedGetFuture<KeyCacheObject, EntryGetResult> fut = new GridPartitionedGetFuture<KeyCacheObject, EntryGetResult>(this.ctx, mapping.getValue(), this.readThrough, false, this.taskName, this.deserializeBinary, this.recovery, this.expiryPlc, false, true, true, txLbl, this.tx != null ? this.tx.mvccSnapshot() : null, node);

                fut.listen(this::onResult);
                this.futs.put(node, fut);
            }

            // Futures are initialized only after all of them are registered, so that onResult
            // sees the complete set when the first one finishes.
            for (GridPartitionedGetFuture<KeyCacheObject, EntryGetResult> fut : this.futs.values()) {
                fut.init(topVer);
            }

            if (this.futs.isEmpty()) {
                this.onDone(this.serverNotFoundError(topVer));
            }
        }
        finally {
            this.ctx.tm().tx(prevTx);
        }
    }

    /**
     * Builds the error used when keys cannot be mapped to any node.
     *
     * @param topVer Topology version the mapping was attempted on.
     * @return Exception to complete this future with.
     */
    private ClusterTopologyServerNotFoundException serverNotFoundError(AffinityTopologyVersion topVer) {
        return new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes left the grid) [topVer=" + topVer + ", cache=" + this.ctx.name() + ']');
    }

    /**
     * Drops the per-node futures of the previous mapping and maps the keys again.
     *
     * @param topVer Topology version to map on.
     */
    protected void remap(AffinityTopologyVersion topVer) {
        this.futs.clear();
        this.map(topVer);
    }

    /**
     * Listener invoked when a per-node get future finishes.
     *
     * <p>Results are ignored when this future is already done, when a remap is pending
     * ({@code topVer == null}), or when {@code finished} is not part of the current mapping.
     *
     * @param finished The per-node future that has finished.
     */
    protected synchronized void onResult(IgniteInternalFuture<Map<KeyCacheObject, EntryGetResult>> finished) {
        if (this.isDone() || this.topVer == null || !this.futs.containsValue((GridPartitionedGetFuture)finished)) {
            return;
        }

        Throwable err = finished.error();

        if (err != null) {
            if (!(err instanceof ClusterTopologyServerNotFoundException)) {
                this.onDone(err);
                return;
            }

            if (REMAP_CNT_UPD.incrementAndGet(this) > MAX_REMAP_CNT) {
                this.onDone(new ClusterTopologyCheckedException("Failed to remap keys to a new nodes after " + MAX_REMAP_CNT + " attempts (keys got remapped to the same node) ]"));
            } else if (!this.canRemap) {
                // A fixed topology version was requested by the caller: retry on the same version.
                this.remap(this.topVer);
            } else {
                long maxTopVer = Math.max(this.topVer.topologyVersion() + 1L, this.ctx.discovery().topologyVersion());
                AffinityTopologyVersion awaitTopVer = new AffinityTopologyVersion(maxTopVer);

                // Mark the remap as pending so that late results of the old mapping are ignored.
                this.topVer = null;
                this.ctx.shared().exchange().affinityReadyFuture(awaitTopVer).listen(f -> this.remap(awaitTopVer));
            }
            return;
        }

        // Reduce only once every per-node future has finished successfully.
        for (GridPartitionedGetFuture<KeyCacheObject, EntryGetResult> fut : this.futs.values()) {
            if (!fut.isDone() || fut.error() != null) {
                return;
            }
        }

        this.reduce();
    }

    /**
     * Invoked once all per-node get futures of the current mapping have finished without errors.
     */
    protected abstract void reduce();

    /**
     * Records a {@link CacheConsistencyViolationEvent} describing, for every inconsistent key, the
     * value and version each node returned.
     *
     * @param inconsistentKeys Keys whose per-node results should be included in the event.
     * @param fixedEntries Entries regarded as correct, or {@code null}. A node's entry is flagged
     *      as correct only when this map holds an entry for the key that {@code equals} the
     *      node's result; a missing key yields {@code false}.
     */
    protected void recordConsistencyViolation(Set<KeyCacheObject> inconsistentKeys, Map<KeyCacheObject, EntryGetResult> fixedEntries) {
        GridEventStorageManager evtMgr = this.ctx.gridEvents();

        // NOTE(review): 135 is an inlined event type id, presumably
        // EventType.EVT_CONSISTENCY_VIOLATION - confirm against the EventType constants.
        if (!evtMgr.isRecordable(135)) {
            return;
        }

        Map<Object, Map<ClusterNode, CacheConsistencyViolationEvent.EntryInfo>> originalMap = new HashMap<Object, Map<ClusterNode, CacheConsistencyViolationEvent.EntryInfo>>();

        for (Map.Entry<ClusterNode, GridPartitionedGetFuture<KeyCacheObject, EntryGetResult>> pair : this.futs.entrySet()) {
            ClusterNode node = pair.getKey();
            GridPartitionedGetFuture<KeyCacheObject, EntryGetResult> fut = pair.getValue();

            for (Map.Entry<KeyCacheObject, EntryGetResult> entry : fut.result().entrySet()) {
                KeyCacheObject key = entry.getKey();

                if (!inconsistentKeys.contains(key)) {
                    continue;
                }

                EntryGetResult res = entry.getValue();
                GridCacheVersion ver = res.version();
                Object val = this.ctx.unwrapBinaryIfNeeded(res.value(), !this.deserializeBinary, false, null);

                Map<ClusterNode, CacheConsistencyViolationEvent.EntryInfo> perNode = originalMap.computeIfAbsent(this.ctx.unwrapBinaryIfNeeded(key, false, false, null), k -> new HashMap<ClusterNode, CacheConsistencyViolationEvent.EntryInfo>());

                boolean primary = this.primaries.get(key).equals(fut.affNode());

                // Null-safe: a key absent from fixedEntries must not break event recording.
                EntryGetResult fixed = fixedEntries != null ? fixedEntries.get(key) : null;
                boolean correct = fixed != null && fixed.equals(res);

                perNode.put(node, new EventEntryInfo(val, ver, primary, correct));
            }
        }

        evtMgr.record(new CacheConsistencyViolationEvent(this.ctx.name(), this.ctx.discovery().localNode(), "Consistency violation fixed.", originalMap));
    }

    /**
     * Immutable {@link CacheConsistencyViolationEvent.EntryInfo} holding what a single node
     * returned for a single key.
     */
    private static final class EventEntryInfo
    implements CacheConsistencyViolationEvent.EntryInfo {
        /** Value returned by the node. */
        final Object val;

        /** Version returned by the node. */
        final CacheEntryVersion ver;

        /** Whether the node was recorded as the key's primary. */
        final boolean primary;

        /** Whether the node's entry equals the entry regarded as correct. */
        final boolean correct;

        /**
         * @param val Value.
         * @param ver Version.
         * @param primary Primary flag.
         * @param correct Correct flag.
         */
        public EventEntryInfo(Object val, CacheEntryVersion ver, boolean primary, boolean correct) {
            this.val = val;
            this.ver = ver;
            this.primary = primary;
            this.correct = correct;
        }

        /** {@inheritDoc} */
        @Override
        public Object getValue() {
            return this.val;
        }

        /** {@inheritDoc} */
        @Override
        public CacheEntryVersion getVersion() {
            return this.ver;
        }

        /** {@inheritDoc} */
        @Override
        public boolean isPrimary() {
            return this.primary;
        }

        /** {@inheritDoc} */
        @Override
        public boolean isCorrect() {
            return this.correct;
        }
    }
}

