package org.opendaylight.controller.cluster.datastore;

import akka.actor.ActorRef;
import akka.actor.Status;
import akka.serialization.Serialization;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.datastore.compat.BackwardsCompatibleThreePhaseCommitCohort;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.yangtools.yang.common.RpcError;
import org.slf4j.Logger;

/* loaded from: input_file:org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.class */
class ShardCommitCoordinator {
    /** Entry whose canCommit/commit is currently in progress, or {@code null} when there is none. */
    private CohortEntry currentCohortEntry;

    /** Data tree used to create read-write transactions and local cohorts; checked non-null at construction. */
    private final ShardDataTree dataTree;

    /** Maximum number of entries allowed in {@link #queuedCohortEntries}; adjustable at runtime. */
    private int queueCapacity;

    private final Logger log;

    /** Name used as the prefix of log and error messages. */
    private final String name;

    /** Age, in milliseconds, after which a queued entry that is not ready to commit is dropped. */
    private final long cacheExpiryTimeoutInMillis;

    /** Optional hook applied to a cohort when a batched transaction is readied; may be {@code null}. */
    private CohortDecorator cohortDecorator;

    /** Lazily created reply reused for every "ready" acknowledgement that points at the shard itself. */
    private ReadyTransactionReply readyTransactionReply;

    /** All known cohort entries keyed by transaction ID, whether or not they are queued yet. */
    private final Map<String, CohortEntry> cohortCache = new HashMap<>();

    /** Readied entries waiting for their turn to canCommit, in arrival order. */
    private final Queue<CohortEntry> queuedCohortEntries = new LinkedList<>();

    /* loaded from: input_file:org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator$CohortDecorator.class */
    /**
     * Hook that can wrap or replace the cohort produced when a transaction is readied.
     * {@code CohortEntry.ready} invokes it, when one is configured, and stores the returned cohort.
     */
    public interface CohortDecorator {
        /**
         * Returns the cohort that should be used for the given transaction.
         *
         * @param str the transaction ID the cohort belongs to
         * @param shardDataTreeCohort the cohort obtained from readying the transaction
         * @return the cohort to use in place of {@code shardDataTreeCohort}
         */
        ShardDataTreeCohort decorate(String str, ShardDataTreeCohort shardDataTreeCohort);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator$CohortEntry.class */
    /**
     * Per-transaction bookkeeping held by the coordinator: the transaction ID, the cohort used to
     * drive the commit steps, who to reply to, and a timer measuring time since the last access.
     *
     * <p>An entry is created either around an open read-write transaction (to which batched
     * modifications are applied before {@link #ready}) or around an already-built cohort.
     */
    public static class CohortEntry {
        private final String transactionID;
        private final ReadWriteShardDataTreeTransaction transaction;
        /** Started at construction and restarted by {@link #updateLastAccessTime()}. */
        private final Stopwatch lastAccessTimer = Stopwatch.createStarted();
        private ShardDataTreeCohort cohort;
        private ActorRef replySender;
        private Shard shard;
        private boolean doImmediateCommit;
        private int totalBatchedModificationsReceived;

        /**
         * Creates an entry for a transaction that is still open and will receive modifications.
         *
         * @param str the transaction ID
         * @param readWriteShardDataTreeTransaction the open transaction; must not be {@code null}
         */
        CohortEntry(String str, ReadWriteShardDataTreeTransaction readWriteShardDataTreeTransaction) {
            transaction = Preconditions.checkNotNull(readWriteShardDataTreeTransaction);
            transactionID = str;
        }

        /**
         * Creates an entry around an existing cohort. The modification argument is accepted for
         * signature compatibility and is not retained.
         *
         * @param str the transaction ID
         * @param shardDataTreeCohort the cohort for the transaction
         * @param mutableCompositeModification unused
         */
        CohortEntry(String str, ShardDataTreeCohort shardDataTreeCohort, MutableCompositeModification mutableCompositeModification) {
            this(str, shardDataTreeCohort);
        }

        /**
         * Creates an entry around an existing cohort; no backing transaction is kept.
         *
         * @param str the transaction ID
         * @param shardDataTreeCohort the cohort for the transaction
         */
        CohortEntry(String str, ShardDataTreeCohort shardDataTreeCohort) {
            transactionID = str;
            cohort = shardDataTreeCohort;
            transaction = null;
        }

        /** Restarts the access timer from zero. */
        void updateLastAccessTime() {
            lastAccessTimer.reset().start();
        }

        /** Returns the ID of the transaction this entry tracks. */
        public String getTransactionID() {
            return transactionID;
        }

        /** Returns the cohort, or {@code null} if the entry has not been readied yet. */
        public ShardDataTreeCohort getCohort() {
            return cohort;
        }

        /** Returns how many times {@link #applyModifications} has completed for this entry. */
        int getTotalBatchedModificationsReceived() {
            return totalBatchedModificationsReceived;
        }

        /**
         * Applies each modification to the backing transaction's snapshot and counts the batch.
         *
         * @param iterable the modifications of one batch, applied in iteration order
         */
        void applyModifications(Iterable<Modification> iterable) {
            for (Modification modification : iterable) {
                modification.apply(transaction.getSnapshot());
            }
            totalBatchedModificationsReceived++;
        }

        /**
         * Readies the backing transaction, producing this entry's cohort.
         *
         * @param cohortDecorator optional decorator applied to the new cohort; may be {@code null}
         * @param z whether the transaction should be committed immediately once it can commit
         * @throws IllegalStateException if a cohort has already been set
         */
        void ready(CohortDecorator cohortDecorator, boolean z) {
            Preconditions.checkState(cohort == null, "cohort was already set");
            setDoImmediateCommit(z);

            cohort = transaction.ready();
            if (cohortDecorator == null) {
                return;
            }
            cohort = cohortDecorator.decorate(transactionID, cohort);
        }

        /** Returns {@code true} once a reply sender has been recorded for this entry. */
        boolean isReadyToCommit() {
            return replySender != null;
        }

        /**
         * Tells whether at least {@code j} milliseconds have elapsed on the access timer.
         *
         * @param j the expiry threshold in milliseconds
         */
        public boolean isExpired(long j) {
            final long idleMillis = lastAccessTimer.elapsed(TimeUnit.MILLISECONDS);
            return idleMillis >= j;
        }

        boolean isDoImmediateCommit() {
            return doImmediateCommit;
        }

        void setDoImmediateCommit(boolean z) {
            doImmediateCommit = z;
        }

        /** Returns the actor to which commit-step replies are sent, or {@code null} if not set. */
        public ActorRef getReplySender() {
            return replySender;
        }

        void setReplySender(ActorRef actorRef) {
            replySender = actorRef;
        }

        Shard getShard() {
            return shard;
        }

        void setShard(Shard shard) {
            this.shard = shard;
        }

        @Override
        public String toString() {
            return "CohortEntry [transactionID=" + transactionID + ", doImmediateCommit=" + doImmediateCommit + "]";
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a coordinator with empty cache and queue.
     *
     * @param shardDataTree the data tree to operate on; must not be {@code null}
     * @param j timeout in milliseconds after which a queued, not-yet-ready entry is dropped
     * @param i maximum number of queued cohort entries
     * @param actorRef not used by this constructor
     * @param logger logger for diagnostics
     * @param str name used to prefix log and error messages
     */
    public ShardCommitCoordinator(ShardDataTree shardDataTree, long j, int i, ActorRef actorRef, Logger logger, String str) {
        dataTree = Preconditions.checkNotNull(shardDataTree);
        cacheExpiryTimeoutInMillis = j;
        queueCapacity = i;
        log = logger;
        name = str;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Changes the maximum number of cohort entries that may be queued.
     * Only the check made when enqueueing reads this value; existing entries are untouched.
     *
     * @param i the new capacity
     */
    public void setQueueCapacity(int i) {
        queueCapacity = i;
    }

    /**
     * Returns the shared "ready" reply carrying the shard's serialized actor path, building it
     * on first use and reusing the same instance afterwards.
     *
     * @param shard the shard whose path is embedded when the reply is first created
     */
    private ReadyTransactionReply readyTransactionReply(Shard shard) {
        ReadyTransactionReply cached = readyTransactionReply;
        if (cached == null) {
            cached = new ReadyTransactionReply(Serialization.serializedActorPath(shard.self()));
            readyTransactionReply = cached;
        }
        return cached;
    }

    /**
     * Appends the entry to the commit queue if there is room.
     *
     * <p>When the queue is at capacity the entry is removed from the cache, the problem is
     * logged, and a {@code Status.Failure} is sent to the requester.
     *
     * @param cohortEntry the entry to enqueue
     * @param actorRef the actor to notify on failure
     * @param shard the shard used as the sender of the failure message
     * @return {@code true} if the entry was queued, {@code false} if it was rejected
     */
    private boolean queueCohortEntry(CohortEntry cohortEntry, ActorRef actorRef, Shard shard) {
        if (queuedCohortEntries.size() >= queueCapacity) {
            cohortCache.remove(cohortEntry.getTransactionID());

            final String message = String.format(
                    "%s: Could not enqueue transaction %s - the maximum commit queue capacity %d has been reached.",
                    name, cohortEntry.getTransactionID(), queueCapacity);
            final RuntimeException failure = new RuntimeException(message);
            log.error(failure.getMessage());
            actorRef.tell(new Status.Failure(failure), shard.self());
            return false;
        }

        queuedCohortEntries.offer(cohortEntry);
        return true;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Registers and queues a transaction that was readied elsewhere and forwarded here, then
     * answers the requester according to the client's protocol version.
     *
     * <ul>
     *   <li>Client version below 3: replies with a version-specific {@code ReadyTransactionReply};
     *       below version 1 the reply points at a newly created backwards-compatible cohort actor
     *       instead of the shard.</li>
     *   <li>Otherwise, without immediate commit: replies with the shared ready reply.</li>
     *   <li>Otherwise, with immediate commit: starts canCommit right away.</li>
     * </ul>
     *
     * <p>Nothing further happens if the entry cannot be queued (the requester has then already
     * been sent a failure).
     *
     * @param forwardedReadyTransaction the forwarded ready message
     * @param actorRef the requester to reply to
     * @param shard the owning shard
     */
    public void handleForwardedReadyTransaction(ForwardedReadyTransaction forwardedReadyTransaction, ActorRef actorRef, Shard shard) {
        final String transactionID = forwardedReadyTransaction.getTransactionID();
        final short clientVersion = forwardedReadyTransaction.getTxnClientVersion();
        log.debug("{}: Readying transaction {}, client version {}", new Object[] {name, transactionID, clientVersion});

        final CohortEntry cohortEntry = new CohortEntry(transactionID, forwardedReadyTransaction.getCohort(),
                (MutableCompositeModification) forwardedReadyTransaction.getModification());
        cohortCache.put(transactionID, cohortEntry);

        if (!queueCohortEntry(cohortEntry, actorRef, shard)) {
            return;
        }

        // NOTE(review): 3 and 1 are presumably protocol version thresholds - confirm against the version constants.
        if (clientVersion < 3) {
            ActorRef cohortActor = shard.self();
            if (clientVersion < 1) {
                log.debug("{}: Creating BackwardsCompatibleThreePhaseCommitCohort", name);
                cohortActor = shard.getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(transactionID));
            }

            final ReadyTransactionReply reply =
                    new ReadyTransactionReply(Serialization.serializedActorPath(cohortActor), clientVersion);
            final Object response;
            if (forwardedReadyTransaction.isReturnSerialized()) {
                response = reply.toSerializable();
            } else {
                response = reply;
            }
            actorRef.tell(response, shard.self());
            return;
        }

        if (forwardedReadyTransaction.isDoImmediateCommit()) {
            cohortEntry.setDoImmediateCommit(true);
            cohortEntry.setReplySender(actorRef);
            cohortEntry.setShard(shard);
            handleCanCommit(cohortEntry);
        } else {
            actorRef.tell(readyTransactionReply(shard), shard.self());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Applies one batch of modifications to a transaction, creating its cohort entry (and the
     * underlying read-write transaction) on the first batch.
     *
     * <p>If the batch is not flagged ready, a {@code BatchedModificationsReply} with the number
     * of modifications is sent back. If it is ready, the entry is queued and readied; then either
     * the shared ready reply is sent or, when commit-on-ready is requested, canCommit is started
     * immediately.
     *
     * @param batchedModifications the batch to apply
     * @param actorRef the requester to reply to
     * @param shard the owning shard
     * @throws IllegalStateException if the ready batch reports a total message count that does
     *         not match the number of batches received; the entry is evicted from the cache first
     */
    public void handleBatchedModifications(BatchedModifications batchedModifications, ActorRef actorRef, Shard shard) {
        final String transactionID = batchedModifications.getTransactionID();

        CohortEntry cohortEntry = cohortCache.get(transactionID);
        if (cohortEntry == null) {
            cohortEntry = new CohortEntry(transactionID,
                    dataTree.newReadWriteTransaction(transactionID, batchedModifications.getTransactionChainID()));
            cohortCache.put(transactionID, cohortEntry);
        }

        if (log.isDebugEnabled()) {
            log.debug("{}: Applying {} batched modifications for Tx {}",
                    new Object[] {name, batchedModifications.getModifications().size(), transactionID});
        }
        cohortEntry.applyModifications(batchedModifications.getModifications());

        if (!batchedModifications.isReady()) {
            actorRef.tell(new BatchedModificationsReply(batchedModifications.getModifications().size()), shard.self());
            return;
        }

        final int received = cohortEntry.getTotalBatchedModificationsReceived();
        final int sent = batchedModifications.getTotalMessagesSent();
        if (received != sent) {
            // The entry has not been queued yet, so the expiry sweep (which only walks the queue)
            // would never evict it. Drop it here so a failed transaction does not linger in the cache.
            cohortCache.remove(transactionID);
            throw new IllegalStateException(String.format(
                    "The total number of batched messages received %d does not match the number sent %d",
                    received, sent));
        }

        if (!queueCohortEntry(cohortEntry, actorRef, shard)) {
            return;
        }

        if (log.isDebugEnabled()) {
            log.debug("{}: Readying Tx {}, client version {}",
                    new Object[] {name, transactionID, batchedModifications.getVersion()});
        }

        final boolean commitOnReady = batchedModifications.isDoCommitOnReady();
        cohortEntry.ready(cohortDecorator, commitOnReady);

        if (!commitOnReady) {
            actorRef.tell(readyTransactionReply(shard), shard.self());
            return;
        }

        cohortEntry.setReplySender(actorRef);
        cohortEntry.setShard(shard);
        handleCanCommit(cohortEntry);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Registers and queues a transaction whose modification was prepared locally, wrapping it in
     * a {@code SimpleShardDataTreeCohort}.
     *
     * <p>Once queued, either the shared ready reply is sent or, when commit-on-ready is
     * requested, canCommit is started immediately. Nothing further happens if queueing fails.
     *
     * @param readyLocalTransaction the local ready message
     * @param actorRef the requester to reply to
     * @param shard the owning shard
     */
    public void handleReadyLocalTransaction(ReadyLocalTransaction readyLocalTransaction, ActorRef actorRef, Shard shard) {
        final SimpleShardDataTreeCohort localCohort = new SimpleShardDataTreeCohort(dataTree,
                readyLocalTransaction.getModification(), readyLocalTransaction.getTransactionID());
        final CohortEntry cohortEntry = new CohortEntry(readyLocalTransaction.getTransactionID(), localCohort);
        cohortCache.put(readyLocalTransaction.getTransactionID(), cohortEntry);
        cohortEntry.setDoImmediateCommit(readyLocalTransaction.isDoCommitOnReady());

        if (!queueCohortEntry(cohortEntry, actorRef, shard)) {
            return;
        }

        log.debug("{}: Applying local modifications for Tx {}", name, readyLocalTransaction.getTransactionID());

        if (readyLocalTransaction.isDoCommitOnReady()) {
            cohortEntry.setReplySender(actorRef);
            cohortEntry.setShard(shard);
            handleCanCommit(cohortEntry);
        } else {
            actorRef.tell(readyTransactionReply(shard), shard.self());
        }
    }

    /**
     * Starts canCommit for the entry if it is its turn, otherwise leaves it for later.
     *
     * <p>The entry proceeds only when no other commit is in progress and it is at the head of
     * the queue; in that case it is dequeued and becomes the current entry. In every case its
     * access timer is restarted.
     *
     * @param cohortEntry the entry requesting canCommit
     */
    private void handleCanCommit(CohortEntry cohortEntry) {
        final String transactionID = cohortEntry.getTransactionID();
        cohortEntry.updateLastAccessTime();

        if (currentCohortEntry != null) {
            if (log.isDebugEnabled()) {
                log.debug("{}: Commit for Tx {} already in progress - skipping canCommit for {} for now",
                        new Object[] {name, currentCohortEntry.getTransactionID(), transactionID});
            }
            return;
        }

        final CohortEntry head = queuedCohortEntries.peek();
        if (head == cohortEntry) {
            currentCohortEntry = queuedCohortEntries.poll();
            doCanCommit(currentCohortEntry);
        } else if (log.isDebugEnabled()) {
            // peek() returns null on an empty queue (e.g. the entry is cached but was never queued),
            // so the head must not be dereferenced unconditionally.
            final String nextPending = head != null ? head.getTransactionID() : "<none>";
            log.debug("{}: Tx {} is the next pending canCommit - skipping {} for now",
                    new Object[] {name, nextPending, transactionID});
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Handles a canCommit request for a transaction identified by ID.
     *
     * <p>If no entry is cached for the ID, an {@code IllegalStateException} is logged and sent
     * back wrapped in a {@code Status.Failure}. Otherwise the sender and shard are recorded on
     * the entry and canCommit is attempted.
     *
     * @param str the transaction ID
     * @param actorRef the requester to reply to
     * @param shard the owning shard
     */
    public void handleCanCommit(String str, ActorRef actorRef, Shard shard) {
        final CohortEntry cohortEntry = cohortCache.get(str);
        if (cohortEntry != null) {
            cohortEntry.setReplySender(actorRef);
            cohortEntry.setShard(shard);
            handleCanCommit(cohortEntry);
            return;
        }

        final String message = String.format("%s: No cohort entry found for transaction %s", name, str);
        final IllegalStateException failure = new IllegalStateException(message);
        log.error(failure.getMessage());
        actorRef.tell(new Status.Failure(failure), shard.self());
    }

    /**
     * Runs the canCommit step for the entry and reports the outcome to its reply sender.
     *
     * <p>The result is obtained by waiting on the cohort's canCommit future. For an
     * immediate-commit entry a positive result continues straight into {@link #doCommit} and a
     * negative one is reported as a {@code Status.Failure}; otherwise a serialized YES/NO
     * {@code CanCommitTransactionReply} is sent.
     *
     * <p>Unless the step finished normally with a positive result, the transaction is completed
     * and removed from the cache so the next queued entry can proceed. This also covers failures
     * that are rethrown.
     *
     * @param cohortEntry the entry to process; its reply sender and shard must be set
     */
    private void doCanCommit(CohortEntry cohortEntry) {
        // Set only when the step ran to completion with a positive answer; any other exit
        // (negative answer, caught exception, propagating throwable) triggers cleanup below.
        boolean keepAsCurrent = false;
        try {
            final boolean canCommit = (Boolean) cohortEntry.getCohort().canCommit().get();
            log.debug("{}: canCommit for {}: {}", new Object[] {name, cohortEntry.getTransactionID(), canCommit});

            if (cohortEntry.isDoImmediateCommit()) {
                if (canCommit) {
                    doCommit(cohortEntry);
                } else {
                    cohortEntry.getReplySender().tell(new Status.Failure(new TransactionCommitFailedException(
                            "Can Commit failed, no detailed cause available.", new RpcError[0])),
                            cohortEntry.getShard().self());
                }
            } else {
                final Object reply = canCommit
                        ? CanCommitTransactionReply.YES.toSerializable()
                        : CanCommitTransactionReply.NO.toSerializable();
                cohortEntry.getReplySender().tell(reply, cohortEntry.getShard().self());
            }

            keepAsCurrent = canCommit;
        } catch (Exception e) {
            log.debug("{}: An exception occurred during canCommit", name, e);

            // Report the underlying cause rather than the future's ExecutionException wrapper.
            final Throwable failure = e instanceof ExecutionException ? e.getCause() : e;
            cohortEntry.getReplySender().tell(new Status.Failure(failure), cohortEntry.getShard().self());
        } finally {
            if (!keepAsCurrent) {
                currentTransactionComplete(cohortEntry.getTransactionID(), true);
            }
        }
    }

    /**
     * Runs preCommit on the entry's cohort and, on success, hands the entry to the shard to
     * continue the commit.
     *
     * <p>On any exception the error is logged, a {@code Status.Failure} carrying the exception
     * is sent to the reply sender, and the transaction is completed and removed from the cache.
     *
     * @param cohortEntry the entry to commit; its reply sender and shard must be set
     * @return {@code true} if preCommit succeeded and the commit was handed to the shard
     */
    private boolean doCommit(CohortEntry cohortEntry) {
        log.debug("{}: Committing transaction {}", name, cohortEntry.getTransactionID());

        try {
            cohortEntry.getCohort().preCommit().get();
            cohortEntry.getShard().continueCommit(cohortEntry);
            cohortEntry.updateLastAccessTime();
            return true;
        } catch (Exception e) {
            log.error("{} An exception occurred while preCommitting transaction {}",
                    new Object[] {name, cohortEntry.getTransactionID(), e});
            cohortEntry.getReplySender().tell(new Status.Failure(e), cohortEntry.getShard().self());
            currentTransactionComplete(cohortEntry.getTransactionID(), true);
            return false;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Handles a commit request for the given transaction.
     *
     * <p>The commit proceeds only if the transaction is the current one; otherwise an
     * {@code IllegalStateException} is logged and sent back wrapped in a {@code Status.Failure}.
     *
     * @param str the transaction ID
     * @param actorRef the requester; recorded as the entry's reply sender when the commit proceeds
     * @param shard the shard used as the sender of the failure message
     * @return {@code true} if the commit was started successfully, {@code false} otherwise
     */
    public boolean handleCommit(String str, ActorRef actorRef, Shard shard) {
        final CohortEntry current = getCohortEntryIfCurrent(str);
        if (current == null) {
            final String message = String.format(
                    "%s: Cannot commit transaction %s - it is not the current transaction", name, str);
            final IllegalStateException failure = new IllegalStateException(message);
            log.error(failure.getMessage());
            actorRef.tell(new Status.Failure(failure), shard.self());
            return false;
        }

        current.setReplySender(actorRef);
        return doCommit(current);
    }

    /**
     * Returns the current cohort entry if it belongs to the given transaction.
     *
     * @param str the transaction ID
     * @return the current entry, or {@code null} if there is none or its ID differs
     */
    public CohortEntry getCohortEntryIfCurrent(String str) {
        return isCurrentTransaction(str) ? currentCohortEntry : null;
    }

    /** Returns the entry whose commit is in progress, or {@code null} if there is none. */
    public CohortEntry getCurrentCohortEntry() {
        return currentCohortEntry;
    }

    /**
     * Removes the entry for the given transaction from the cache. The commit queue and the
     * current entry are not touched.
     *
     * @param str the transaction ID
     * @return the removed entry, or {@code null} if none was cached under that ID
     */
    public CohortEntry getAndRemoveCohortEntry(String str) {
        final CohortEntry removed = cohortCache.remove(str);
        return removed;
    }

    /**
     * Tells whether the given ID is that of the transaction currently being committed.
     *
     * @param str the transaction ID to test
     * @return {@code true} if a current entry exists and its transaction ID equals {@code str}
     */
    public boolean isCurrentTransaction(String str) {
        if (currentCohortEntry == null) {
            return false;
        }
        return currentCohortEntry.getTransactionID().equals(str);
    }

    /**
     * Marks a transaction as finished.
     *
     * <p>Optionally evicts its entry from the cache. If the transaction is the current one, the
     * current slot is cleared and the next eligible queued entry, if any, is started.
     *
     * @param str the transaction ID
     * @param z whether to remove the transaction's entry from the cache
     */
    public void currentTransactionComplete(String str, boolean z) {
        if (z) {
            cohortCache.remove(str);
        }

        if (!isCurrentTransaction(str)) {
            return;
        }

        currentCohortEntry = null;
        log.debug("{}: currentTransactionComplete: {}", name, str);
        maybeProcessNextCohortEntry();
    }

    /**
     * Walks the queue from the head, discarding expired entries that are not ready to commit,
     * and starts canCommit for the first ready entry when no commit is in progress.
     *
     * <p>The walk stops at the first entry that is either ready to commit or not yet expired,
     * so entries behind it are never examined in the same pass.
     */
    private void maybeProcessNextCohortEntry() {
        final Iterator<CohortEntry> queued = queuedCohortEntries.iterator();
        while (queued.hasNext()) {
            final CohortEntry candidate = queued.next();

            if (candidate.isReadyToCommit()) {
                if (currentCohortEntry == null) {
                    if (log.isDebugEnabled()) {
                        log.debug("{}: Next entry to canCommit {}", name, candidate);
                    }
                    queued.remove();
                    currentCohortEntry = candidate;
                    currentCohortEntry.updateLastAccessTime();
                    doCanCommit(currentCohortEntry);
                }
                // Ready entry found: it was either started or must wait for the current commit.
                break;
            }

            if (!candidate.isExpired(cacheExpiryTimeoutInMillis)) {
                break;
            }

            log.warn("{}: canCommit for transaction {} was not received within {} ms - entry removed from cache",
                    new Object[] {name, candidate.getTransactionID(), cacheExpiryTimeoutInMillis});
            queued.remove();
            cohortCache.remove(candidate.getTransactionID());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Runs one pass of the queue sweep performed by {@code maybeProcessNextCohortEntry()}:
     * expired, not-yet-ready entries at the head are dropped, and a ready head entry is started
     * if nothing is currently committing.
     */
    public void cleanupExpiredCohortEntries() {
        this.maybeProcessNextCohortEntry();
    }

    /**
     * Installs the decorator passed to {@code CohortEntry.ready} when batched transactions are
     * readied.
     *
     * @param cohortDecorator the decorator to use; {@code null} disables decoration
     */
    @VisibleForTesting
    void setCohortDecorator(CohortDecorator cohortDecorator) {
        CohortDecorator decorator = cohortDecorator;
        this.cohortDecorator = decorator;
    }
}
