package org.joyqueue.broker.kafka.coordinator.transaction.synchronizer;

import com.google.common.collect.Lists;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.joyqueue.broker.kafka.config.KafkaConfig;
import org.joyqueue.broker.kafka.coordinator.transaction.TransactionIdManager;
import org.joyqueue.broker.kafka.coordinator.transaction.domain.TransactionMetadata;
import org.joyqueue.broker.kafka.coordinator.transaction.domain.TransactionPrepare;
import org.joyqueue.broker.kafka.coordinator.transaction.helper.TransactionHelper;
import org.joyqueue.broker.producer.transaction.command.TransactionRollbackRequest;
import org.joyqueue.domain.Broker;
import org.joyqueue.exception.JoyQueueCode;
import org.joyqueue.network.transport.command.Command;
import org.joyqueue.network.transport.command.CommandCallback;
import org.joyqueue.network.transport.command.JoyQueueCommand;
import org.joyqueue.network.transport.session.session.TransportSession;
import org.joyqueue.network.transport.session.session.TransportSessionManager;
import org.joyqueue.toolkit.service.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/joyqueue/broker/kafka/coordinator/transaction/synchronizer/TransactionAbortSynchronizer.class */
public class TransactionAbortSynchronizer extends Service {
    protected static final Logger logger = LoggerFactory.getLogger(TransactionAbortSynchronizer.class);
    private KafkaConfig config;
    private TransportSessionManager sessionManager;
    private TransactionIdManager transactionIdManager;

    public TransactionAbortSynchronizer(KafkaConfig kafkaConfig, TransportSessionManager transportSessionManager, TransactionIdManager transactionIdManager) {
        this.config = kafkaConfig;
        this.sessionManager = transportSessionManager;
        this.transactionIdManager = transactionIdManager;
    }

    public boolean abort(TransactionMetadata transactionMetadata, Set<TransactionPrepare> set) throws Exception {
        Map<Broker, List<TransactionPrepare>> splitPrepareByBroker = TransactionHelper.splitPrepareByBroker(set);
        final CountDownLatch countDownLatch = new CountDownLatch(splitPrepareByBroker.size());
        final boolean[] zArr = {true};
        for (Map.Entry<Broker, List<TransactionPrepare>> entry : splitPrepareByBroker.entrySet()) {
            final Broker key = entry.getKey();
            List<TransactionPrepare> value = entry.getValue();
            TransactionPrepare transactionPrepare = value.get(0);
            LinkedList newLinkedList = Lists.newLinkedList();
            for (TransactionPrepare transactionPrepare2 : value) {
                newLinkedList.add(this.transactionIdManager.generateId(transactionPrepare2.getTopic(), transactionPrepare2.getPartition(), transactionPrepare2.getApp(), transactionPrepare2.getTransactionId(), transactionPrepare2.getProducerId(), transactionPrepare2.getProducerEpoch()));
            }
            TransportSession orCreateSession = this.sessionManager.getOrCreateSession(key);
            final TransactionRollbackRequest transactionRollbackRequest = new TransactionRollbackRequest(transactionPrepare.getTopic(), transactionPrepare.getApp(), newLinkedList);
            orCreateSession.async(new JoyQueueCommand(transactionRollbackRequest), this.config.getTransactionSyncTimeout(), new CommandCallback() { // from class: org.joyqueue.broker.kafka.coordinator.transaction.synchronizer.TransactionAbortSynchronizer.1
                public void onSuccess(Command command, Command command2) {
                    if (command2.getHeader().getStatus() != JoyQueueCode.SUCCESS.getCode() && command2.getHeader().getStatus() != JoyQueueCode.CN_TRANSACTION_NOT_EXISTS.getCode()) {
                        TransactionAbortSynchronizer.logger.error("abort transaction error, broker: {}, request: {}", key, transactionRollbackRequest);
                        zArr[0] = false;
                    }
                    countDownLatch.countDown();
                }

                public void onException(Command command, Throwable th) {
                    TransactionAbortSynchronizer.logger.error("abort transaction error, broker: {}, request: {}", new Object[]{key, transactionRollbackRequest, th});
                    zArr[0] = false;
                    countDownLatch.countDown();
                }
            });
        }
        if (countDownLatch.await(this.config.getTransactionSyncTimeout(), TimeUnit.MILLISECONDS)) {
            return zArr[0];
        }
        logger.error("abort transaction timeout, metadata: {}", set);
        return false;
    }
}
