package org.apache.pulsar.broker;

import com.google.common.annotations.VisibleForTesting;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
import java.util.Collections;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.transaction.exception.coordinator.TransactionCoordinatorException;
import org.apache.pulsar.broker.transaction.recover.TransactionRecoverTrackerImpl;
import org.apache.pulsar.broker.transaction.timeout.TransactionTimeoutTrackerFactoryImpl;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.transaction.TransactionBufferClient;
import org.apache.pulsar.client.api.transaction.TransactionBufferClientException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreProvider;
import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker;
import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker;
import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTrackerFactory;
import org.apache.pulsar.transaction.coordinator.TxnMeta;
import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
import org.apache.pulsar.transaction.coordinator.proto.TxnStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/TransactionMetadataStoreService.class */
public class TransactionMetadataStoreService {
    private static final Logger LOG = LoggerFactory.getLogger(TransactionMetadataStoreService.class);
    // Opens the metadata store of a coordinator (used by openTransactionMetadataStore).
    private final TransactionMetadataStoreProvider transactionMetadataStoreProvider;
    private final PulsarService pulsarService;
    // Client used to commit/abort a transaction on the topics and subscriptions it touched.
    private final TransactionBufferClient tbClient;
    // Creates one timeout tracker per coordinator when its store is loaded.
    private final TransactionTimeoutTrackerFactory timeoutTrackerFactory;
    // Delay in milliseconds before a retryable end-transaction failure is attempted again.
    private static final long endTransactionRetryIntervalTime = 1000;
    // Timer on which end-transaction retries are scheduled.
    private final Timer transactionOpRetryTimer;
    // Time budget in milliseconds for completing queued connect requests after a store load finishes.
    private static final long HANDLE_PENDING_CONNECT_TIME_OUT = 30000;
    // Must be declared before internalPinnedExecutor: field initializers run in declaration order.
    private final ThreadFactory threadFactory = new ExecutorProvider.ExtendedThreadFactory("transaction-coordinator-thread-factory");
    // Loaded metadata stores, keyed by coordinator id.
    private final Map<TransactionCoordinatorID, TransactionMetadataStore> stores = new ConcurrentHashMap<>();
    // One single-permit semaphore per coordinator id; held while a store is being loaded or removed.
    private final ConcurrentLongHashMap<Semaphore> tcLoadSemaphores = ConcurrentLongHashMap.<Semaphore>newBuilder().build();
    // Connect requests that arrived while the coordinator's semaphore was already taken.
    private final ConcurrentLongHashMap<ConcurrentLinkedDeque<CompletableFuture<Void>>> pendingConnectRequests = ConcurrentLongHashMap.<ConcurrentLinkedDeque<CompletableFuture<Void>>>newBuilder().build();
    // Single-threaded executor on which all store-loading steps are run.
    private final ExecutorService internalPinnedExecutor = Executors.newSingleThreadScheduledExecutor(this.threadFactory);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.pulsar.broker.TransactionMetadataStoreService$2, reason: invalid class name */
    /* loaded from: input_file:org/apache/pulsar/broker/TransactionMetadataStoreService$2.class */
    public static /* synthetic */ class AnonymousClass2 {
        /**
         * Compiler-generated lookup table for a switch over {@link TxnStatus}: indexed by
         * {@code ordinal()}, holding 1 for COMMITTING, 2 for ABORTING and 0 for every other status.
         */
        static final /* synthetic */ int[] $SwitchMap$org$apache$pulsar$transaction$coordinator$proto$TxnStatus = buildSwitchMap();

        /** Builds the table; a constant missing at runtime is skipped and keeps the default 0. */
        private static int[] buildSwitchMap() {
            int[] table = new int[TxnStatus.values().length];
            try {
                table[TxnStatus.COMMITTING.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // constant absent at runtime: leave the slot at 0
            }
            try {
                table[TxnStatus.ABORTING.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // constant absent at runtime: leave the slot at 0
            }
            return table;
        }
    }

    /**
     * Creates the service and its timeout-tracker factory.
     *
     * @param transactionMetadataStoreProvider provider used to open coordinator metadata stores
     * @param pulsarService the owning broker service
     * @param transactionBufferClient client used to end transactions on topics and subscriptions
     * @param hashedWheelTimer timer shared by the timeout-tracker factory and end-transaction retries
     */
    public TransactionMetadataStoreService(
            TransactionMetadataStoreProvider transactionMetadataStoreProvider,
            PulsarService pulsarService,
            TransactionBufferClient transactionBufferClient,
            HashedWheelTimer hashedWheelTimer) {
        this.transactionMetadataStoreProvider = transactionMetadataStoreProvider;
        this.pulsarService = pulsarService;
        this.tbClient = transactionBufferClient;
        // The factory receives this service and the same timer that is kept for retries below.
        this.timeoutTrackerFactory = new TransactionTimeoutTrackerFactoryImpl(this, hashedWheelTimer);
        this.transactionOpRetryTimer = hashedWheelTimer;
    }

    /**
     * Registers a bundle-ownership listener for the system namespace. When a bundle is loaded,
     * every owned transaction-coordinator-assign partition triggers {@link #handleTcClientConnect};
     * when a bundle is unloaded, the matching stores are removed via
     * {@link #removeTransactionMetadataStore}.
     */
    @Deprecated
    public void start() {
        this.pulsarService.getNamespaceService().addNamespaceBundleOwnershipListener(new NamespaceBundleOwnershipListener() {
            @Override
            public void onLoad(NamespaceBundle namespaceBundle) {
                TransactionMetadataStoreService.this.pulsarService.getNamespaceService()
                        .getOwnedTopicListForNamespaceBundle(namespaceBundle)
                        .whenComplete((ownedTopics, failure) -> {
                            if (failure != null) {
                                TransactionMetadataStoreService.LOG.error("Failed to get owned topic list when triggering on-loading bundle {}.", namespaceBundle, failure);
                                return;
                            }
                            for (String ownedTopic : ownedTopics) {
                                TopicName topicName = TopicName.get(ownedTopic);
                                if (TransactionMetadataStoreService.isCoordinatorAssignPartition(topicName)) {
                                    TransactionMetadataStoreService.this.handleTcClientConnect(
                                            TransactionCoordinatorID.get(topicName.getPartitionIndex()));
                                }
                            }
                        });
            }

            @Override
            public void unLoad(NamespaceBundle namespaceBundle) {
                TransactionMetadataStoreService.this.pulsarService.getNamespaceService()
                        .getOwnedTopicListForNamespaceBundle(namespaceBundle)
                        .whenComplete((ownedTopics, failure) -> {
                            if (failure != null) {
                                TransactionMetadataStoreService.LOG.error("Failed to get owned topic list error when triggering un-loading bundle {}.", namespaceBundle, failure);
                                return;
                            }
                            for (String ownedTopic : ownedTopics) {
                                TopicName topicName = TopicName.get(ownedTopic);
                                if (TransactionMetadataStoreService.isCoordinatorAssignPartition(topicName)) {
                                    TransactionMetadataStoreService.this.removeTransactionMetadataStore(
                                            TransactionCoordinatorID.get(topicName.getPartitionIndex()));
                                }
                            }
                        });
            }

            /** Only bundles of the system namespace are of interest to this listener. */
            @Override
            public boolean test(NamespaceBundle namespaceBundle) {
                return namespaceBundle.getNamespaceObject().equals(NamespaceName.SYSTEM_NAMESPACE);
            }
        });
    }

    /**
     * Returns true when {@code topicName} is a partition whose partitioned-topic local name equals
     * that of {@code TopicName.TRANSACTION_COORDINATOR_ASSIGN}.
     */
    private static boolean isCoordinatorAssignPartition(TopicName topicName) {
        String assignLocalName = TopicName.TRANSACTION_COORDINATOR_ASSIGN.getLocalName();
        String partitionedLocalName = TopicName.get(topicName.getPartitionedTopicName()).getLocalName();
        return assignLocalName.equals(partitionedLocalName) && topicName.isPartitioned();
    }

    /**
     * Handles a client connecting to a transaction coordinator, loading the coordinator's
     * metadata store if it is not loaded yet. All steps run on {@code internalPinnedExecutor}.
     *
     * <p>If the store exists the future completes immediately. Otherwise ownership of the
     * coordinator's assign-topic partition is checked and the store is opened under the
     * coordinator's single-permit semaphore. Requests arriving while the semaphore is taken are
     * queued and completed when the in-flight load finishes.
     *
     * @param transactionCoordinatorID the coordinator the client connects to
     * @return a future completed with {@code null} once the store is available, or exceptionally
     *         with the unwrapped cause when the ownership check or the store load fails
     */
    public CompletableFuture<Void> handleTcClientConnect(TransactionCoordinatorID transactionCoordinatorID) {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        this.internalPinnedExecutor.execute(() -> {
            if (this.stores.get(transactionCoordinatorID) != null) {
                completableFuture.complete(null);
                return;
            }
            this.pulsarService.getBrokerService()
                    .checkTopicNsOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition((int) transactionCoordinatorID.getId()).toString())
                    .thenRun(() -> this.internalPinnedExecutor.execute(
                            () -> loadStoreOrEnqueue(transactionCoordinatorID, completableFuture)))
                    .exceptionally(th -> {
                        completableFuture.completeExceptionally(FutureUtil.unwrapCompletionException(th));
                        return null;
                    });
        });
        return completableFuture;
    }

    /** Starts loading the store for {@code tcId}, or queues the request if a load is in flight. */
    private void loadStoreOrEnqueue(TransactionCoordinatorID tcId, CompletableFuture<Void> connectFuture) {
        Semaphore tcLoadSemaphore = this.tcLoadSemaphores.computeIfAbsent(tcId.getId(), id -> new Semaphore(1));
        Deque<CompletableFuture<Void>> pending =
                this.pendingConnectRequests.computeIfAbsent(tcId.getId(), id -> new ConcurrentLinkedDeque<>());
        if (!tcLoadSemaphore.tryAcquire()) {
            // Another load/remove holds the permit: park this request until that operation ends.
            pending.add(connectFuture);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Handle tc client connect added into pending queue! tcId : {}", tcId.toString());
            }
            return;
        }
        // Re-check under the permit: the store may have been added since the first lookup.
        if (this.stores.get(tcId) != null) {
            connectFuture.complete(null);
            tcLoadSemaphore.release();
            return;
        }
        TransactionTimeoutTracker timeoutTracker = this.timeoutTrackerFactory.newTracker(tcId);
        TransactionRecoverTrackerImpl recoverTracker = new TransactionRecoverTrackerImpl(this, timeoutTracker, tcId.getId());
        openTransactionMetadataStore(tcId, timeoutTracker, recoverTracker).thenAccept(store -> {
            this.internalPinnedExecutor.execute(() -> {
                this.stores.put(tcId, store);
                LOG.info("Added new transaction meta store {}", tcId);
                recoverTracker.handleCommittingAndAbortingTransaction();
                timeoutTracker.start();
                drainPendingConnectRequests(pending, null);
                connectFuture.complete(null);
                tcLoadSemaphore.release();
            });
        }).exceptionally(th -> {
            this.internalPinnedExecutor.execute(() -> {
                // Unwrap once and hand the same cause to the caller and every queued waiter;
                // unlike th.getCause(), this is never null.
                Throwable realCause = FutureUtil.unwrapCompletionException(th);
                connectFuture.completeExceptionally(realCause);
                // Release before draining so that reconnecting clients can start a fresh load.
                tcLoadSemaphore.release();
                drainPendingConnectRequests(pending, realCause);
                LOG.error("Add transaction metadata store with id {} error", Long.valueOf(tcId.getId()), th);
            });
            return null;
        });
    }

    /**
     * Completes queued connect requests, successfully when {@code failure} is null and
     * exceptionally otherwise, for at most {@code HANDLE_PENDING_CONNECT_TIME_OUT} milliseconds.
     */
    private static void drainPendingConnectRequests(Deque<CompletableFuture<Void>> pending, Throwable failure) {
        long deadline = System.currentTimeMillis() + HANDLE_PENDING_CONNECT_TIME_OUT;
        while (System.currentTimeMillis() < deadline) {
            CompletableFuture<Void> waiter = pending.poll();
            if (waiter == null) {
                return;
            }
            if (failure == null) {
                waiter.complete(null);
            } else {
                waiter.completeExceptionally(failure);
            }
        }
        // NOTE(review): requests still queued at the deadline are dropped without being completed;
        // presumably the client side times out and reconnects — confirm.
        pending.clear();
    }

    /**
     * Opens the metadata store of a coordinator: fetches the managed-ledger config for the
     * coordinator's transaction log name, then asks the provider to open the store with it.
     *
     * @param transactionCoordinatorID coordinator whose store is opened
     * @param transactionTimeoutTracker timeout tracker handed to the provider
     * @param transactionRecoverTracker recover tracker handed to the provider
     * @return a future holding the opened store
     */
    public CompletableFuture<TransactionMetadataStore> openTransactionMetadataStore(
            TransactionCoordinatorID transactionCoordinatorID,
            TransactionTimeoutTracker transactionTimeoutTracker,
            TransactionRecoverTracker transactionRecoverTracker) {
        return this.pulsarService.getBrokerService()
                .getManagedLedgerConfig(MLTransactionLogImpl.getMLTransactionLogName(transactionCoordinatorID))
                .thenCompose(ledgerConfig -> this.transactionMetadataStoreProvider.openStore(
                        transactionCoordinatorID,
                        this.pulsarService.getManagedLedgerFactory(),
                        ledgerConfig,
                        transactionTimeoutTracker,
                        transactionRecoverTracker));
    }

    /**
     * Removes the coordinator's store from this service and closes it asynchronously.
     *
     * <p>The removal runs under the coordinator's single-permit semaphore. The permit is released
     * in a {@code finally} block so that an exception thrown synchronously while removing or
     * closing cannot leave the semaphore held, which would block every later load or removal.
     *
     * @param transactionCoordinatorID coordinator whose store is removed
     * @return a completed future once removal has been initiated (the close itself is not
     *         awaited), or a future failed with {@code ServiceUnitNotReadyException} when another
     *         operation currently holds the semaphore
     */
    public CompletableFuture<Void> removeTransactionMetadataStore(TransactionCoordinatorID transactionCoordinatorID) {
        Semaphore tcLoadSemaphore = this.tcLoadSemaphores.computeIfAbsent(transactionCoordinatorID.getId(), id -> new Semaphore(1));
        if (!tcLoadSemaphore.tryAcquire()) {
            return FutureUtil.failedFuture(new BrokerServiceException.ServiceUnitNotReadyException("Could not remove TransactionMetadataStore, it is doing other operations!"));
        }
        try {
            TransactionMetadataStore removed = this.stores.remove(transactionCoordinatorID);
            if (removed != null) {
                removed.closeAsync().whenComplete((ignored, th) -> {
                    if (th != null) {
                        LOG.error("Close transaction metadata store with id " + transactionCoordinatorID, th);
                    } else {
                        LOG.info("Removed and closed transaction meta store {}", transactionCoordinatorID);
                    }
                });
            }
        } finally {
            tcLoadSemaphore.release();
        }
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Starts a new transaction on the given coordinator.
     *
     * @param transactionCoordinatorID coordinator that should create the transaction
     * @param j value forwarded unchanged to the store's {@code newTransaction}
     *          (presumably the transaction timeout — confirm against the store API)
     * @return the new transaction id, or a future failed with
     *         {@code CoordinatorNotFoundException} when the coordinator's store is not loaded
     */
    public CompletableFuture<TxnID> newTransaction(TransactionCoordinatorID transactionCoordinatorID, long j) {
        TransactionMetadataStore store = this.stores.get(transactionCoordinatorID);
        if (store != null) {
            return store.newTransaction(j);
        }
        return FutureUtil.failedFuture(new CoordinatorException.CoordinatorNotFoundException(transactionCoordinatorID));
    }

    /**
     * Records produced partitions on a transaction via the owning coordinator's store.
     *
     * @param txnID transaction to update; its most-significant bits select the coordinator
     * @param list partitions forwarded unchanged to the store
     * @return the store's result, or a future failed with {@code CoordinatorNotFoundException}
     *         when the coordinator's store is not loaded
     */
    public CompletableFuture<Void> addProducedPartitionToTxn(TxnID txnID, List<String> list) {
        TransactionCoordinatorID coordinatorId = getTcIdFromTxnId(txnID);
        TransactionMetadataStore store = this.stores.get(coordinatorId);
        if (store != null) {
            return store.addProducedPartitionToTxn(txnID, list);
        }
        return FutureUtil.failedFuture(new CoordinatorException.CoordinatorNotFoundException(coordinatorId));
    }

    /**
     * Records acknowledged subscriptions on a transaction via the owning coordinator's store.
     *
     * @param txnID transaction to update; its most-significant bits select the coordinator
     * @param list subscriptions forwarded unchanged to the store
     * @return the store's result, or a future failed with {@code CoordinatorNotFoundException}
     *         when the coordinator's store is not loaded
     */
    public CompletableFuture<Void> addAckedPartitionToTxn(TxnID txnID, List<TransactionSubscription> list) {
        TransactionCoordinatorID coordinatorId = getTcIdFromTxnId(txnID);
        TransactionMetadataStore store = this.stores.get(coordinatorId);
        if (store != null) {
            return store.addAckedPartitionToTxn(txnID, list);
        }
        return FutureUtil.failedFuture(new CoordinatorException.CoordinatorNotFoundException(coordinatorId));
    }

    /**
     * Looks up the metadata of a transaction in the owning coordinator's store.
     *
     * @param txnID transaction to look up; its most-significant bits select the coordinator
     * @return the store's result, or a future failed with {@code CoordinatorNotFoundException}
     *         when the coordinator's store is not loaded
     */
    public CompletableFuture<TxnMeta> getTxnMeta(TxnID txnID) {
        TransactionCoordinatorID coordinatorId = getTcIdFromTxnId(txnID);
        TransactionMetadataStore store = this.stores.get(coordinatorId);
        if (store != null) {
            return store.getTxnMeta(txnID);
        }
        return FutureUtil.failedFuture(new CoordinatorException.CoordinatorNotFoundException(coordinatorId));
    }

    /**
     * Returns the low water mark reported by the store of the transaction's coordinator.
     *
     * @param txnID transaction whose most-significant bits select the coordinator
     * @return the store's low water mark, or {@code 0} when the coordinator's store is not loaded
     */
    public long getLowWaterMark(TxnID txnID) {
        TransactionMetadataStore store = this.stores.get(getTcIdFromTxnId(txnID));
        return store != null ? store.getLowWaterMark() : 0L;
    }

    /**
     * Asks the owning coordinator's store to change the status of a transaction.
     *
     * @param txnID transaction to update; its most-significant bits select the coordinator
     * @param txnStatus first status argument forwarded to the store (callers in this class pass
     *                  the new status here)
     * @param txnStatus2 second status argument forwarded to the store (callers in this class pass
     *                   the expected current status here)
     * @param z flag forwarded unchanged to the store (presumably "is timeout" — confirm)
     * @return the store's result, or a future failed with {@code CoordinatorNotFoundException}
     *         when the coordinator's store is not loaded
     */
    public CompletableFuture<Void> updateTxnStatus(TxnID txnID, TxnStatus txnStatus, TxnStatus txnStatus2, boolean z) {
        TransactionCoordinatorID coordinatorId = getTcIdFromTxnId(txnID);
        TransactionMetadataStore store = this.stores.get(coordinatorId);
        if (store != null) {
            return store.updateTxnStatus(txnID, txnStatus, txnStatus2, z);
        }
        return FutureUtil.failedFuture(new CoordinatorException.CoordinatorNotFoundException(coordinatorId));
    }

    /**
     * Ends a transaction and reports the outcome through a newly created future.
     *
     * @param txnID transaction to end
     * @param i action code; 0 moves the transaction to COMMITTING, 1 to ABORTING
     * @param z flag forwarded to the status update (presumably "is timeout" — confirm)
     * @return a future completed when the four-argument overload finishes the operation
     */
    public CompletableFuture<Void> endTransaction(TxnID txnID, int i, boolean z) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        endTransaction(txnID, i, z, result);
        return result;
    }

    /**
     * Ends a transaction, retrying on retryable failures until it succeeds or fails permanently.
     *
     * <p>An OPEN transaction is first moved to the status selected by the action. Any other
     * status must already match the action (see {@code fakeAsyncCheckTxnStatus}). The transaction
     * is then ended in the transaction buffers. A retryable failure re-schedules the whole
     * operation on the retry timer after {@code endTransactionRetryIntervalTime} milliseconds,
     * reusing the same future.
     *
     * @param txnID transaction to end
     * @param i action code; 0 selects COMMITTING, 1 selects ABORTING, anything else fails the
     *          future with {@code UnsupportedTxnActionException}
     * @param z flag forwarded to the status update (presumably "is timeout" — confirm)
     * @param completableFuture completed with {@code null} on success, or exceptionally with the
     *                          failure as delivered by the pipeline (not unwrapped)
     */
    public void endTransaction(TxnID txnID, int i, boolean z, CompletableFuture<Void> completableFuture) {
        final TxnStatus targetStatus;
        switch (i) {
            case 0:
                targetStatus = TxnStatus.COMMITTING;
                break;
            case 1:
                targetStatus = TxnStatus.ABORTING;
                break;
            default:
                TransactionCoordinatorException.UnsupportedTxnActionException unsupported =
                        new TransactionCoordinatorException.UnsupportedTxnActionException(txnID, i);
                LOG.error(unsupported.getMessage());
                completableFuture.completeExceptionally(unsupported);
                return;
        }
        getTxnMeta(txnID).thenCompose(txnMeta -> {
            CompletionStage<Void> statusReady = txnMeta.status() == TxnStatus.OPEN
                    ? updateTxnStatus(txnID, targetStatus, TxnStatus.OPEN, z)
                    : fakeAsyncCheckTxnStatus(txnMeta.status(), i, txnID, targetStatus);
            return statusReady.thenCompose(ignored -> endTxnInTransactionBuffer(txnID, i));
        }).whenComplete((ignored, th) -> {
            if (th == null) {
                completableFuture.complete(null);
                return;
            }
            // Classify on the unwrapped cause, but hand the caller the exception as delivered.
            Throwable realCause = FutureUtil.unwrapCompletionException(th);
            if (!isRetryableException(realCause)) {
                LOG.error("End transaction fail! TxnId : {}, TxnAction : {}", txnID, Integer.valueOf(i), realCause);
                completableFuture.completeExceptionally(th);
                return;
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("EndTxnInTransactionBuffer retry! TxnId : {}, TxnAction : {}", txnID, Integer.valueOf(i), realCause);
            }
            this.transactionOpRetryTimer.newTimeout(
                    timeout -> endTransaction(txnID, i, z, completableFuture),
                    endTransactionRetryIntervalTime, TimeUnit.MILLISECONDS);
        });
    }

    /**
     * Checks, without any real asynchronous work, that a non-OPEN transaction is already in the
     * in-progress status matching the requested action, so that ending it can simply be resumed.
     *
     * @param txnStatus the transaction's current status
     * @param i requested action code, compared against {@code TxnAction.COMMIT}/{@code ABORT}
     * @param txnID transaction being ended (used for logging and the error)
     * @param txnStatus2 status the caller wanted to move to (reported in the error)
     * @return a completed stage when COMMITTING meets a commit or ABORTING meets an abort;
     *         otherwise a stage failed with {@code InvalidTxnStatusException}
     */
    private CompletionStage<Void> fakeAsyncCheckTxnStatus(TxnStatus txnStatus, int i, TxnID txnID, TxnStatus txnStatus2) {
        // Switch on the enum itself instead of going through an ordinal-indexed lookup table.
        boolean actionMatchesStatus;
        switch (txnStatus) {
            case COMMITTING:
                actionMatchesStatus = i == TxnAction.COMMIT.getValue();
                break;
            case ABORTING:
                actionMatchesStatus = i == TxnAction.ABORT.getValue();
                break;
            default:
                actionMatchesStatus = false;
                break;
        }
        if (actionMatchesStatus) {
            return CompletableFuture.completedFuture(null);
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("EndTxnInTransactionBuffer op retry! TxnId : {}, TxnAction : {}", txnID, Integer.valueOf(i));
        }
        return FutureUtil.failedFuture(new CoordinatorException.InvalidTxnStatusException(txnID, txnStatus2, txnStatus));
    }

    /**
     * Reacts to a failed store operation: a fenced managed ledger causes the coordinator's store
     * to be removed; every other failure is ignored here.
     *
     * @param th the failure reported by the operation
     * @param transactionCoordinatorID coordinator whose operation failed
     */
    public void handleOpFail(Throwable th, TransactionCoordinatorID transactionCoordinatorID) {
        boolean ledgerFenced = th instanceof ManagedLedgerException.ManagedLedgerFencedException;
        if (!ledgerFenced) {
            return;
        }
        removeTransactionMetadataStore(transactionCoordinatorID);
    }

    /**
     * Aborts a transaction whose timeout has fired, if it is still OPEN.
     *
     * <p>A transaction that is no longer OPEN needs no action. That case completes normally with
     * an already-completed future; returning {@code null} from the {@code thenCompose} function
     * would instead surface as a NullPointerException in the pipeline. A retryable failure
     * triggers the abort anyway, since {@code endTransaction} keeps retrying on its own.
     *
     * @param txnID the timed-out transaction
     */
    public void endTransactionForTimeout(TxnID txnID) {
        getTxnMeta(txnID).thenCompose(txnMeta -> {
            if (txnMeta.status() == TxnStatus.OPEN) {
                // 1 is the action code that endTransaction maps to ABORTING.
                return endTransaction(txnID, 1, true);
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("Transaction have been handle complete, don't need to handle by transaction timeout! TxnId : {}", txnID);
            }
            return CompletableFuture.<Void>completedFuture(null);
        }).exceptionally(th -> {
            // Unwrap null-safely; th.getCause() is null for an exception that is not wrapped.
            Throwable realCause = FutureUtil.unwrapCompletionException(th);
            if (isRetryableException(realCause)) {
                endTransaction(txnID, 1, true);
            } else if (LOG.isDebugEnabled()) {
                LOG.debug("Transaction have been handle complete, don't need to handle by transaction timeout! TxnId : {}", txnID);
            }
            return null;
        });
    }

    /**
     * Ends the transaction on every acked subscription and every produced topic recorded in its
     * metadata, then records the final status in the metadata store.
     *
     * @param txnID transaction to end
     * @param i action code; 0 commits, 1 aborts, any other value yields futures failed with
     *          {@code IllegalStateException}
     * @return a future that completes after all buffer operations and the final status update
     */
    private CompletableFuture<Void> endTxnInTransactionBuffer(TxnID txnID, int i) {
        return getTxnMeta(txnID).thenCompose(txnMeta -> {
            long lowWaterMark = getLowWaterMark(txnID);
            return FutureUtil.waitForAll((List) Stream.concat(
                    // One operation per subscription the transaction acknowledged on.
                    txnMeta.ackedPartitions().stream().map(ackedSub -> {
                        if (i == 0) {
                            return this.tbClient.commitTxnOnSubscription(ackedSub.getTopic(), ackedSub.getSubscription(), txnID.getMostSigBits(), txnID.getLeastSigBits(), lowWaterMark);
                        }
                        if (i == 1) {
                            return this.tbClient.abortTxnOnSubscription(ackedSub.getTopic(), ackedSub.getSubscription(), txnID.getMostSigBits(), txnID.getLeastSigBits(), lowWaterMark);
                        }
                        return FutureUtil.failedFuture(new IllegalStateException("Unsupported txnAction " + i));
                    }),
                    // One operation per topic the transaction produced to.
                    txnMeta.producedPartitions().stream().map(producedTopic -> {
                        if (i == 0) {
                            return this.tbClient.commitTxnOnTopic(producedTopic, txnID.getMostSigBits(), txnID.getLeastSigBits(), lowWaterMark);
                        }
                        if (i == 1) {
                            return this.tbClient.abortTxnOnTopic(producedTopic, txnID.getMostSigBits(), txnID.getLeastSigBits(), lowWaterMark);
                        }
                        return FutureUtil.failedFuture(new IllegalStateException("Unsupported txnAction " + i));
                    })).collect(Collectors.toList()))
                    .thenCompose(ignored -> endTxnInTransactionMetadataStore(txnID, i));
        });
    }

    /**
     * Tells whether an end-transaction failure should be retried.
     *
     * @param th the (unwrapped) failure; {@code null} is not retryable
     * @return {@code true} for the listed store-state, timeout, managed-ledger, persistence,
     *         lookup, pending-ops and connect failures, except a fenced managed ledger
     */
    private static boolean isRetryableException(Throwable th) {
        // A fenced ledger is a ManagedLedgerException too, but must never be retried.
        if (th instanceof ManagedLedgerException.ManagedLedgerFencedException) {
            return false;
        }
        return th instanceof CoordinatorException.TransactionMetadataStoreStateException
                || th instanceof TransactionBufferClientException.RequestTimeoutException
                || th instanceof ManagedLedgerException
                || th instanceof PulsarClientException.BrokerPersistenceException
                || th instanceof PulsarClientException.LookupException
                || th instanceof TransactionBufferClientException.ReachMaxPendingOpsException
                || th instanceof PulsarClientException.ConnectException;
    }

    /**
     * Records the final status of a transaction: COMMITTING becomes COMMITTED for a commit,
     * ABORTING becomes ABORTED for an abort.
     *
     * @param txnID transaction to finalize
     * @param i action code compared against {@code TxnAction.COMMIT} and {@code TxnAction.ABORT}
     * @return the status-update future, or a future failed with
     *         {@code InvalidTxnStatusException} for any other action code
     */
    private CompletableFuture<Void> endTxnInTransactionMetadataStore(TxnID txnID, int i) {
        if (TxnAction.COMMIT.getValue() == i) {
            return updateTxnStatus(txnID, TxnStatus.COMMITTED, TxnStatus.COMMITTING, false);
        }
        if (TxnAction.ABORT.getValue() == i) {
            return updateTxnStatus(txnID, TxnStatus.ABORTED, TxnStatus.ABORTING, false);
        }
        return FutureUtil.failedFuture(new CoordinatorException.InvalidTxnStatusException("Unsupported txnAction " + i));
    }

    /** Derives the coordinator id from the most-significant bits of the transaction id. */
    private TransactionCoordinatorID getTcIdFromTxnId(TxnID txnID) {
        long coordinatorBits = txnID.getMostSigBits();
        return new TransactionCoordinatorID(coordinatorBits);
    }

    /** Exposes a read-only view of the loaded stores, keyed by coordinator id; intended for tests. */
    @VisibleForTesting
    public Map<TransactionCoordinatorID, TransactionMetadataStore> getStores() {
        Map<TransactionCoordinatorID, TransactionMetadataStore> readOnlyView = Collections.unmodifiableMap(this.stores);
        return readOnlyView;
    }

    /**
     * Shuts the service down: stops the internal executor, starts an asynchronous close of every
     * loaded store (logging each outcome), then empties the store map without waiting for the
     * closes to finish.
     */
    public synchronized void close() {
        this.internalPinnedExecutor.shutdown();
        for (Map.Entry<TransactionCoordinatorID, TransactionMetadataStore> entry : this.stores.entrySet()) {
            TransactionCoordinatorID coordinatorId = entry.getKey();
            entry.getValue().closeAsync().whenComplete((ignored, failure) -> {
                if (failure == null) {
                    LOG.info("Removed and closed transaction meta store {}", coordinatorId);
                    return;
                }
                LOG.error("Close transaction metadata store with id " + coordinatorId, failure);
            });
        }
        this.stores.clear();
    }
}
