/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.transaction.pendingack.impl;

import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.transaction.exception.pendingack.TransactionPendingAckException;
import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.TopicName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link TransactionPendingAckStoreProvider} that builds {@link MLPendingAckStore} instances on top of a
 * managed ledger dedicated to the pending-ack log of a subscription.
 */
public class MLPendingAckStoreProvider implements TransactionPendingAckStoreProvider {
    private static final Logger log = LoggerFactory.getLogger(MLPendingAckStoreProvider.class);

    /**
     * Asynchronously opens (creating it if missing) the pending-ack managed ledger and its cursor for the
     * given subscription and wraps them in an {@link MLPendingAckStore}.
     *
     * <p>The managed ledger configuration is looked up with the pending-ack topic name when that ledger
     * already exists, and with the origin topic name otherwise.
     *
     * @param subscription the subscription the store is created for
     * @return a future completed with the store, or completed exceptionally when {@code subscription} is
     *         {@code null}, or when the existence check, the configuration lookup, the ledger open or the
     *         cursor open fails
     */
    @Override
    public CompletableFuture<PendingAckStore> newPendingAckStore(final PersistentSubscription subscription) {
        final CompletableFuture<PendingAckStore> pendingAckStoreFuture = new CompletableFuture<>();
        if (subscription == null) {
            pendingAckStoreFuture.completeExceptionally(new TransactionPendingAckException.TransactionPendingAckStoreProviderException("The subscription is null."));
            return pendingAckStoreFuture;
        }
        final PersistentTopic originPersistentTopic = (PersistentTopic) subscription.getTopic();
        final String pendingAckTopicName = MLPendingAckStore.getTransactionPendingAckStoreSuffix(originPersistentTopic.getName(), subscription.getName());
        originPersistentTopic.getBrokerService().getManagedLedgerFactory()
                .asyncExists(TopicName.get(pendingAckTopicName).getPersistenceNamingEncoding())
                // thenCompose (not thenAccept) so that a failed configuration lookup, or an exception thrown
                // while starting the ledger open, reaches the exceptionally() handler below instead of
                // leaving pendingAckStoreFuture incomplete forever.
                .thenCompose(exist -> {
                    TopicName topicName = exist
                            ? TopicName.get(pendingAckTopicName)
                            : TopicName.get(originPersistentTopic.getName());
                    return originPersistentTopic.getBrokerService().getManagedLedgerConfig(topicName)
                            .thenAccept(config -> openPendingAckLedger(subscription, originPersistentTopic, pendingAckTopicName, config, pendingAckStoreFuture));
                })
                .exceptionally(e -> {
                    // Dependent stages usually wrap the failure in a CompletionException; fall back to the
                    // throwable itself so completeExceptionally() is never handed null.
                    Throwable cause = e.getCause() != null ? e.getCause() : e;
                    log.error("Failed to initialize MLPendingAckStore managedLedger with topic and subscription : {}  {}",
                            originPersistentTopic.getName(), subscription.getName(), cause);
                    pendingAckStoreFuture.completeExceptionally(cause);
                    return null;
                });
        return pendingAckStoreFuture;
    }

    /**
     * Opens the pending-ack managed ledger, then its cursor, and completes {@code pendingAckStoreFuture}
     * with the resulting store or with the failure reported by either callback.
     */
    private static void openPendingAckLedger(final PersistentSubscription subscription,
                                             final PersistentTopic originPersistentTopic,
                                             final String pendingAckTopicName,
                                             final ManagedLedgerConfig config,
                                             final CompletableFuture<PendingAckStore> pendingAckStoreFuture) {
        config.setCreateIfMissing(true);
        originPersistentTopic.getBrokerService().getManagedLedgerFactory().asyncOpen(TopicName.get(pendingAckTopicName).getPersistenceNamingEncoding(), config, new AsyncCallbacks.OpenLedgerCallback() {

            @Override
            public void openLedgerComplete(final ManagedLedger ledger, Object ctx) {
                ledger.asyncOpenCursor(MLPendingAckStore.getTransactionPendingAckStoreCursorName(), CommandSubscribe.InitialPosition.Earliest, new AsyncCallbacks.OpenCursorCallback() {

                    @Override
                    public void openCursorComplete(ManagedCursor cursor, Object ctx) {
                        pendingAckStoreFuture.complete(new MLPendingAckStore(ledger, cursor, subscription.getCursor(), originPersistentTopic.getBrokerService().getPulsar().getConfiguration().getTransactionPendingAckLogIndexMinLag()));
                        if (log.isDebugEnabled()) {
                            log.debug("{},{} open MLPendingAckStore cursor success", originPersistentTopic.getName(), subscription.getName());
                        }
                    }

                    @Override
                    public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
                        log.error("{},{} open MLPendingAckStore cursor failed.", originPersistentTopic.getName(), subscription.getName(), exception);
                        pendingAckStoreFuture.completeExceptionally(exception);
                    }
                }, null);
            }

            @Override
            public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
                log.error("{}, {} open MLPendingAckStore managedLedger failed.", originPersistentTopic.getName(), subscription.getName(), exception);
                pendingAckStoreFuture.completeExceptionally(exception);
            }
        }, () -> true, null);
    }

    /**
     * Reports whether the pending-ack managed ledger of the given subscription already exists.
     *
     * @param subscription the subscription whose pending-ack ledger is checked; must not be {@code null}
     * @return a future holding the result of the managed ledger factory's existence check
     */
    @Override
    public CompletableFuture<Boolean> checkInitializedBefore(PersistentSubscription subscription) {
        PersistentTopic originPersistentTopic = (PersistentTopic) subscription.getTopic();
        String pendingAckTopicName = MLPendingAckStore.getTransactionPendingAckStoreSuffix(originPersistentTopic.getName(), subscription.getName());
        return originPersistentTopic.getBrokerService().getManagedLedgerFactory().asyncExists(TopicName.get(pendingAckTopicName).getPersistenceNamingEncoding());
    }
}

