/*
 * Decompiled with CFR 0.152.
 */
package org.opendaylight.controller.cluster.databroker;

import com.google.common.annotations.Beta;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.FluentFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executor;
import org.opendaylight.controller.cluster.databroker.DataBrokerCommitExecutor;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.exceptions.ShardLeaderNotRespondingException;
import org.opendaylight.mdsal.common.api.CommitInfo;
import org.opendaylight.mdsal.common.api.DataStoreUnavailableException;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
import org.opendaylight.mdsal.dom.api.DOMDataBroker;
import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
import org.opendaylight.mdsal.dom.spi.AbstractDOMDataBroker;
import org.opendaylight.mdsal.dom.spi.TransactionCommitFailedExceptionMapper;
import org.opendaylight.mdsal.dom.spi.store.DOMStore;
import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.yangtools.util.DurationStatisticsTracker;
import org.opendaylight.yangtools.yang.common.Empty;
import org.opendaylight.yangtools.yang.common.RpcError;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Beta
@Component(service={DOMDataBroker.class}, property={"type=default"})
public class ConcurrentDOMDataBroker
extends AbstractDOMDataBroker {
    /** Logger used by this broker and by its nested helper classes. */
    private static final Logger LOG = LoggerFactory.getLogger(ConcurrentDOMDataBroker.class);
    // Phase labels passed to handleException(), which includes them in its debug log line.
    private static final String CAN_COMMIT = "CAN_COMMIT";
    private static final String PRE_COMMIT = "PRE_COMMIT";
    private static final String COMMIT = "COMMIT";
    /** Receives the elapsed nanoseconds of each transaction whose commit phase succeeds. */
    private final DurationStatisticsTracker commitStatsTracker;
    /** Executor handed to every per-transaction result future created in commit(). */
    private final Executor clientFutureCallbackExecutor;

    /**
     * Creates a broker whose commit statistics go to a tracker obtained from
     * {@link DurationStatisticsTracker#createConcurrent()}.
     *
     * @param datastores backing store for each logical datastore type
     * @param listenableFutureExecutor executor given to the result futures returned to clients
     */
    public ConcurrentDOMDataBroker(
            final Map<LogicalDatastoreType, DOMStore> datastores,
            final Executor listenableFutureExecutor) {
        this(datastores, listenableFutureExecutor, DurationStatisticsTracker.createConcurrent());
    }

    /**
     * Creates a broker with an explicit commit statistics tracker.
     *
     * @param datastores backing store for each logical datastore type, passed to the superclass
     * @param listenableFutureExecutor executor given to the result futures returned to clients
     * @param commitStatsTracker tracker that receives the duration of each successful commit
     * @throws NullPointerException if {@code listenableFutureExecutor} or {@code commitStatsTracker}
     *         is {@code null}; the message names the offending argument
     */
    public ConcurrentDOMDataBroker(
            final Map<LogicalDatastoreType, DOMStore> datastores,
            final Executor listenableFutureExecutor,
            final DurationStatisticsTracker commitStatsTracker) {
        super(datastores);
        // Name the argument in the message so a failed wiring is diagnosable from the stack trace alone.
        this.clientFutureCallbackExecutor =
            Objects.requireNonNull(listenableFutureExecutor, "listenableFutureExecutor");
        this.commitStatsTracker = Objects.requireNonNull(commitStatsTracker, "commitStatsTracker");
    }

    /**
     * Component activation constructor: wires the two referenced datastores and the executor and
     * statistics tracker supplied by the referenced {@link DataBrokerCommitExecutor}.
     *
     * @param commitExecutor source of the listener executor and the commit statistics tracker
     * @param configDatastore store registered for {@link LogicalDatastoreType#CONFIGURATION}
     * @param operDatastore store registered for {@link LogicalDatastoreType#OPERATIONAL}
     */
    @Activate
    public ConcurrentDOMDataBroker(
            @Reference final DataBrokerCommitExecutor commitExecutor,
            @Reference(target = "(type=distributed-config)") final DOMStore configDatastore,
            @Reference(target = "(type=distributed-operational)") final DOMStore operDatastore) {
        this(
            Map.of(
                LogicalDatastoreType.CONFIGURATION, configDatastore,
                LogicalDatastoreType.OPERATIONAL, operDatastore),
            commitExecutor.executor(),
            commitExecutor.commitStatsTracker());
        LOG.info("DOM Data Broker started");
    }

    /**
     * Component deactivation hook: delegates to the superclass {@code close()} and logs before and
     * after it. The "stopped" message is only logged when the superclass call returns normally.
     */
    @Override
    @Deactivate
    public void close() {
        LOG.info("DOM Data Broker stopping");
        super.close();
        LOG.info("DOM Data Broker stopped");
    }

    /**
     * Starts the asynchronous commit of a transaction and returns a future for its outcome.
     *
     * <p>The method only kicks off the first phase ({@code canCommit}); later phases are chained
     * from the callbacks of the preceding one. The returned future is completed by those callbacks.
     *
     * @param transaction transaction being committed; used for its identifier in log messages
     * @param cohort three-phase commit cohort driving the commit
     * @return future completed with the commit result, or failed with the mapped commit exception
     * @throws IllegalArgumentException if {@code transaction} or {@code cohort} is {@code null}
     */
    @Override
    protected FluentFuture<? extends CommitInfo> commit(final DOMDataTreeWriteTransaction transaction,
            final DOMStoreThreePhaseCommitCohort cohort) {
        Preconditions.checkArgument(transaction != null, "Transaction must not be null.");
        Preconditions.checkArgument(cohort != null, "Cohorts must not be null.");
        LOG.debug("Tx: {} is submitted for execution.", transaction.getIdentifier());

        // Listeners added by the client are dispatched through clientFutureCallbackExecutor.
        final AsyncNotifyingSettableFuture clientSubmitFuture =
            new AsyncNotifyingSettableFuture(clientFutureCallbackExecutor);
        doCanCommit(clientSubmitFuture, transaction, cohort);
        return FluentFuture.from(clientSubmitFuture);
    }

    /**
     * Runs the {@code canCommit} phase and, when the cohort votes yes, chains into
     * {@code doPreCommit}. A {@code null} or {@code false} vote, or a failed future, is routed to
     * {@code handleException} with the CAN_COMMIT phase label and error mapper.
     *
     * <p>The commit start time is sampled here and carried through the later phases.
     *
     * @param clientSubmitFuture future returned to the client for this transaction
     * @param transaction transaction being committed
     * @param cohort cohort driving the commit
     */
    private void doCanCommit(final AsyncNotifyingSettableFuture clientSubmitFuture,
            final DOMDataTreeWriteTransaction transaction, final DOMStoreThreePhaseCommitCohort cohort) {
        final long startTime = System.nanoTime();

        Futures.addCallback(cohort.canCommit(), new FutureCallback<Boolean>() {
            @Override
            public void onSuccess(final Boolean result) {
                if (Boolean.TRUE.equals(result)) {
                    doPreCommit(startTime, clientSubmitFuture, transaction, cohort);
                } else {
                    // A null or negative vote carries no cause, so synthesize one.
                    onFailure(new TransactionCommitFailedException(
                        "Can Commit failed, no detailed cause available.", new RpcError[0]));
                }
            }

            @Override
            public void onFailure(final Throwable failure) {
                handleException(clientSubmitFuture, transaction, cohort, CAN_COMMIT,
                    TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER, failure);
            }
        }, MoreExecutors.directExecutor());
    }

    /**
     * Runs the {@code preCommit} phase and chains into {@code doCommit} on success. A failed
     * future is routed to {@code handleException} with the PRE_COMMIT phase label and mapper.
     *
     * @param startTime {@link System#nanoTime()} value sampled when the commit started
     * @param clientSubmitFuture future returned to the client for this transaction
     * @param transaction transaction being committed
     * @param cohort cohort driving the commit
     */
    private void doPreCommit(final long startTime, final AsyncNotifyingSettableFuture clientSubmitFuture,
            final DOMDataTreeWriteTransaction transaction, final DOMStoreThreePhaseCommitCohort cohort) {
        Futures.addCallback(cohort.preCommit(), new FutureCallback<Empty>() {
            @Override
            public void onSuccess(final Empty result) {
                doCommit(startTime, clientSubmitFuture, transaction, cohort);
            }

            @Override
            public void onFailure(final Throwable failure) {
                handleException(clientSubmitFuture, transaction, cohort, PRE_COMMIT,
                    TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER, failure);
            }
        }, MoreExecutors.directExecutor());
    }

    /**
     * Runs the final {@code commit} phase. On success the elapsed time since {@code startTime} is
     * recorded in the commit statistics tracker and the client future is completed; a failed
     * future is routed to {@code handleException} with the COMMIT phase label and mapper.
     *
     * @param startTime {@link System#nanoTime()} value sampled when the commit started
     * @param clientSubmitFuture future returned to the client for this transaction
     * @param transaction transaction being committed
     * @param cohort cohort driving the commit
     */
    private void doCommit(final long startTime, final AsyncNotifyingSettableFuture clientSubmitFuture,
            final DOMDataTreeWriteTransaction transaction, final DOMStoreThreePhaseCommitCohort cohort) {
        Futures.addCallback(cohort.commit(), new FutureCallback<CommitInfo>() {
            @Override
            public void onSuccess(final CommitInfo result) {
                // Record the duration before notifying the client, matching the original ordering.
                commitStatsTracker.addDuration(System.nanoTime() - startTime);
                clientSubmitFuture.set();
            }

            @Override
            public void onFailure(final Throwable throwable) {
                handleException(clientSubmitFuture, transaction, cohort, COMMIT,
                    TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER, throwable);
            }
        }, MoreExecutors.directExecutor());
    }

    /**
     * Fails the client future with a mapped commit exception and asks the cohort to abort.
     *
     * <p>Does nothing when the client future is already done; in that case the cohort is not
     * aborted either. Otherwise the failure is translated as follows before being passed to
     * {@code exMapper}:
     * <ul>
     *   <li>{@link NoShardLeaderException} and {@link ShardLeaderNotRespondingException} are wrapped
     *       in a {@link DataStoreUnavailableException} carrying the same message and cause;</li>
     *   <li>any other {@link Exception} is passed through unchanged;</li>
     *   <li>a non-{@code Exception} throwable is wrapped in a {@link RuntimeException}.</li>
     * </ul>
     * The outcome of the abort is only logged.
     *
     * @param clientSubmitFuture future returned to the client for this transaction
     * @param transaction transaction being committed; used for its identifier in log messages
     * @param cohort cohort to abort
     * @param phase label of the phase that failed, used in the debug log line
     * @param exMapper mapper producing the exception set on the client future
     * @param throwable failure reported by the failed phase
     */
    private static void handleException(final AsyncNotifyingSettableFuture clientSubmitFuture,
            final DOMDataTreeWriteTransaction transaction, final DOMStoreThreePhaseCommitCohort cohort,
            final String phase, final TransactionCommitFailedExceptionMapper exMapper,
            final Throwable throwable) {
        if (clientSubmitFuture.isDone()) {
            return;
        }

        // The trailing throwable has no placeholder, so SLF4J logs it as the exception.
        LOG.debug("Tx: {} Error during phase {}, starting Abort", transaction.getIdentifier(), phase, throwable);

        final Exception cause;
        if (throwable instanceof NoShardLeaderException || throwable instanceof ShardLeaderNotRespondingException) {
            cause = new DataStoreUnavailableException(throwable.getMessage(), throwable);
        } else if (throwable instanceof Exception) {
            cause = (Exception) throwable;
        } else {
            cause = new RuntimeException("Unexpected error occurred", throwable);
        }
        clientSubmitFuture.setException(exMapper.apply(cause));

        // The client has already been notified; the abort result is reported through the log only.
        Futures.addCallback(cohort.abort(), new FutureCallback<Empty>() {
            @Override
            public void onSuccess(final Empty result) {
                LOG.debug("Tx: {} aborted successfully", transaction.getIdentifier());
            }

            @Override
            public void onFailure(final Throwable failure) {
                LOG.error("Tx: {} Error during Abort.", transaction.getIdentifier(), failure);
            }
        }, MoreExecutors.directExecutor());
    }

    /**
     * Returns a fixed, human-readable name for this broker implementation.
     *
     * @return the constant string {@code "Clustered ConcurrentDOMDataBroker"}
     */
    @Override
    public String toString() {
        return "Clustered ConcurrentDOMDataBroker";
    }

    private static class AsyncNotifyingSettableFuture
    extends AbstractFuture<CommitInfo> {
        private static final ThreadLocal<Boolean> ON_TASK_COMPLETION_THREAD_TL = new ThreadLocal();
        private final Executor listenerExecutor;

        AsyncNotifyingSettableFuture(Executor listenerExecutor) {
            this.listenerExecutor = Objects.requireNonNull(listenerExecutor);
        }

        public void addListener(Runnable listener, Executor executor) {
            super.addListener((Runnable)new DelegatingRunnable(listener, this.listenerExecutor), executor);
        }

        boolean set() {
            ON_TASK_COMPLETION_THREAD_TL.set(Boolean.TRUE);
            try {
                boolean bl = super.set((Object)CommitInfo.empty());
                return bl;
            }
            finally {
                ON_TASK_COMPLETION_THREAD_TL.set(null);
            }
        }

        protected boolean setException(Throwable throwable) {
            ON_TASK_COMPLETION_THREAD_TL.set(Boolean.TRUE);
            try {
                boolean bl = super.setException(throwable);
                return bl;
            }
            finally {
                ON_TASK_COMPLETION_THREAD_TL.set(null);
            }
        }

        private static final class DelegatingRunnable
        implements Runnable {
            private final Runnable delegate;
            private final Executor executor;

            DelegatingRunnable(Runnable delegate, Executor executor) {
                this.delegate = Objects.requireNonNull(delegate);
                this.executor = Objects.requireNonNull(executor);
            }

            @Override
            public void run() {
                if (ON_TASK_COMPLETION_THREAD_TL.get() != null) {
                    LOG.trace("Submitting ListenenableFuture Runnable from thread {} to executor {}", (Object)Thread.currentThread().getName(), (Object)this.executor);
                    this.executor.execute(this.delegate);
                } else {
                    LOG.trace("Executing ListenenableFuture Runnable on this thread: {}", (Object)Thread.currentThread().getName());
                    this.delegate.run();
                }
            }
        }
    }
}

