package org.apache.pulsar.broker;

import com.google.common.annotations.VisibleForTesting;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
import org.apache.pulsar.broker.transaction.buffer.exceptions.UnsupportedTxnActionException;
import org.apache.pulsar.broker.transaction.recover.TransactionRecoverTrackerImpl;
import org.apache.pulsar.broker.transaction.timeout.TransactionTimeoutTrackerFactoryImpl;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.transaction.TransactionBufferClient;
import org.apache.pulsar.client.api.transaction.TransactionBufferClientException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreProvider;
import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker;
import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTrackerFactory;
import org.apache.pulsar.transaction.coordinator.TxnMeta;
import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
import org.apache.pulsar.transaction.coordinator.proto.TxnStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/TransactionMetadataStoreService.class */
public class TransactionMetadataStoreService {
    private static final Logger LOG = LoggerFactory.getLogger(TransactionMetadataStoreService.class);
    private final Map<TransactionCoordinatorID, TransactionMetadataStore> stores = new ConcurrentHashMap();
    private final TransactionMetadataStoreProvider transactionMetadataStoreProvider;
    private final PulsarService pulsarService;
    private final TransactionBufferClient tbClient;
    private final TransactionTimeoutTrackerFactory timeoutTrackerFactory;
    private static final long endTransactionRetryIntervalTime = 1000;
    private final Timer transactionOpRetryTimer;

    public TransactionMetadataStoreService(TransactionMetadataStoreProvider transactionMetadataStoreProvider, PulsarService pulsarService, TransactionBufferClient transactionBufferClient, HashedWheelTimer hashedWheelTimer) {
        this.pulsarService = pulsarService;
        this.transactionMetadataStoreProvider = transactionMetadataStoreProvider;
        this.tbClient = transactionBufferClient;
        this.timeoutTrackerFactory = new TransactionTimeoutTrackerFactoryImpl(this, hashedWheelTimer);
        this.transactionOpRetryTimer = hashedWheelTimer;
    }

    public void start() {
        this.pulsarService.getNamespaceService().addNamespaceBundleOwnershipListener(new NamespaceBundleOwnershipListener() { // from class: org.apache.pulsar.broker.TransactionMetadataStoreService.1
            @Override // org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener
            public void onLoad(NamespaceBundle namespaceBundle) {
                TransactionMetadataStoreService.this.pulsarService.getNamespaceService().getOwnedTopicListForNamespaceBundle(namespaceBundle).whenComplete((list, th) -> {
                    if (th != null) {
                        TransactionMetadataStoreService.LOG.error("Failed to get owned topic list when triggering on-loading bundle {}.", namespaceBundle, th);
                        return;
                    }
                    Iterator it = list.iterator();
                    while (it.hasNext()) {
                        TopicName topicName = TopicName.get((String) it.next());
                        if (TopicName.TRANSACTION_COORDINATOR_ASSIGN.getLocalName().equals(TopicName.get(topicName.getPartitionedTopicName()).getLocalName()) && topicName.isPartitioned()) {
                            TransactionMetadataStoreService.this.addTransactionMetadataStore(TransactionCoordinatorID.get(topicName.getPartitionIndex()));
                        }
                    }
                });
            }

            @Override // org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener
            public void unLoad(NamespaceBundle namespaceBundle) {
                TransactionMetadataStoreService.this.pulsarService.getNamespaceService().getOwnedTopicListForNamespaceBundle(namespaceBundle).whenComplete((list, th) -> {
                    if (th != null) {
                        TransactionMetadataStoreService.LOG.error("Failed to get owned topic list error when triggering un-loading bundle {}.", namespaceBundle, th);
                        return;
                    }
                    Iterator it = list.iterator();
                    while (it.hasNext()) {
                        TopicName topicName = TopicName.get((String) it.next());
                        if (TopicName.TRANSACTION_COORDINATOR_ASSIGN.getLocalName().equals(TopicName.get(topicName.getPartitionedTopicName()).getLocalName()) && topicName.isPartitioned()) {
                            TransactionMetadataStoreService.this.removeTransactionMetadataStore(TransactionCoordinatorID.get(topicName.getPartitionIndex()));
                        }
                    }
                });
            }

            @Override // java.util.function.Predicate
            public boolean test(NamespaceBundle namespaceBundle) {
                return namespaceBundle.getNamespaceObject().equals(NamespaceName.SYSTEM_NAMESPACE);
            }
        });
    }

    public void addTransactionMetadataStore(TransactionCoordinatorID transactionCoordinatorID) {
        this.pulsarService.getBrokerService().getManagedLedgerConfig(TopicName.get("__transaction_log_" + transactionCoordinatorID)).whenComplete((managedLedgerConfig, th) -> {
            if (th != null) {
                LOG.error("Add transaction metadata store with id {} error", Long.valueOf(transactionCoordinatorID.getId()), th);
                return;
            }
            TransactionTimeoutTracker newTracker = this.timeoutTrackerFactory.newTracker(transactionCoordinatorID);
            this.transactionMetadataStoreProvider.openStore(transactionCoordinatorID, this.pulsarService.getManagedLedgerFactory(), managedLedgerConfig, newTracker, new TransactionRecoverTrackerImpl(this, newTracker, transactionCoordinatorID.getId())).whenComplete((transactionMetadataStore, th) -> {
                if (th != null) {
                    LOG.error("Add transaction metadata store with id {} error", Long.valueOf(transactionCoordinatorID.getId()), th);
                } else {
                    this.stores.put(transactionCoordinatorID, transactionMetadataStore);
                    LOG.info("Added new transaction meta store {}", transactionCoordinatorID);
                }
            });
        });
    }

    public void removeTransactionMetadataStore(TransactionCoordinatorID transactionCoordinatorID) {
        TransactionMetadataStore remove = this.stores.remove(transactionCoordinatorID);
        if (remove != null) {
            remove.closeAsync().whenComplete((r5, th) -> {
                if (th != null) {
                    LOG.error("Close transaction metadata store with id " + transactionCoordinatorID, th);
                } else {
                    LOG.info("Removed and closed transaction meta store {}", transactionCoordinatorID);
                }
            });
        }
    }

    public CompletableFuture<TxnID> newTransaction(TransactionCoordinatorID transactionCoordinatorID, long j) {
        TransactionMetadataStore transactionMetadataStore = this.stores.get(transactionCoordinatorID);
        return transactionMetadataStore == null ? FutureUtil.failedFuture(new CoordinatorException.CoordinatorNotFoundException(transactionCoordinatorID)) : transactionMetadataStore.newTransaction(j);
    }

    public CompletableFuture<Void> addProducedPartitionToTxn(TxnID txnID, List<String> list) {
        TransactionCoordinatorID tcIdFromTxnId = getTcIdFromTxnId(txnID);
        TransactionMetadataStore transactionMetadataStore = this.stores.get(tcIdFromTxnId);
        return transactionMetadataStore == null ? FutureUtil.failedFuture(new CoordinatorException.CoordinatorNotFoundException(tcIdFromTxnId)) : transactionMetadataStore.addProducedPartitionToTxn(txnID, list);
    }

    public CompletableFuture<Void> addAckedPartitionToTxn(TxnID txnID, List<TransactionSubscription> list) {
        TransactionCoordinatorID tcIdFromTxnId = getTcIdFromTxnId(txnID);
        TransactionMetadataStore transactionMetadataStore = this.stores.get(tcIdFromTxnId);
        return transactionMetadataStore == null ? FutureUtil.failedFuture(new CoordinatorException.CoordinatorNotFoundException(tcIdFromTxnId)) : transactionMetadataStore.addAckedPartitionToTxn(txnID, list);
    }

    public CompletableFuture<TxnMeta> getTxnMeta(TxnID txnID) {
        TransactionCoordinatorID tcIdFromTxnId = getTcIdFromTxnId(txnID);
        TransactionMetadataStore transactionMetadataStore = this.stores.get(tcIdFromTxnId);
        return transactionMetadataStore == null ? FutureUtil.failedFuture(new CoordinatorException.CoordinatorNotFoundException(tcIdFromTxnId)) : transactionMetadataStore.getTxnMeta(txnID);
    }

    public long getLowWaterMark(TxnID txnID) {
        TransactionMetadataStore transactionMetadataStore = this.stores.get(getTcIdFromTxnId(txnID));
        if (transactionMetadataStore == null) {
            return 0L;
        }
        return transactionMetadataStore.getLowWaterMark();
    }

    public CompletableFuture<Void> updateTxnStatus(TxnID txnID, TxnStatus txnStatus, TxnStatus txnStatus2) {
        TransactionCoordinatorID tcIdFromTxnId = getTcIdFromTxnId(txnID);
        TransactionMetadataStore transactionMetadataStore = this.stores.get(tcIdFromTxnId);
        return transactionMetadataStore == null ? FutureUtil.failedFuture(new CoordinatorException.CoordinatorNotFoundException(tcIdFromTxnId)) : transactionMetadataStore.updateTxnStatus(txnID, txnStatus, txnStatus2);
    }

    public CompletableFuture<Void> endTransaction(TxnID txnID, int i) {
        TxnStatus txnStatus;
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        switch (i) {
            case 0:
                txnStatus = TxnStatus.COMMITTING;
                break;
            case 1:
                txnStatus = TxnStatus.ABORTING;
                break;
            default:
                UnsupportedTxnActionException unsupportedTxnActionException = new UnsupportedTxnActionException(txnID, i);
                LOG.error(unsupportedTxnActionException.getMessage());
                completableFuture.completeExceptionally(unsupportedTxnActionException);
                return completableFuture;
        }
        TxnStatus txnStatus2 = txnStatus;
        getTxnMeta(txnID).thenAccept(txnMeta -> {
            TxnStatus status = txnMeta.status();
            if (status == TxnStatus.OPEN) {
                updateTxnStatus(txnID, txnStatus2, TxnStatus.OPEN).thenAccept(r10 -> {
                    endTxnInTransactionBuffer(txnID, i).thenAccept(r4 -> {
                        completableFuture.complete(null);
                    }).exceptionally(th -> {
                        if (isRetryableException(th.getCause())) {
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("EndTxnInTransactionBuffer retry! TxnId : {}, TxnAction : {}", new Object[]{txnID, Integer.valueOf(i), th});
                            }
                            this.transactionOpRetryTimer.newTimeout(timeout -> {
                                endTransaction(txnID, i);
                            }, endTransactionRetryIntervalTime, TimeUnit.MILLISECONDS);
                        } else {
                            LOG.error("EndTxnInTransactionBuffer fail! TxnId : {}, TxnAction : {}", new Object[]{txnID, Integer.valueOf(i), th});
                        }
                        completableFuture.completeExceptionally(th);
                        return null;
                    });
                }).exceptionally(th -> {
                    if (isRetryableException(th.getCause())) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("EndTransaction UpdateTxnStatus op retry! TxnId : {}, TxnAction : {}", new Object[]{txnID, Integer.valueOf(i), th});
                        }
                        this.transactionOpRetryTimer.newTimeout(timeout -> {
                            endTransaction(txnID, i);
                        }, endTransactionRetryIntervalTime, TimeUnit.MILLISECONDS);
                    } else {
                        LOG.error("EndTransaction UpdateTxnStatus fail! TxnId : {}, TxnAction : {}", new Object[]{txnID, Integer.valueOf(i), th});
                    }
                    completableFuture.completeExceptionally(th);
                    return null;
                });
                return;
            }
            if ((status == TxnStatus.COMMITTING && i == TxnAction.COMMIT.getValue()) || (status == TxnStatus.ABORTING && i == TxnAction.ABORT.getValue())) {
                endTxnInTransactionBuffer(txnID, i).thenAccept(r4 -> {
                    completableFuture.complete(null);
                }).exceptionally(th2 -> {
                    if (isRetryableException(th2.getCause())) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("EndTxnInTransactionBuffer retry! TxnId : {}, TxnAction : {}", new Object[]{txnID, Integer.valueOf(i), th2});
                        }
                        this.transactionOpRetryTimer.newTimeout(timeout -> {
                            endTransaction(txnID, i);
                        }, endTransactionRetryIntervalTime, TimeUnit.MILLISECONDS);
                    } else {
                        LOG.error("EndTxnInTransactionBuffer fail! TxnId : {}, TxnAction : {}", new Object[]{txnID, Integer.valueOf(i), th2});
                    }
                    completableFuture.completeExceptionally(th2);
                    return null;
                });
                return;
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("EndTxnInTransactionBuffer op retry! TxnId : {}, TxnAction : {}", txnID, Integer.valueOf(i));
            }
            completableFuture.completeExceptionally(new CoordinatorException.InvalidTxnStatusException(txnID, txnStatus2, status));
        }).exceptionally(th -> {
            if (isRetryableException(th.getCause())) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("End transaction op retry! TxnId : {}, TxnAction : {}", new Object[]{txnID, Integer.valueOf(i), th});
                }
                this.transactionOpRetryTimer.newTimeout(timeout -> {
                    endTransaction(txnID, i);
                }, endTransactionRetryIntervalTime, TimeUnit.MILLISECONDS);
            }
            completableFuture.completeExceptionally(th);
            return null;
        });
        return completableFuture;
    }

    public CompletableFuture<Void> endTransactionForTimeout(TxnID txnID) {
        return getTxnMeta(txnID).thenCompose(txnMeta -> {
            if (txnMeta.status() == TxnStatus.OPEN) {
                return endTransaction(txnID, 1);
            }
            return null;
        }).exceptionally((Function<Throwable, ? extends U>) th -> {
            if (isRetryableException(th.getCause())) {
                endTransaction(txnID, 1);
                return null;
            }
            if (!LOG.isDebugEnabled()) {
                return null;
            }
            LOG.debug("Transaction have been handle complete, don't need to handle by transaction timeout! TxnId : {}", txnID);
            return null;
        });
    }

    private CompletableFuture<Void> endTxnInTransactionBuffer(TxnID txnID, int i) {
        CompletableFuture completableFuture = new CompletableFuture();
        ArrayList arrayList = new ArrayList();
        getTxnMeta(txnID).whenComplete((txnMeta, th) -> {
            if (th != null) {
                completableFuture.completeExceptionally(th);
                return;
            }
            long lowWaterMark = getLowWaterMark(txnID);
            txnMeta.ackedPartitions().forEach(transactionSubscription -> {
                CompletableFuture completableFuture2 = new CompletableFuture();
                if (0 == i) {
                    completableFuture2 = this.tbClient.commitTxnOnSubscription(transactionSubscription.getTopic(), transactionSubscription.getSubscription(), txnID.getMostSigBits(), txnID.getLeastSigBits(), lowWaterMark);
                } else if (1 == i) {
                    completableFuture2 = this.tbClient.abortTxnOnSubscription(transactionSubscription.getTopic(), transactionSubscription.getSubscription(), txnID.getMostSigBits(), txnID.getLeastSigBits(), lowWaterMark);
                } else {
                    completableFuture2.completeExceptionally(new Throwable("Unsupported txnAction " + i));
                }
                arrayList.add(completableFuture2);
            });
            txnMeta.producedPartitions().forEach(str -> {
                CompletableFuture completableFuture2 = new CompletableFuture();
                if (0 == i) {
                    completableFuture2 = this.tbClient.commitTxnOnTopic(str, txnID.getMostSigBits(), txnID.getLeastSigBits(), lowWaterMark);
                } else if (1 == i) {
                    completableFuture2 = this.tbClient.abortTxnOnTopic(str, txnID.getMostSigBits(), txnID.getLeastSigBits(), lowWaterMark);
                } else {
                    completableFuture2.completeExceptionally(new Throwable("Unsupported txnAction " + i));
                }
                arrayList.add(completableFuture2);
            });
            try {
                FutureUtil.waitForAll(arrayList).whenComplete((r4, th) -> {
                    if (th != null) {
                        completableFuture.completeExceptionally(th);
                    } else {
                        completableFuture.complete(null);
                    }
                });
            } catch (Exception e) {
                completableFuture.completeExceptionally(e);
            }
        });
        return completableFuture.thenCompose(r7 -> {
            return endTxnInTransactionMetadataStore(txnID, i);
        });
    }

    private static boolean isRetryableException(Throwable th) {
        return (th instanceof CoordinatorException.TransactionMetadataStoreStateException) || (th instanceof TransactionBufferClientException.RequestTimeoutException) || (th instanceof ManagedLedgerException) || (th instanceof PulsarClientException.BrokerPersistenceException) || (th instanceof PulsarClientException.LookupException) || (th instanceof TransactionBufferClientException.ReachMaxPendingOpsException) || (th instanceof PulsarClientException.ConnectException);
    }

    private CompletableFuture<Void> endTxnInTransactionMetadataStore(TxnID txnID, int i) {
        return TxnAction.COMMIT.getValue() == i ? updateTxnStatus(txnID, TxnStatus.COMMITTED, TxnStatus.COMMITTING) : TxnAction.ABORT.getValue() == i ? updateTxnStatus(txnID, TxnStatus.ABORTED, TxnStatus.ABORTING) : FutureUtil.failedFuture(new CoordinatorException.InvalidTxnStatusException("Unsupported txnAction " + i));
    }

    private TransactionCoordinatorID getTcIdFromTxnId(TxnID txnID) {
        return new TransactionCoordinatorID(txnID.getMostSigBits());
    }

    public TransactionMetadataStoreProvider getTransactionMetadataStoreProvider() {
        return this.transactionMetadataStoreProvider;
    }

    @VisibleForTesting
    public Map<TransactionCoordinatorID, TransactionMetadataStore> getStores() {
        return Collections.unmodifiableMap(this.stores);
    }
}
