/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.transaction.coordinator.impl;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.shade.org.apache.commons.lang3.tuple.MutablePair;
import org.apache.pulsar.shade.org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionLogReplayCallback;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
import org.apache.pulsar.transaction.coordinator.TxnMeta;
import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
import org.apache.pulsar.transaction.coordinator.impl.TxnMetaImpl;
import org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MLTransactionMetadataStore
extends TransactionMetadataStoreState
implements TransactionMetadataStore {
    private static final Logger log = LoggerFactory.getLogger(MLTransactionMetadataStore.class);
    private final TransactionCoordinatorID tcID;
    private final AtomicLong sequenceId = new AtomicLong(-1L);
    private final MLTransactionLogImpl transactionLog;
    private static final long TC_ID_NOT_USED = -1L;
    private final ConcurrentMap<TxnID, Pair<TxnMeta, List<Position>>> txnMetaMap = new ConcurrentHashMap<TxnID, Pair<TxnMeta, List<Position>>>();

    public MLTransactionMetadataStore(TransactionCoordinatorID tcID, MLTransactionLogImpl mlTransactionLog) {
        super(TransactionMetadataStoreState.State.None);
        this.tcID = tcID;
        this.transactionLog = mlTransactionLog;
        if (!this.changeToInitializingState()) {
            log.error("Managed ledger transaction metadata store change state error when init it");
            return;
        }
        new Thread(() -> this.transactionLog.replayAsync(new TransactionLogReplayCallback(){

            @Override
            public void replayComplete() {
                if (!MLTransactionMetadataStore.this.changeToReadyState()) {
                    log.error("Managed ledger transaction metadata store change state error when replay complete");
                }
            }

            @Override
            public void handleMetadataEntry(Position position, PulsarTransactionMetadata.TransactionMetadataEntry transactionMetadataEntry) {
                try {
                    TxnID txnID = new TxnID(transactionMetadataEntry.getTxnidMostBits(), transactionMetadataEntry.getTxnidLeastBits());
                    switch (transactionMetadataEntry.getMetadataOp()) {
                        case NEW: {
                            if (MLTransactionMetadataStore.this.sequenceId.get() < transactionMetadataEntry.getTxnidLeastBits()) {
                                MLTransactionMetadataStore.this.sequenceId.set(transactionMetadataEntry.getTxnidLeastBits());
                            }
                            if (MLTransactionMetadataStore.this.txnMetaMap.containsKey(txnID)) {
                                ((List)((Pair)MLTransactionMetadataStore.this.txnMetaMap.get(txnID)).getRight()).add(position);
                                break;
                            }
                            ArrayList<Position> positions = new ArrayList<Position>();
                            positions.add(position);
                            MLTransactionMetadataStore.this.txnMetaMap.put(txnID, MutablePair.of(TxnMetaImpl.create(txnID), positions));
                            break;
                        }
                        case ADD_PARTITION: {
                            if (!MLTransactionMetadataStore.this.txnMetaMap.containsKey(txnID)) {
                                MLTransactionMetadataStore.this.transactionLog.deletePosition(Collections.singletonList(position));
                                break;
                            }
                            ((TxnMeta)((Pair)MLTransactionMetadataStore.this.txnMetaMap.get(txnID)).getLeft()).addProducedPartitions(transactionMetadataEntry.getPartitionsList());
                            ((List)((Pair)MLTransactionMetadataStore.this.txnMetaMap.get(txnID)).getRight()).add(position);
                            break;
                        }
                        case ADD_SUBSCRIPTION: {
                            if (!MLTransactionMetadataStore.this.txnMetaMap.containsKey(txnID)) {
                                MLTransactionMetadataStore.this.transactionLog.deletePosition(Collections.singletonList(position));
                                break;
                            }
                            ((TxnMeta)((Pair)MLTransactionMetadataStore.this.txnMetaMap.get(txnID)).getLeft()).addAckedPartitions(MLTransactionMetadataStore.subscriptionToTxnSubscription(transactionMetadataEntry.getSubscriptionsList()));
                            ((List)((Pair)MLTransactionMetadataStore.this.txnMetaMap.get(txnID)).getRight()).add(position);
                            break;
                        }
                        case UPDATE: {
                            if (!MLTransactionMetadataStore.this.txnMetaMap.containsKey(txnID)) {
                                MLTransactionMetadataStore.this.transactionLog.deletePosition(Collections.singletonList(position));
                                break;
                            }
                            PulsarTransactionMetadata.TxnStatus newStatus = transactionMetadataEntry.getNewStatus();
                            if (newStatus == PulsarTransactionMetadata.TxnStatus.COMMITTED || newStatus == PulsarTransactionMetadata.TxnStatus.ABORTED) {
                                MLTransactionMetadataStore.this.transactionLog.deletePosition((List)((Pair)MLTransactionMetadataStore.this.txnMetaMap.get(txnID)).getRight()).thenAccept(v -> {
                                    TxnMeta txnMeta = (TxnMeta)((Pair)MLTransactionMetadataStore.this.txnMetaMap.remove(txnID)).getLeft();
                                    ((TxnMetaImpl)txnMeta).recycle();
                                });
                            } else {
                                ((TxnMeta)((Pair)MLTransactionMetadataStore.this.txnMetaMap.get(txnID)).getLeft()).updateTxnStatus(transactionMetadataEntry.getNewStatus(), transactionMetadataEntry.getExpectedStatus());
                            }
                            ((List)((Pair)MLTransactionMetadataStore.this.txnMetaMap.get(txnID)).getRight()).add(position);
                            break;
                        }
                        default: {
                            throw new CoordinatorException.InvalidTxnStatusException("Transaction `" + txnID + "` load replay metadata operation from transaction log with unknown operation");
                        }
                    }
                }
                catch (CoordinatorException.InvalidTxnStatusException e) {
                    log.error(e.getMessage(), (Throwable)e);
                }
            }
        })).start();
    }

    @Override
    public CompletableFuture<PulsarTransactionMetadata.TxnStatus> getTxnStatus(TxnID txnID) {
        return CompletableFuture.completedFuture(((TxnMeta)((Pair)this.txnMetaMap.get(txnID)).getLeft()).status());
    }

    @Override
    public CompletableFuture<TxnMeta> getTxnMeta(TxnID txnID) {
        Pair txnMetaListPair = (Pair)this.txnMetaMap.get(txnID);
        CompletableFuture<TxnMeta> completableFuture = new CompletableFuture<TxnMeta>();
        if (txnMetaListPair == null) {
            completableFuture.completeExceptionally(new CoordinatorException.TransactionNotFoundException(txnID));
        } else {
            completableFuture.complete((TxnMeta)txnMetaListPair.getLeft());
        }
        return completableFuture;
    }

    @Override
    public CompletableFuture<TxnID> newTransaction(long timeOut) {
        if (!this.checkIfReady()) {
            return FutureUtil.failedFuture(new CoordinatorException.TransactionMetadataStoreStateException(this.tcID, TransactionMetadataStoreState.State.Ready, this.getState(), "new Transaction"));
        }
        long mostSigBits = this.tcID.getId();
        long leastSigBits = this.sequenceId.incrementAndGet();
        TxnID txnID = new TxnID(mostSigBits, leastSigBits);
        long currentTimeMillis = System.currentTimeMillis();
        PulsarTransactionMetadata.TransactionMetadataEntry transactionMetadataEntry = PulsarTransactionMetadata.TransactionMetadataEntry.newBuilder().setTxnidMostBits(mostSigBits).setTxnidLeastBits(leastSigBits).setStartTime(currentTimeMillis).setTimeoutMs(timeOut).setMetadataOp(PulsarTransactionMetadata.TransactionMetadataEntry.TransactionMetadataOp.NEW).setLastModificationTime(currentTimeMillis).build();
        return this.transactionLog.append(transactionMetadataEntry).thenCompose(position -> {
            TxnMetaImpl txn = TxnMetaImpl.create(txnID);
            ArrayList<Position> positions = new ArrayList<Position>();
            positions.add((Position)position);
            MutablePair pair = MutablePair.of(txn, positions);
            this.txnMetaMap.put(txnID, pair);
            transactionMetadataEntry.recycle();
            return CompletableFuture.completedFuture(txnID);
        });
    }

    @Override
    public CompletableFuture<Void> addProducedPartitionToTxn(TxnID txnID, List<String> partitions) {
        if (!this.checkIfReady()) {
            return FutureUtil.failedFuture(new CoordinatorException.TransactionMetadataStoreStateException(this.tcID, TransactionMetadataStoreState.State.Ready, this.getState(), "add produced partition"));
        }
        return this.getTxnPositionPair(txnID).thenCompose(txnMetaListPair -> {
            PulsarTransactionMetadata.TransactionMetadataEntry transactionMetadataEntry = PulsarTransactionMetadata.TransactionMetadataEntry.newBuilder().setTxnidMostBits(txnID.getMostSigBits()).setTxnidLeastBits(txnID.getLeastSigBits()).setMetadataOp(PulsarTransactionMetadata.TransactionMetadataEntry.TransactionMetadataOp.ADD_PARTITION).addAllPartitions(partitions).setLastModificationTime(System.currentTimeMillis()).build();
            return this.transactionLog.append(transactionMetadataEntry).thenCompose(position -> {
                try {
                    ((TxnMeta)txnMetaListPair.getLeft()).addProducedPartitions(partitions);
                    ((List)((Pair)this.txnMetaMap.get(txnID)).getRight()).add(position);
                    CompletableFuture<Object> completableFuture = CompletableFuture.completedFuture(null);
                    return completableFuture;
                }
                catch (CoordinatorException.InvalidTxnStatusException e) {
                    ((List)((Pair)this.txnMetaMap.get(txnID)).getRight()).add(position);
                    log.error("TxnID : " + ((TxnMeta)txnMetaListPair.getLeft()).id().toString() + " add produced partition error with TxnStatus : " + ((TxnMeta)txnMetaListPair.getLeft()).status().name(), (Throwable)e);
                    CompletableFuture completableFuture = FutureUtil.failedFuture(e);
                    return completableFuture;
                }
                finally {
                    transactionMetadataEntry.recycle();
                }
            });
        });
    }

    @Override
    public CompletableFuture<Void> addAckedPartitionToTxn(TxnID txnID, List<TransactionSubscription> txnSubscriptions) {
        if (!this.checkIfReady()) {
            return FutureUtil.failedFuture(new CoordinatorException.TransactionMetadataStoreStateException(this.tcID, TransactionMetadataStoreState.State.Ready, this.getState(), "add acked partition"));
        }
        return this.getTxnPositionPair(txnID).thenCompose(txnMetaListPair -> {
            PulsarTransactionMetadata.TransactionMetadataEntry transactionMetadataEntry = PulsarTransactionMetadata.TransactionMetadataEntry.newBuilder().setTxnidMostBits(txnID.getMostSigBits()).setTxnidLeastBits(txnID.getLeastSigBits()).setMetadataOp(PulsarTransactionMetadata.TransactionMetadataEntry.TransactionMetadataOp.ADD_SUBSCRIPTION).addAllSubscriptions(MLTransactionMetadataStore.txnSubscriptionToSubscription(txnSubscriptions)).setLastModificationTime(System.currentTimeMillis()).build();
            return this.transactionLog.append(transactionMetadataEntry).thenCompose(position -> {
                try {
                    ((TxnMeta)txnMetaListPair.getLeft()).addAckedPartitions(txnSubscriptions);
                    ((List)((Pair)this.txnMetaMap.get(txnID)).getRight()).add(position);
                    CompletableFuture<Object> completableFuture = CompletableFuture.completedFuture(null);
                    return completableFuture;
                }
                catch (CoordinatorException.InvalidTxnStatusException e) {
                    ((List)((Pair)this.txnMetaMap.get(txnID)).getRight()).add(position);
                    log.error("TxnID : " + ((TxnMeta)txnMetaListPair.getLeft()).id().toString() + " add acked subscription error with TxnStatus : " + ((TxnMeta)txnMetaListPair.getLeft()).status().name(), (Throwable)e);
                    CompletableFuture completableFuture = FutureUtil.failedFuture(e);
                    return completableFuture;
                }
                finally {
                    transactionMetadataEntry.recycle();
                }
            });
        });
    }

    @Override
    public CompletableFuture<Void> updateTxnStatus(TxnID txnID, PulsarTransactionMetadata.TxnStatus newStatus, PulsarTransactionMetadata.TxnStatus expectedStatus) {
        if (!this.checkIfReady()) {
            return FutureUtil.failedFuture(new CoordinatorException.TransactionMetadataStoreStateException(this.tcID, TransactionMetadataStoreState.State.Ready, this.getState(), "update transaction status"));
        }
        return this.getTxnPositionPair(txnID).thenCompose(txnMetaListPair -> {
            PulsarTransactionMetadata.TransactionMetadataEntry transactionMetadataEntry = PulsarTransactionMetadata.TransactionMetadataEntry.newBuilder().setTxnidMostBits(txnID.getMostSigBits()).setTxnidLeastBits(txnID.getLeastSigBits()).setExpectedStatus(expectedStatus).setMetadataOp(PulsarTransactionMetadata.TransactionMetadataEntry.TransactionMetadataOp.UPDATE).setLastModificationTime(System.currentTimeMillis()).setNewStatus(newStatus).build();
            return this.transactionLog.append(transactionMetadataEntry).thenCompose(position -> {
                try {
                    ((TxnMeta)txnMetaListPair.getLeft()).updateTxnStatus(newStatus, expectedStatus);
                    ((List)txnMetaListPair.getRight()).add(position);
                    if (newStatus == PulsarTransactionMetadata.TxnStatus.COMMITTED || newStatus == PulsarTransactionMetadata.TxnStatus.ABORTED) {
                        CompletionStage completionStage = this.transactionLog.deletePosition((List)txnMetaListPair.getRight()).thenCompose(v -> {
                            this.txnMetaMap.remove(txnID);
                            ((TxnMetaImpl)txnMetaListPair.getLeft()).recycle();
                            return CompletableFuture.completedFuture(null);
                        });
                        return completionStage;
                    }
                    CompletableFuture<Object> completableFuture = CompletableFuture.completedFuture(null);
                    return completableFuture;
                }
                catch (CoordinatorException.InvalidTxnStatusException e) {
                    ((List)txnMetaListPair.getRight()).add(position);
                    log.error("TxnID : " + ((TxnMeta)txnMetaListPair.getLeft()).id().toString() + " add update txn status error with TxnStatus : " + ((TxnMeta)txnMetaListPair.getLeft()).status().name(), (Throwable)e);
                    CompletableFuture completableFuture = FutureUtil.failedFuture(e);
                    return completableFuture;
                }
                finally {
                    transactionMetadataEntry.recycle();
                }
            });
        });
    }

    @Override
    public TransactionCoordinatorID getTransactionCoordinatorID() {
        return this.tcID;
    }

    private CompletableFuture<Pair<TxnMeta, List<Position>>> getTxnPositionPair(TxnID txnID) {
        CompletableFuture<Pair<TxnMeta, List<Position>>> completableFuture = new CompletableFuture<Pair<TxnMeta, List<Position>>>();
        Pair txnMetaListPair = (Pair)this.txnMetaMap.get(txnID);
        if (txnMetaListPair == null) {
            completableFuture.completeExceptionally(new CoordinatorException.TransactionNotFoundException(txnID));
        } else {
            completableFuture.complete(txnMetaListPair);
        }
        return completableFuture;
    }

    @Override
    public CompletableFuture<Void> closeAsync() {
        return this.transactionLog.closeAsync().thenCompose(v -> {
            this.txnMetaMap.clear();
            if (!this.changeToCloseState()) {
                return FutureUtil.failedFuture(new IllegalStateException("Managed ledger transaction metadata store state to close error!"));
            }
            return CompletableFuture.completedFuture(null);
        });
    }

    public static List<PulsarApi.Subscription> txnSubscriptionToSubscription(List<TransactionSubscription> tnxSubscriptions) {
        ArrayList<PulsarApi.Subscription> subscriptions = new ArrayList<PulsarApi.Subscription>(tnxSubscriptions.size());
        for (TransactionSubscription transactionSubscription : tnxSubscriptions) {
            PulsarApi.Subscription.Builder subscriptionBuilder = PulsarApi.Subscription.newBuilder();
            PulsarApi.Subscription subscription = subscriptionBuilder.setSubscription(transactionSubscription.getSubscription()).setTopic(transactionSubscription.getTopic()).build();
            subscriptions.add(subscription);
            subscriptionBuilder.recycle();
        }
        return subscriptions;
    }

    public static List<TransactionSubscription> subscriptionToTxnSubscription(List<PulsarApi.Subscription> subscriptions) {
        ArrayList<TransactionSubscription> transactionSubscriptions = new ArrayList<TransactionSubscription>(subscriptions.size());
        for (PulsarApi.Subscription subscription : subscriptions) {
            TransactionSubscription.TransactionSubscriptionBuilder transactionSubscriptionBuilder = TransactionSubscription.builder();
            transactionSubscriptionBuilder.subscription(subscription.getSubscription());
            transactionSubscriptionBuilder.topic(subscription.getTopic());
            transactionSubscriptions.add(transactionSubscriptionBuilder.build());
            subscription.recycle();
        }
        return transactionSubscriptions;
    }
}

