package org.opendaylight.controller.cluster.datastore;

import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.Status;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.japi.Creator;
import akka.persistence.RecoveryFailure;
import akka.serialization.Serialization;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.opendaylight.controller.cluster.common.actor.CommonConfig;
import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
import org.opendaylight.controller.cluster.datastore.TransactionProxy;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
import org.opendaylight.controller.cluster.datastore.messages.ReadData;
import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
import org.opendaylight.controller.cluster.raft.RaftActor;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
import org.opendaylight.controller.sal.core.api.model.SchemaService;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionFactory;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;

/* loaded from: input_file:org/opendaylight/controller/cluster/datastore/Shard.class */
public class Shard extends RaftActor {
    /** Literal shard name {@code "default"}. */
    public static final String DEFAULT_NAME = "default";
    /** In-memory data store created in the constructor; every transaction in this class is opened against it. */
    private final InMemoryDOMDataStore store;
    /** Serialized modification -> commit cohort; filled by handleForwardedCommit and drained by commit. */
    private final Map<Object, DOMStoreThreePhaseCommitCohort> modificationToCohort;
    /** Akka logging adapter obtained from this actor's system. */
    private final LoggingAdapter LOG;
    /** False only when the system property {@code shard.persistent} equals {@code "false"}. */
    private final boolean persistent;
    /** Identifier of this shard; its string form is also the persistence id. */
    private final ShardIdentifier name;
    /** JMX bean that receives transaction counters and Raft/journal statistics. */
    private final ShardStats shardMBean;
    /** Registered listener actors; each is sent an EnableNotification on registration and on every state change. */
    private final List<ActorSelection> dataChangeListeners;
    /** Configuration passed in at construction and forwarded to transaction actors. */
    private final DatastoreContext datastoreContext;
    /** Current schema context; replaced when an UpdateSchemaContext message is handled. */
    private SchemaContext schemaContext;
    /** Transaction actor of the snapshot read currently in flight, or null when none is pending. */
    private ActorRef createSnapshotTransaction;
    /** Created lazily while recovery data is being applied; reset to null in onRecoveryComplete. */
    private ShardRecoveryCoordinator recoveryCoordinator;
    /** Modifications collected for the log-recovery batch in progress; reset to null in onRecoveryComplete. */
    private List<Object> currentLogRecoveryBatch;
    /** Open transaction chains keyed by transaction chain id. */
    private final Map<String, DOMStoreTransactionChain> transactionChains;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/opendaylight/controller/cluster/datastore/Shard$ShardCreator.class */
    public static class ShardCreator implements Creator<Shard> {
        private static final long serialVersionUID = 1;
        final ShardIdentifier name;
        final Map<ShardIdentifier, String> peerAddresses;
        final DatastoreContext datastoreContext;
        final SchemaContext schemaContext;

        ShardCreator(ShardIdentifier shardIdentifier, Map<ShardIdentifier, String> map, DatastoreContext datastoreContext, SchemaContext schemaContext) {
            this.name = shardIdentifier;
            this.peerAddresses = map;
            this.datastoreContext = datastoreContext;
            this.schemaContext = schemaContext;
        }

        /* renamed from: create, reason: merged with bridge method [inline-methods] */
        public Shard m6create() throws Exception {
            return new Shard(this.name, this.peerAddresses, this.datastoreContext, this.schemaContext);
        }
    }

    /**
     * Creates the shard actor: initialises the Raft base class, the in-memory data store and the
     * JMX statistics bean, and optionally wraps the actor in a metering behaviour.
     *
     * @param shardIdentifier  identifier of this shard; its string form is used as the Raft id,
     *                         the store name and the MBean name
     * @param map              peer shard identifiers mapped to their addresses
     * @param datastoreContext configuration source for Raft, the data store and the MBean type
     * @param schemaContext    initial schema context; pushed to the store only when non-null
     */
    protected Shard(ShardIdentifier shardIdentifier, Map<ShardIdentifier, String> map, DatastoreContext datastoreContext, SchemaContext schemaContext) {
        super(shardIdentifier.toString(), mapPeerAddresses(map), Optional.of(datastoreContext.getShardRaftConfig()));
        // Diamond instead of raw types so the collections keep their declared element types.
        this.modificationToCohort = new HashMap<>();
        this.LOG = Logging.getLogger(getContext().system(), this);
        this.dataChangeListeners = new ArrayList<>();
        this.transactionChains = new HashMap<>();
        this.name = shardIdentifier;
        this.datastoreContext = datastoreContext;
        this.schemaContext = schemaContext;
        // Persistence is on unless explicitly disabled with -Dshard.persistent=false.
        this.persistent = !"false".equals(System.getProperty("shard.persistent"));
        this.LOG.info("Shard created : {} persistent : {}", shardIdentifier, this.persistent);
        this.store = InMemoryDOMDataStoreFactory.create(shardIdentifier.toString(), (SchemaService) null, datastoreContext.getDataStoreProperties());
        if (schemaContext != null) {
            this.store.onGlobalContextUpdated(schemaContext);
        }
        this.shardMBean = ShardMBeanFactory.getShardStatsMBean(shardIdentifier.toString(), datastoreContext.getDataStoreMXBeanType());
        this.shardMBean.setDataStoreExecutor(this.store.getDomStoreExecutor());
        this.shardMBean.setNotificationManager(this.store.getDataChangeListenerNotificationManager());
        if (isMetricsCaptureEnabled()) {
            getContext().become(new MeteringBehavior(this));
        }
    }

    /**
     * Converts a peer map keyed by {@link ShardIdentifier} into one keyed by the identifier's
     * string form, which is the shape passed to the Raft base-class constructor.
     *
     * @param map peer identifiers mapped to addresses; must not be null
     * @return a new mutable map with stringified keys and the same values
     */
    private static Map<String, String> mapPeerAddresses(Map<ShardIdentifier, String> map) {
        // Typed map instead of a raw HashMap, so the return needs no unchecked conversion.
        final Map<String, String> peersById = new HashMap<>();
        for (Map.Entry<ShardIdentifier, String> entry : map.entrySet()) {
            peersById.put(entry.getKey().toString(), entry.getValue());
        }
        return peersById;
    }

    /**
     * Builds the Akka {@link Props} used to spawn a shard actor.
     *
     * @param shardIdentifier  shard identifier; must not be null
     * @param map              peer addresses; must not be null
     * @param datastoreContext data store configuration; must not be null
     * @param schemaContext    initial schema context; must not be null
     * @return props wrapping a {@link ShardCreator} for the given arguments
     * @throws NullPointerException if any argument is null
     */
    public static Props props(ShardIdentifier shardIdentifier, Map<ShardIdentifier, String> map, DatastoreContext datastoreContext, SchemaContext schemaContext) {
        // Checks run in declaration order, so the first missing argument is the one reported.
        Preconditions.checkNotNull(shardIdentifier, "name should not be null");
        Preconditions.checkNotNull(map, "peerAddresses should not be null");
        Preconditions.checkNotNull(datastoreContext, "dataStoreContext should not be null");
        Preconditions.checkNotNull(schemaContext, "schemaContext should not be null");

        final ShardCreator creator = new ShardCreator(shardIdentifier, map, datastoreContext, schemaContext);
        return Props.create(creator);
    }

    /**
     * Handles a message delivered during recovery. A {@link RecoveryFailure} is logged with its
     * cause; every other message is passed to the superclass.
     *
     * @param obj the recovery message
     */
    public void onReceiveRecover(Object obj) {
        if (this.LOG.isDebugEnabled()) {
            this.LOG.debug("onReceiveRecover: Received message {} from {}", obj.getClass().toString(), getSender());
        }
        if (!(obj instanceof RecoveryFailure)) {
            super.onReceiveRecover(obj);
            return;
        }
        final RecoveryFailure failure = (RecoveryFailure) obj;
        this.LOG.error(failure.cause(), "Recovery failed because of this cause");
    }

    /**
     * Dispatches a command message to its handler. Messages are tested in a fixed order; anything
     * not recognised here is passed to the superclass.
     *
     * @param obj the received message
     */
    public void onReceiveCommand(Object obj) {
        if (this.LOG.isDebugEnabled()) {
            this.LOG.debug("onReceiveCommand: Received message {} from {}", obj.getClass().toString(), getSender());
        }

        final Class<?> messageClass = obj.getClass();
        if (messageClass.equals(ReadDataReply.SERIALIZABLE_CLASS)) {
            // Reply to the snapshot read: hand the data to ourselves as a CaptureSnapshotReply,
            // mark the snapshot as no longer pending and stop the actor that sent the reply.
            self().tell(new CaptureSnapshotReply(ReadDataReply.getNormalizedNodeByteString(obj)), self());
            this.createSnapshotTransaction = null;
            getSender().tell(PoisonPill.getInstance(), self());
        } else if (messageClass.equals(CloseTransactionChain.SERIALIZABLE_CLASS)) {
            closeTransactionChain(CloseTransactionChain.fromSerializable(obj));
        } else if (obj instanceof RegisterChangeListener) {
            registerChangeListener((RegisterChangeListener) obj);
        } else if (obj instanceof UpdateSchemaContext) {
            updateSchemaContext((UpdateSchemaContext) obj);
        } else if (obj instanceof ForwardedCommitTransaction) {
            handleForwardedCommit((ForwardedCommitTransaction) obj);
        } else if (messageClass.equals(CreateTransaction.SERIALIZABLE_CLASS)) {
            if (isLeader()) {
                createTransaction(CreateTransaction.fromSerializable(obj));
            } else if (getLeader() != null) {
                // Not the leader: forward the request to the known leader.
                getLeader().forward(obj, getContext());
            } else {
                getSender().tell(new Status.Failure(new IllegalStateException("Could not find leader so transaction cannot be created")), getSelf());
            }
        } else if (obj instanceof PeerAddressResolved) {
            final PeerAddressResolved resolved = (PeerAddressResolved) obj;
            setPeerAddress(resolved.getPeerId().toString(), resolved.getPeerAddress());
        } else {
            super.onReceiveCommand(obj);
        }
    }

    /**
     * Removes the transaction chain named by the message from the registry and closes it.
     * Does nothing when no chain is registered under that id.
     *
     * @param closeTransactionChain message carrying the id of the chain to close
     */
    private void closeTransactionChain(CloseTransactionChain closeTransactionChain) {
        final DOMStoreTransactionChain chain = this.transactionChains.remove(closeTransactionChain.getTransactionChainId());
        if (chain == null) {
            return;
        }
        chain.close();
    }

    /**
     * Creates the child actor that backs a new transaction of the requested type.
     *
     * <p>When {@code str} is non-empty the transaction is opened on the transaction chain with
     * that id, creating and caching the chain on first use; otherwise it is opened directly on
     * the store. Arguments are validated before any chain is created, so a rejected request does
     * not leave a new chain in the registry.
     *
     * @param i                          ordinal of a {@link TransactionProxy.TransactionType}
     * @param shardTransactionIdentifier identifier whose string form becomes the actor name
     * @param str                        transaction chain id, or an empty string for no chain
     * @return the newly created transaction actor
     * @throws NullPointerException     if no schema context has been set yet
     * @throws IllegalArgumentException if {@code i} matches no known transaction type
     */
    private ActorRef createTypedTransactionActor(int i, ShardTransactionIdentifier shardTransactionIdentifier, String str) {
        Preconditions.checkNotNull(this.schemaContext, "schemaContext should not be null");

        final boolean readOnly = i == TransactionProxy.TransactionType.READ_ONLY.ordinal();
        final boolean readWrite = i == TransactionProxy.TransactionType.READ_WRITE.ordinal();
        final boolean writeOnly = i == TransactionProxy.TransactionType.WRITE_ONLY.ordinal();
        if (!readOnly && !readWrite && !writeOnly) {
            throw new IllegalArgumentException("Shard=" + this.name + ":CreateTransaction message has unidentified transaction type=" + i);
        }

        // Both the store and a transaction chain are used purely as transaction factories here.
        DOMStoreTransactionFactory factory = this.store;
        if (!str.isEmpty()) {
            DOMStoreTransactionChain chain = this.transactionChains.get(str);
            if (chain == null) {
                chain = this.store.createTransactionChain();
                this.transactionChains.put(str, chain);
            }
            factory = chain;
        }

        final Props transactionProps;
        if (readOnly) {
            this.shardMBean.incrementReadOnlyTransactionCount();
            transactionProps = ShardTransaction.props(factory.newReadOnlyTransaction(), getSelf(), this.schemaContext, this.datastoreContext, this.shardMBean);
        } else if (readWrite) {
            this.shardMBean.incrementReadWriteTransactionCount();
            transactionProps = ShardTransaction.props(factory.newReadWriteTransaction(), getSelf(), this.schemaContext, this.datastoreContext, this.shardMBean);
        } else {
            this.shardMBean.incrementWriteOnlyTransactionCount();
            transactionProps = ShardTransaction.props(factory.newWriteOnlyTransaction(), getSelf(), this.schemaContext, this.datastoreContext, this.shardMBean);
        }
        return getContext().actorOf(transactionProps, shardTransactionIdentifier.toString());
    }

    /**
     * Unpacks a {@link CreateTransaction} message and delegates to the three-argument overload,
     * which creates the transaction actor and replies to the sender.
     *
     * @param createTransaction the deserialized request
     */
    private void createTransaction(CreateTransaction createTransaction) {
        createTransaction(
                createTransaction.getTransactionType(),
                createTransaction.getTransactionId(),
                createTransaction.getTransactionChainId());
    }

    /**
     * Creates a transaction actor and sends a {@link CreateTransactionReply}, carrying the actor's
     * serialized path, to the current sender.
     *
     * @param i    ordinal of the requested transaction type
     * @param str  remote transaction id, echoed back in the reply
     * @param str2 transaction chain id, or an empty string for no chain
     * @return the created transaction actor
     */
    private ActorRef createTransaction(int i, String str, String str2) {
        final ShardTransactionIdentifier transactionId = ShardTransactionIdentifier.builder().remoteTransactionId(str).build();
        if (this.LOG.isDebugEnabled()) {
            this.LOG.debug("Creating transaction : {} ", transactionId);
        }

        final ActorRef transactionActor = createTypedTransactionActor(i, transactionId, str2);
        final String actorPath = Serialization.serializedActorPath(transactionActor);
        final CreateTransactionReply reply = new CreateTransactionReply(actorPath, str);
        getSender().tell(reply.toSerializable(), getSelf());
        return transactionActor;
    }

    /**
     * Readies the given write transaction and blocks until its pre-commit and commit phases have
     * both completed, in that order.
     *
     * @param dOMStoreWriteTransaction the transaction to commit
     * @throws ExecutionException   if either phase fails
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    private void syncCommitTransaction(DOMStoreWriteTransaction dOMStoreWriteTransaction) throws ExecutionException, InterruptedException {
        final DOMStoreThreePhaseCommitCohort cohort = dOMStoreWriteTransaction.ready();
        // Wait for each phase to finish before starting the next one.
        cohort.preCommit().get();
        cohort.commit().get();
    }

    /**
     * Commits a serialized modification.
     *
     * <p>If a cohort was cached for the modification by handleForwardedCommit, that cohort is
     * committed asynchronously and {@code actorRef} is told the outcome. Otherwise the
     * modification is applied in a fresh write-only transaction.
     *
     * @param actorRef actor to notify of the result; required only when a cached cohort exists
     * @param obj      serialized modification, also the lookup key for the cached cohort
     */
    private void commit(final ActorRef actorRef, Object obj) {
        // The modification is not deserialized here: the cached-cohort path never needs it and
        // commitWithNewTransaction deserializes it itself.
        final DOMStoreThreePhaseCommitCohort cohort = this.modificationToCohort.remove(obj);
        if (cohort == null) {
            commitWithNewTransaction(obj);
            return;
        }
        if (actorRef == null) {
            this.LOG.error("Commit failed. Sender cannot be null");
            return;
        }

        // Resolve our own reference up front so the callback, which may run off the actor thread,
        // does not need to call back into the actor for it.
        final ActorRef self = getSelf();
        Futures.addCallback(cohort.commit(), new FutureCallback<Void>() { // from class: org.opendaylight.controller.cluster.datastore.Shard.1
            @Override
            public void onSuccess(Void result) {
                actorRef.tell(new CommitTransactionReply().toSerializable(), self);
                Shard.this.shardMBean.incrementCommittedTransactionCount();
                Shard.this.shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
            }

            @Override
            public void onFailure(Throwable failure) {
                Shard.this.LOG.error(failure, "An exception happened during commit");
                Shard.this.shardMBean.incrementFailedTransactionsCount();
                actorRef.tell(new Status.Failure(failure), self);
            }
        });
    }

    /**
     * Deserializes the modification, applies it to a new write-only transaction and commits that
     * transaction synchronously. The outcome is recorded on the shard MBean; failures are logged
     * rather than propagated.
     *
     * @param obj serialized modification to apply
     */
    private void commitWithNewTransaction(Object obj) {
        final DOMStoreWriteTransaction transaction = this.store.newWriteOnlyTransaction();
        MutableCompositeModification.fromSerializable(obj, this.schemaContext).apply(transaction);
        try {
            syncCommitTransaction(transaction);
            this.shardMBean.incrementCommittedTransactionCount();
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still see the interruption.
            Thread.currentThread().interrupt();
            this.shardMBean.incrementFailedTransactionsCount();
            this.LOG.error(e, "Failed to commit");
        } catch (ExecutionException e) {
            this.shardMBean.incrementFailedTransactionsCount();
            this.LOG.error(e, "Failed to commit");
        }
    }

    /**
     * Handles a commit forwarded to this shard: caches the cohort under the serialized
     * modification, then either persists the modification or, when persistence is disabled,
     * commits it straight away.
     *
     * @param forwardedCommitTransaction message carrying the modification and its cohort
     */
    private void handleForwardedCommit(ForwardedCommitTransaction forwardedCommitTransaction) {
        final Object serialized = forwardedCommitTransaction.getModification().toSerializable();
        this.modificationToCohort.put(serialized, forwardedCommitTransaction.getCohort());

        if (!this.persistent) {
            commit(getSender(), serialized);
            return;
        }
        persistData(getSender(), "identifier", new CompositeModificationPayload(serialized));
    }

    /**
     * Handles an {@link UpdateSchemaContext} message: remembers the new schema context and pushes
     * it to the data store.
     *
     * @param updateSchemaContext message carrying the new schema context
     */
    private void updateSchemaContext(UpdateSchemaContext updateSchemaContext) {
        final SchemaContext newContext = updateSchemaContext.getSchemaContext();
        this.schemaContext = newContext;
        // The SchemaContext overload already notifies the store, so the store is notified once
        // per update instead of twice.
        updateSchemaContext(newContext);
    }

    /**
     * Pushes the given schema context to the data store. The {@code schemaContext} field of this
     * shard is not modified by this overload.
     *
     * @param schemaContext schema context to hand to the store
     */
    @VisibleForTesting
    void updateSchemaContext(SchemaContext schemaContext) {
        store.onGlobalContextUpdated(schemaContext);
    }

    /**
     * Registers a remote data-change listener: tells the listener whether notifications are
     * enabled, registers a proxy for it with the store, wraps the registration in a child actor
     * and replies to the sender with that actor's path.
     *
     * @param registerChangeListener registration request carrying path, scope and listener path
     */
    private void registerChangeListener(RegisterChangeListener registerChangeListener) {
        if (this.LOG.isDebugEnabled()) {
            this.LOG.debug("registerDataChangeListener for {}", registerChangeListener.getPath());
        }

        final ActorSelection listener = getContext().system().actorSelection(registerChangeListener.getDataChangeListenerPath());
        // The listener is told up front whether this shard is currently the leader.
        listener.tell(new EnableNotification(isLeader()), getSelf());
        this.dataChangeListeners.add(listener);

        final Props registrationProps = DataChangeListenerRegistration.props(
                this.store.registerChangeListener(
                        registerChangeListener.getPath(),
                        new DataChangeListenerProxy(this.schemaContext, listener),
                        registerChangeListener.getScope()));
        final ActorRef registrationActor = getContext().actorOf(registrationProps);

        if (this.LOG.isDebugEnabled()) {
            this.LOG.debug("registerDataChangeListener sending reply, listenerRegistrationPath = {} ", registrationActor.path().toString());
        }
        getSender().tell(new RegisterChangeListenerReply(registrationActor.path()), getSelf());
    }

    /**
     * Reads the actor system configuration to decide whether metric capture is switched on.
     *
     * @return the value reported by {@link CommonConfig#isMetricCaptureEnabled()}
     */
    private boolean isMetricsCaptureEnabled() {
        final CommonConfig commonConfig = new CommonConfig(getContext().system().settings().config());
        return commonConfig.isMetricCaptureEnabled();
    }

    /**
     * Starts a new log-recovery batch by replacing the current batch list with an empty one of
     * the given initial capacity.
     *
     * @param i maximum batch size, used as the list's initial capacity
     */
    protected void startLogRecoveryBatch(int i) {
        this.currentLogRecoveryBatch = Lists.newArrayListWithCapacity(i);

        if (!this.LOG.isDebugEnabled()) {
            return;
        }
        this.LOG.debug("{} : starting log recovery batch with max size {}", persistenceId(), i);
    }

    /**
     * Adds the modification of a recovered log entry to the current recovery batch. Payloads that
     * are not a {@link CompositeModificationPayload} are logged and skipped.
     *
     * @param payload the recovered log entry payload
     */
    protected void appendRecoveredLogEntry(Payload payload) {
        if (!(payload instanceof CompositeModificationPayload)) {
            this.LOG.error("Unknown state received {} during recovery", payload);
            return;
        }
        final CompositeModificationPayload composite = (CompositeModificationPayload) payload;
        this.currentLogRecoveryBatch.add(composite.getModification());
    }

    /**
     * Submits a recovered snapshot to the recovery coordinator together with a new write-only
     * transaction, creating the coordinator on first use.
     *
     * @param byteString serialized snapshot
     */
    protected void applyRecoverySnapshot(ByteString byteString) {
        if (this.recoveryCoordinator == null) {
            this.recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), this.schemaContext);
        }
        this.recoveryCoordinator.submit(byteString, this.store.newWriteOnlyTransaction());
        if (this.LOG.isDebugEnabled()) {
            // Log text corrected from the misspelt "sbapshot".
            this.LOG.debug("{} : submitted recovery snapshot", persistenceId());
        }
    }

    /**
     * Submits the current log-recovery batch to the recovery coordinator together with a new
     * write-only transaction, creating the coordinator on first use.
     */
    protected void applyCurrentLogRecoveryBatch() {
        ShardRecoveryCoordinator coordinator = this.recoveryCoordinator;
        if (coordinator == null) {
            coordinator = new ShardRecoveryCoordinator(persistenceId(), this.schemaContext);
            this.recoveryCoordinator = coordinator;
        }

        final List<Object> batch = this.currentLogRecoveryBatch;
        coordinator.submit(batch, this.store.newWriteOnlyTransaction());

        if (this.LOG.isDebugEnabled()) {
            this.LOG.debug("{} : submitted log recovery batch with size {}", persistenceId(), batch.size());
        }
    }

    /**
     * Finishes recovery: synchronously commits every transaction collected by the recovery
     * coordinator, releases the recovery state and refreshes the journal statistics. A commit
     * that fails is logged and counted, and the remaining transactions are still attempted.
     */
    protected void onRecoveryComplete() {
        final ShardRecoveryCoordinator coordinator = this.recoveryCoordinator;
        if (coordinator != null) {
            final Collection<DOMStoreWriteTransaction> recovered = coordinator.getTransactions();
            if (this.LOG.isDebugEnabled()) {
                this.LOG.debug("{} : recovery complete - committing {} Tx's", persistenceId(), recovered.size());
            }
            for (DOMStoreWriteTransaction transaction : recovered) {
                try {
                    syncCommitTransaction(transaction);
                    this.shardMBean.incrementCommittedTransactionCount();
                } catch (InterruptedException | ExecutionException e) {
                    this.shardMBean.incrementFailedTransactionsCount();
                    this.LOG.error(e, "Failed to commit");
                }
            }
        }

        // Recovery state is no longer needed once recovery has completed.
        this.recoveryCoordinator = null;
        this.currentLogRecoveryBatch = null;
        updateJournalStats();
    }

    /**
     * Applies a replicated state entry. A {@link CompositeModificationPayload} with a non-null
     * modification is committed; anything else is logged as an error. Journal statistics are
     * refreshed in every case.
     *
     * @param actorRef client actor to notify of the commit result; may be null
     * @param str      identifier of the entry, used only for logging here
     * @param obj      the replicated state
     */
    protected void applyState(ActorRef actorRef, String str, Object obj) {
        if (obj instanceof CompositeModificationPayload) {
            final Object modification = ((CompositeModificationPayload) obj).getModification();
            if (modification != null) {
                commit(actorRef, modification);
            } else {
                // Arguments follow the placeholder order: clientActor first, then identifier.
                this.LOG.error("modification is null - this is very unexpected, clientActor = {}, identifier = {}", actorRef != null ? actorRef.path().toString() : null, str);
            }
        } else {
            // A null state also lands here, so guard the class-loader lookup.
            final ClassLoader stateLoader = obj != null ? obj.getClass().getClassLoader() : null;
            this.LOG.error("Unknown state received {} Class loader = {} CompositeNodeMod.ClassLoader = {}", obj, stateLoader, CompositeModificationPayload.class.getClassLoader());
        }
        updateJournalStats();
    }

    /**
     * Copies the current journal position onto the shard MBean: last log index and term (when a
     * last entry exists), commit index and last-applied index.
     */
    private void updateJournalStats() {
        final ReplicatedLogEntry newest = getLastLogEntry();
        if (newest != null) {
            this.shardMBean.setLastLogIndex(newest.getIndex());
            this.shardMBean.setLastLogTerm(newest.getTerm());
        }

        this.shardMBean.setCommitIndex(getCommitIndex().longValue());
        this.shardMBean.setLastApplied(getLastApplied().longValue());
    }

    /**
     * Starts a snapshot capture unless one is already pending: creates a read-only transaction
     * actor and asks it to read the root of the data tree. The reply is handled in
     * onReceiveCommand.
     */
    protected void createSnapshot() {
        if (this.createSnapshotTransaction != null) {
            // A snapshot read is already in flight.
            return;
        }
        this.createSnapshotTransaction = createTransaction(TransactionProxy.TransactionType.READ_ONLY.ordinal(), "createSnapshot", "");
        final ReadData readRoot = new ReadData(YangInstanceIdentifier.builder().build());
        this.createSnapshotTransaction.tell(readRoot.toSerializable(), self());
    }

    /**
     * Replaces the entire content of the data store with the given snapshot: decodes the
     * snapshot, deletes the root, writes the decoded node at the root and commits synchronously.
     * Decoding and commit failures are logged rather than propagated.
     *
     * @param byteString serialized snapshot of the root node
     */
    @VisibleForTesting
    protected void applySnapshot(ByteString byteString) {
        this.LOG.info("Applying snapshot");
        try {
            final DOMStoreWriteTransaction transaction = this.store.newWriteOnlyTransaction();
            // The empty identifier addresses the root of the data tree; build it once.
            final YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
            final NormalizedNode<?, ?> snapshotRoot = new NormalizedNodeToNodeCodec(this.schemaContext).decode(root, NormalizedNodeMessages.Node.parseFrom(byteString));
            transaction.delete(root);
            transaction.write(root, snapshotRoot);
            syncCommitTransaction(transaction);
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still see the interruption.
            Thread.currentThread().interrupt();
            this.LOG.error(e, "An exception occurred when applying snapshot");
        } catch (InvalidProtocolBufferException | ExecutionException e) {
            this.LOG.error(e, "An exception occurred when applying snapshot");
        } finally {
            this.LOG.info("Done applying snapshot");
        }
    }

    /**
     * Reacts to a Raft state change: tells every registered listener whether notifications are
     * enabled, publishes the new state and term on the MBean and, when this shard is not the
     * leader, closes and forgets all open transaction chains.
     */
    protected void onStateChanged() {
        for (ActorSelection listener : this.dataChangeListeners) {
            listener.tell(new EnableNotification(isLeader()), getSelf());
        }

        this.shardMBean.setRaftState(getRaftState().name());
        this.shardMBean.setCurrentTerm(getCurrentTerm().longValue());

        if (!isLeader()) {
            for (Map.Entry<String, DOMStoreTransactionChain> chainEntry : this.transactionChains.entrySet()) {
                if (this.LOG.isDebugEnabled()) {
                    this.LOG.debug("onStateChanged: Closing transaction chain {} because shard {} is no longer the leader", chainEntry.getKey(), getId());
                }
                chainEntry.getValue().close();
            }
            this.transactionChains.clear();
        }
    }

    /**
     * Publishes the new leader on the shard MBean.
     *
     * @param str  not used by this implementation
     * @param str2 value recorded as the leader
     */
    protected void onLeaderChanged(String str, String str2) {
        shardMBean.setLeader(str2);
    }

    /**
     * Returns the persistence id of this actor.
     *
     * @return the string form of the shard identifier
     */
    public String persistenceId() {
        final String id = this.name.toString();
        return id;
    }

    /**
     * Reads a node from the data store through a short-lived read-only transaction.
     *
     * @param yangInstanceIdentifier path of the node to read
     * @return the node at that path, or {@code null} when nothing is stored there
     * @throws ExecutionException   if the read fails
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @VisibleForTesting
    NormalizedNode<?, ?> readStore(YangInstanceIdentifier yangInstanceIdentifier) throws ExecutionException, InterruptedException {
        final DOMStoreReadTransaction transaction = this.store.newReadOnlyTransaction();
        try {
            final Optional<?> result = (Optional<?>) transaction.read(yangInstanceIdentifier).get();
            return (NormalizedNode<?, ?>) result.orNull();
        } finally {
            // Close the transaction even when the read throws.
            transaction.close();
        }
    }

    /**
     * Writes a node to the data store through a write-only transaction and blocks until the
     * commit has completed.
     *
     * @param yangInstanceIdentifier path to write at
     * @param normalizedNode         node to store at that path
     * @throws ExecutionException   if the commit fails
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @VisibleForTesting
    void writeToStore(YangInstanceIdentifier yangInstanceIdentifier, NormalizedNode<?, ?> normalizedNode) throws ExecutionException, InterruptedException {
        final DOMStoreWriteTransaction transaction = this.store.newWriteOnlyTransaction();
        transaction.write(yangInstanceIdentifier, normalizedNode);
        syncCommitTransaction(transaction);
    }

    /**
     * Exposes the statistics bean of this shard.
     *
     * @return the shard's {@link ShardStats} instance
     */
    @VisibleForTesting
    ShardStats getShardMBean() {
        return shardMBean;
    }
}
