/*
 * Decompiled with CFR 0.152.
 */
package org.opendaylight.controller.cluster.datastore;

import akka.actor.ActorRef;
import akka.actor.Status;
import akka.serialization.Serialization;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.primitives.UnsignedLong;
import com.google.common.util.concurrent.FutureCallback;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import javax.annotation.Nonnull;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.CohortEntry;
import org.opendaylight.controller.cluster.datastore.Shard;
import org.opendaylight.controller.cluster.datastore.ShardDataTree;
import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
import org.opendaylight.controller.cluster.datastore.utils.AbstractBatchedModificationsCursor;
import org.opendaylight.yangtools.concepts.Identifier;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModificationCursor;
import org.slf4j.Logger;

final class ShardCommitCoordinator {
    private final Map<Identifier, CohortEntry> cohortCache = new HashMap<Identifier, CohortEntry>();
    private final ShardDataTree dataTree;
    private final Logger log;
    private final String name;
    @VisibleForTesting
    private CohortDecorator cohortDecorator;
    private ReadyTransactionReply readyTransactionReply;

    /**
     * Creates a coordinator bound to the given shard data tree.
     *
     * @param dataTree the data tree used to create transactions and cohorts; must not be null
     * @param log the logger this coordinator writes to
     * @param name the name used as a prefix in most of this coordinator's log messages
     * @throws NullPointerException if {@code dataTree} is null
     */
    ShardCommitCoordinator(ShardDataTree dataTree, Logger log, String name) {
        // Validate up front; a null tree aborts construction either way.
        this.dataTree = Preconditions.checkNotNull(dataTree);
        this.name = name;
        this.log = log;
    }

    /**
     * Reports how many cohort entries are currently held in the cache.
     *
     * @return the number of entries in the cohort cache
     */
    int getCohortCacheSize() {
        final int cachedEntries = cohortCache.size();
        return cachedEntries;
    }

    /**
     * Returns the data tree's log context; {@code finishCommit} uses it as the prefix of its log messages.
     *
     * @return the value of {@code dataTree.logContext()}
     */
    private String persistenceId() {
        final String logContext = dataTree.logContext();
        return logContext;
    }

    /**
     * Returns the cached {@link ReadyTransactionReply}, building it on first use from the serialized
     * path of the given actor. Later calls return the cached instance regardless of the argument.
     *
     * @param cohort the actor whose serialized path is embedded in the reply when it is first created
     * @return the shared reply instance
     */
    private ReadyTransactionReply readyTransactionReply(ActorRef cohort) {
        ReadyTransactionReply reply = this.readyTransactionReply;
        if (reply != null) {
            return reply;
        }

        // First request: build the reply once and remember it for every subsequent caller.
        reply = new ReadyTransactionReply(Serialization.serializedActorPath(cohort));
        this.readyTransactionReply = reply;
        return reply;
    }

    /**
     * Readies the transaction carried by a {@link ForwardedReadyTransaction} and caches a cohort entry
     * for it under its transaction identifier.
     *
     * <p>If the message asks for an immediate commit, the canCommit phase is started right away with
     * {@code sender} recorded as the reply target. Otherwise a {@link ReadyTransactionReply} is sent
     * back to {@code sender}.
     *
     * @param ready the forwarded ready message
     * @param sender the actor that receives the reply
     * @param shard the shard on whose behalf replies are sent
     */
    void handleForwardedReadyTransaction(ForwardedReadyTransaction ready, ActorRef sender, Shard shard) {
        log.debug("{}: Readying transaction {}, client version {}", name, ready.getTransactionId(),
                ready.getTxnClientVersion());

        final CohortEntry entry = CohortEntry.createReady(ready.getTransaction().ready(),
                ready.getTxnClientVersion());
        cohortCache.put(entry.getTransactionId(), entry);

        if (!ready.isDoImmediateCommit()) {
            // No immediate commit requested: just acknowledge that the transaction is ready.
            sender.tell(readyTransactionReply(shard.self()), shard.self());
            return;
        }

        entry.setDoImmediateCommit(true);
        entry.setReplySender(sender);
        entry.setShard(shard);
        handleCanCommit(entry);
    }

    /**
     * Applies one batch of modifications to the transaction's cohort entry, creating and caching an
     * open entry if none exists yet for the transaction.
     *
     * <p>For a batch that is not marked ready, a {@link BatchedModificationsReply} carrying the number
     * of modifications in the batch is sent to {@code sender}. For a ready batch the entry is validated,
     * readied and then either driven straight into canCommit (commit-on-ready) or acknowledged with a
     * {@link ReadyTransactionReply}.
     *
     * @param batched the batch to apply
     * @param sender the actor that receives the reply
     * @param shard the shard on whose behalf replies are sent
     * @throws IllegalStateException if the batch is ready and the number of batched messages received
     *         differs from the number the sender reports having sent
     */
    void handleBatchedModifications(BatchedModifications batched, ActorRef sender, Shard shard) {
        CohortEntry entry = cohortCache.get(batched.getTransactionId());
        if (entry == null) {
            entry = CohortEntry.createOpen(dataTree.newReadWriteTransaction(batched.getTransactionId()),
                    batched.getVersion());
            cohortCache.put(entry.getTransactionId(), entry);
        }

        if (log.isDebugEnabled()) {
            log.debug("{}: Applying {} batched modifications for Tx {}", name,
                    batched.getModifications().size(), batched.getTransactionId());
        }
        entry.applyModifications(batched.getModifications());

        if (!batched.isReady()) {
            // More batches are expected: acknowledge this one and wait.
            sender.tell(new BatchedModificationsReply(batched.getModifications().size()), shard.self());
            return;
        }

        // A failure recorded while applying any batch surfaces now; the entry is dropped first.
        if (entry.getLastBatchedModificationsException() != null) {
            cohortCache.remove(entry.getTransactionId());
            throw entry.getLastBatchedModificationsException();
        }

        if (entry.getTotalBatchedModificationsReceived() != batched.getTotalMessagesSent()) {
            cohortCache.remove(entry.getTransactionId());
            throw new IllegalStateException(String.format(
                    "The total number of batched messages received %d does not match the number sent %d",
                    entry.getTotalBatchedModificationsReceived(), batched.getTotalMessagesSent()));
        }

        if (log.isDebugEnabled()) {
            log.debug("{}: Readying Tx {}, client version {}", name, batched.getTransactionId(),
                    batched.getVersion());
        }

        entry.setDoImmediateCommit(batched.isDoCommitOnReady());
        entry.ready(cohortDecorator);

        if (!batched.isDoCommitOnReady()) {
            sender.tell(readyTransactionReply(shard.self()), shard.self());
            return;
        }

        entry.setReplySender(sender);
        entry.setShard(shard);
        handleCanCommit(entry);
    }

    /**
     * Creates a ready cohort from the modification carried by a {@link ReadyLocalTransaction} and
     * caches an entry for it.
     *
     * <p>When the message requests commit-on-ready, canCommit is started immediately with
     * {@code sender} as the reply target; otherwise a {@link ReadyTransactionReply} is sent back.
     *
     * @param message the local ready message
     * @param sender the actor that receives the reply
     * @param shard the shard on whose behalf replies are sent
     */
    void handleReadyLocalTransaction(ReadyLocalTransaction message, ActorRef sender, Shard shard) {
        final TransactionIdentifier txId = message.getTransactionId();

        // NOTE(review): the literal 5 is the client version for local transactions; it looks like an
        // inlined "current version" constant from decompilation - confirm against the project sources.
        final CohortEntry entry = CohortEntry.createReady(
                dataTree.newReadyCohort(txId, message.getModification()), (short) 5);
        cohortCache.put(entry.getTransactionId(), entry);

        final boolean commitOnReady = message.isDoCommitOnReady();
        entry.setDoImmediateCommit(commitOnReady);

        log.debug("{}: Applying local modifications for Tx {}", name, txId);

        if (!commitOnReady) {
            sender.tell(readyTransactionReply(shard.self()), shard.self());
            return;
        }

        entry.setReplySender(sender);
        entry.setShard(shard);
        handleCanCommit(entry);
    }

    /**
     * Re-packages the accumulated modifications of a cached transaction into fresh
     * {@link BatchedModifications} messages so that they can be forwarded elsewhere.
     *
     * <p>The cohort entry for the transaction is removed from the cache. If no entry (or no open
     * transaction) exists, {@code from} is returned unchanged as the only element. Otherwise the
     * modifications in {@code from} are applied to the transaction and the transaction's snapshot is
     * replayed into new batches, starting a new batch whenever the current one already holds
     * {@code maxModificationsPerBatch} modifications. The final batch carries the ready and
     * commit-on-ready flags of {@code from} and the total number of batches produced.
     *
     * <p>If replaying the snapshot yields no batch at all, a single empty batch is created so that the
     * flags of {@code from} are still forwarded instead of failing with
     * {@link java.util.NoSuchElementException}.
     *
     * @param from the most recently received batch for the transaction
     * @param maxModificationsPerBatch the batch size at which a new message is started
     * @return the messages to forward; never empty
     */
    Collection<BatchedModifications> createForwardedBatchedModifications(final BatchedModifications from, final int maxModificationsPerBatch) {
        final CohortEntry cohortEntry = cohortCache.remove(from.getTransactionId());
        if (cohortEntry == null || cohortEntry.getTransaction() == null) {
            return Collections.singletonList(from);
        }

        cohortEntry.applyModifications(from.getModifications());

        final LinkedList<BatchedModifications> newModifications = new LinkedList<BatchedModifications>();
        final DataTreeModification snapshot = cohortEntry.getTransaction().getSnapshot();
        snapshot.applyToCursor(new AbstractBatchedModificationsCursor() {
            @Override
            protected BatchedModifications getModifications() {
                // Start a new batch when there is none yet or the current one is full.
                if (newModifications.isEmpty()
                        || newModifications.getLast().getModifications().size() >= maxModificationsPerBatch) {
                    newModifications.add(new BatchedModifications(from.getTransactionId(), from.getVersion()));
                }
                return newModifications.getLast();
            }
        });

        // The cursor only asks for a batch when it has something to record. With an empty snapshot the
        // list stays empty and getLast() would throw, so create an empty carrier for the flags.
        if (newModifications.isEmpty()) {
            newModifications.add(new BatchedModifications(from.getTransactionId(), from.getVersion()));
        }

        final BatchedModifications last = newModifications.getLast();
        last.setDoCommitOnReady(from.isDoCommitOnReady());
        last.setReady(from.isReady());
        last.setTotalMessagesSent(newModifications.size());
        return newModifications;
    }

    /**
     * Starts the canCommit phase for the given entry and reacts to its outcome.
     *
     * <p>On success the entry either proceeds directly to {@link #doCommit(CohortEntry)} (immediate
     * commit) or a positive {@link CanCommitTransactionReply} is sent to the entry's reply sender. On
     * failure the entry is removed from the cache and the failure is sent to the reply sender.
     *
     * @param cohortEntry the entry to run canCommit on; its reply sender and shard must already be set
     */
    private void handleCanCommit(final CohortEntry cohortEntry) {
        final FutureCallback<Void> outcome = new FutureCallback<Void>() {
            @Override
            public void onSuccess(Void result) {
                log.debug("{}: canCommit for {}: success", name, cohortEntry.getTransactionId());

                if (!cohortEntry.isDoImmediateCommit()) {
                    cohortEntry.getReplySender().tell(
                            CanCommitTransactionReply.yes(cohortEntry.getClientVersion()).toSerializable(),
                            cohortEntry.getShard().self());
                    return;
                }
                doCommit(cohortEntry);
            }

            @Override
            public void onFailure(Throwable failure) {
                log.debug("{}: An exception occurred during canCommit for {}: {}", name,
                        cohortEntry.getTransactionId(), failure);

                cohortCache.remove(cohortEntry.getTransactionId());
                cohortEntry.getReplySender().tell(new Status.Failure(failure), cohortEntry.getShard().self());
            }
        };
        cohortEntry.canCommit(outcome);
    }

    /**
     * Handles a canCommit request for a previously cached transaction.
     *
     * <p>If no cohort entry is cached for the identifier, an {@link IllegalStateException} is logged
     * and sent to {@code sender} wrapped in a {@link Status.Failure}. Otherwise the sender and shard
     * are recorded on the entry and the canCommit phase is started.
     *
     * @param transactionID the identifier of the transaction to check
     * @param sender the actor that receives the reply
     * @param shard the shard on whose behalf replies are sent
     */
    void handleCanCommit(Identifier transactionID, ActorRef sender, Shard shard) {
        final CohortEntry entry = cohortCache.get(transactionID);
        if (entry != null) {
            entry.setReplySender(sender);
            entry.setShard(shard);
            handleCanCommit(entry);
            return;
        }

        // Nothing cached under this identifier: report the problem to the requester.
        final IllegalStateException notFound = new IllegalStateException(String.format(
                "%s: Cannot canCommit transaction %s - no cohort entry found", name, transactionID));
        log.error(notFound.getMessage());
        sender.tell(new Status.Failure(notFound), shard.self());
    }

    /**
     * Runs the preCommit phase for the entry and, if it succeeds, continues with
     * {@link #finishCommit(ActorRef, CohortEntry)} using the entry's reply sender.
     *
     * <p>If preCommit fails, the error is logged, the entry is removed from the cache and the failure
     * is sent to the entry's reply sender.
     *
     * @param cohortEntry the entry to commit; its reply sender and shard must already be set
     */
    void doCommit(final CohortEntry cohortEntry) {
        log.debug("{}: Committing transaction {}", name, cohortEntry.getTransactionId());

        final FutureCallback<DataTreeCandidate> outcome = new FutureCallback<DataTreeCandidate>() {
            @Override
            public void onSuccess(DataTreeCandidate candidate) {
                finishCommit(cohortEntry.getReplySender(), cohortEntry);
            }

            @Override
            public void onFailure(Throwable failure) {
                log.error("{} An exception occurred while preCommitting transaction {}", name,
                        cohortEntry.getTransactionId(), failure);

                cohortCache.remove(cohortEntry.getTransactionId());
                cohortEntry.getReplySender().tell(new Status.Failure(failure), cohortEntry.getShard().self());
            }
        };
        cohortEntry.preCommit(outcome);
    }

    /**
     * Runs the final commit phase for the entry and reports the outcome to {@code sender}.
     *
     * <p>In both the success and the failure case the transaction is purged from the shard's data
     * store and the entry is removed from the cache before the reply is sent: a
     * {@link CommitTransactionReply} on success, a {@link Status.Failure} on failure.
     *
     * @param sender the actor that receives the commit outcome
     * @param cohortEntry the entry to commit; its shard must already be set
     */
    void finishCommit(final @Nonnull ActorRef sender, final @Nonnull CohortEntry cohortEntry) {
        log.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionId());

        final FutureCallback<UnsignedLong> outcome = new FutureCallback<UnsignedLong>() {
            @Override
            public void onSuccess(UnsignedLong result) {
                final TransactionIdentifier committedId = cohortEntry.getTransactionId();
                log.debug("{}: Transaction {} committed as {}, sending response to {}", persistenceId(),
                        committedId, result, sender);

                cohortEntry.getShard().getDataStore().purgeTransaction(committedId, null);
                cohortCache.remove(cohortEntry.getTransactionId());

                sender.tell(CommitTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(),
                        cohortEntry.getShard().self());
            }

            @Override
            public void onFailure(Throwable failure) {
                final TransactionIdentifier failedId = cohortEntry.getTransactionId();
                log.error("{}, An exception occurred while committing transaction {}", persistenceId(),
                        failedId, failure);

                cohortEntry.getShard().getDataStore().purgeTransaction(failedId, null);
                cohortCache.remove(cohortEntry.getTransactionId());

                sender.tell(new Status.Failure(failure), cohortEntry.getShard().self());
            }
        };
        cohortEntry.commit(outcome);
    }

    /**
     * Handles a commit request for a previously cached transaction.
     *
     * <p>If no cohort entry is cached for the identifier, an {@link IllegalStateException} is logged
     * and sent to {@code sender} wrapped in a {@link Status.Failure}. Otherwise the sender and shard
     * are recorded on the entry and the commit is started via {@link #doCommit(CohortEntry)}.
     *
     * @param transactionID the identifier of the transaction to commit
     * @param sender the actor that receives the commit outcome
     * @param shard the shard on whose behalf replies are sent
     */
    void handleCommit(Identifier transactionID, ActorRef sender, Shard shard) {
        final CohortEntry cohortEntry = cohortCache.get(transactionID);
        if (cohortEntry == null) {
            final IllegalStateException ex = new IllegalStateException(String.format(
                    "%s: Cannot commit transaction %s - no cohort entry found", name, transactionID));
            log.error(ex.getMessage());
            sender.tell(new Status.Failure(ex), shard.self());
            return;
        }

        cohortEntry.setReplySender(sender);
        // doCommit/finishCommit call cohortEntry.getShard() when replying. Record the shard here, as the
        // canCommit handler does, so a commit that was not preceded by canCommit cannot hit a null shard.
        cohortEntry.setShard(shard);
        doCommit(cohortEntry);
    }

    /**
     * Aborts a cached transaction, if one exists for the identifier.
     *
     * <p>The entry is removed from the cache and aborted. Whether the abort succeeds or fails, the
     * transaction is purged from the shard's data store; a non-null {@code sender} then receives an
     * {@link AbortTransactionReply} on success or a {@link Status.Failure} on failure. The shard's
     * abort counter is incremented once the abort has been requested. Nothing happens, and no reply is
     * sent, when no entry is cached.
     *
     * @param transactionID the identifier of the transaction to abort
     * @param sender the actor to notify of the outcome; may be null, in which case no reply is sent
     * @param shard the shard owning the data store and on whose behalf replies are sent
     */
    void handleAbort(Identifier transactionID, final ActorRef sender, final Shard shard) {
        final CohortEntry cohortEntry = cohortCache.remove(transactionID);
        if (cohortEntry != null) {
            log.debug("{}: Aborting transaction {}", name, transactionID);

            final ActorRef self = shard.getSelf();
            final FutureCallback<Void> outcome = new FutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    shard.getDataStore().purgeTransaction(cohortEntry.getTransactionId(), null);
                    if (sender == null) {
                        return;
                    }
                    sender.tell(AbortTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(),
                            self);
                }

                @Override
                public void onFailure(Throwable failure) {
                    log.error("{}: An exception happened during abort", name, failure);
                    shard.getDataStore().purgeTransaction(cohortEntry.getTransactionId(), null);
                    if (sender == null) {
                        return;
                    }
                    sender.tell(new Status.Failure(failure), self);
                }
            };
            cohortEntry.abort(outcome);

            shard.getShardMBean().incrementAbortTransactionsCount();
        }
    }

    /**
     * Removes every cohort entry that reports itself as failed from the cache.
     *
     * @param timeout not used by this implementation
     * @param shard not used by this implementation
     */
    void checkForExpiredTransactions(long timeout, Shard shard) {
        // Explicit iterator so entries can be removed safely while walking the cache.
        for (Iterator<CohortEntry> entries = cohortCache.values().iterator(); entries.hasNext(); ) {
            if (entries.next().isFailed()) {
                entries.remove();
            }
        }
    }

    /**
     * Fails all pending transactions of the data tree and empties the cohort cache.
     *
     * <p>The pending transactions are taken (and cleared) from the data tree. For each one whose cached
     * entry has a reply sender, a {@link Status.Failure} wrapping a {@link RuntimeException} with the
     * given reason is sent to that sender. Any remaining cache entries are discarded without a reply.
     *
     * @param reason the message of the exception reported to waiting senders
     * @param shard the shard on whose behalf the failures are sent
     */
    void abortPendingTransactions(String reason, Shard shard) {
        final Status.Failure failure = new Status.Failure(new RuntimeException(reason));
        final Collection<ShardDataTreeCohort> pending = dataTree.getAndClearPendingTransactions();

        log.debug("{}: Aborting {} pending queued transactions", name, pending.size());

        for (ShardDataTreeCohort cohort : pending) {
            final CohortEntry entry = cohortCache.remove(cohort.getIdentifier());
            if (entry != null && entry.getReplySender() != null) {
                entry.getReplySender().tell(failure, shard.self());
            }
        }

        cohortCache.clear();
    }

    /**
     * Converts the data tree's pending transactions into messages that can be replayed elsewhere.
     *
     * <p>The pending transactions are taken (and cleared) from the data tree and their entries removed
     * from the cache. Each transaction's modification is split into {@link BatchedModifications}
     * messages holding at most {@code maxModificationsPerBatch} modifications each; the last message is
     * marked ready and carries the total message count and the entry's immediate-commit flag. For
     * transactions that are not immediate-commit, a {@link CanCommitTransaction} or
     * {@link CommitTransaction} message is appended depending on the cohort's state. Transactions
     * without a cached entry, or whose modification produces no batch, contribute nothing.
     *
     * @param maxModificationsPerBatch the batch size at which a new message is started
     * @return the messages in replay order; possibly empty
     */
    Collection<?> convertPendingTransactionsToMessages(final int maxModificationsPerBatch) {
        final Collection<Object> messages = new ArrayList<Object>();
        for (ShardDataTreeCohort cohort : dataTree.getAndClearPendingTransactions()) {
            final CohortEntry cohortEntry = cohortCache.remove(cohort.getIdentifier());
            if (cohortEntry == null) {
                continue;
            }

            final ArrayDeque<BatchedModifications> newMessages = new ArrayDeque<BatchedModifications>();
            cohortEntry.getDataTreeModification().applyToCursor(new AbstractBatchedModificationsCursor() {
                @Override
                protected BatchedModifications getModifications() {
                    final BatchedModifications lastBatch = newMessages.peekLast();
                    // Keep filling the current batch while it still has room. The previous check was
                    // inverted (>=): it reused a batch only once it was already full.
                    if (lastBatch != null && lastBatch.getModifications().size() < maxModificationsPerBatch) {
                        return lastBatch;
                    }

                    // No batch yet, or the current one is full: allocate a new message.
                    final BatchedModifications ret = new BatchedModifications(cohortEntry.getTransactionId(),
                            cohortEntry.getClientVersion());
                    newMessages.add(ret);
                    return ret;
                }
            });

            final BatchedModifications last = newMessages.peekLast();
            if (last == null) {
                // The modification produced no batches, so there is nothing to replay.
                continue;
            }

            final boolean immediate = cohortEntry.isDoImmediateCommit();
            last.setDoCommitOnReady(immediate);
            last.setReady(true);
            last.setTotalMessagesSent(newMessages.size());
            messages.addAll(newMessages);

            if (immediate) {
                // Commit-on-ready transactions need no separate canCommit/commit message.
                continue;
            }

            switch (cohort.getState()) {
                case CAN_COMMIT_COMPLETE:
                case CAN_COMMIT_PENDING: {
                    messages.add(new CanCommitTransaction(cohortEntry.getTransactionId(),
                            cohortEntry.getClientVersion()));
                    break;
                }
                case PRE_COMMIT_COMPLETE:
                case PRE_COMMIT_PENDING: {
                    messages.add(new CommitTransaction(cohortEntry.getTransactionId(),
                            cohortEntry.getClientVersion()));
                    break;
                }
                default:
                    // Other states get no follow-up message.
                    break;
            }
        }
        return messages;
    }

    /**
     * Installs the decorator that is passed to {@code CohortEntry.ready(...)} when a batched
     * transaction is readied. Exposed for tests.
     *
     * @param decorator the decorator to use from now on; replaces any previously installed one
     */
    @VisibleForTesting
    void setCohortDecorator(final CohortDecorator decorator) {
        cohortDecorator = decorator;
    }

    /**
     * Test hook handed to {@code CohortEntry.ready(...)} when a batched transaction is readied.
     *
     * <p>NOTE(review): presumably lets unit tests replace or wrap the cohort created for a
     * transaction - confirm against {@code CohortEntry}.
     */
    @VisibleForTesting
    public interface CohortDecorator {
        /**
         * Produces the cohort to use for a transaction.
         *
         * @param transactionId an identifier, presumably that of the transaction being readied
         * @param actual a cohort, presumably the one that would be used without decoration
         * @return the cohort to use
         */
        ShardDataTreeCohort decorate(Identifier transactionId, ShardDataTreeCohort actual);
    }
}

