package org.joyqueue.broker.kafka.handler;

import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.joyqueue.broker.kafka.KafkaCommandType;
import org.joyqueue.broker.kafka.KafkaContext;
import org.joyqueue.broker.kafka.KafkaContextAware;
import org.joyqueue.broker.kafka.KafkaErrorCode;
import org.joyqueue.broker.kafka.command.FindCoordinatorRequest;
import org.joyqueue.broker.kafka.command.FindCoordinatorResponse;
import org.joyqueue.broker.kafka.coordinator.CoordinatorType;
import org.joyqueue.broker.kafka.coordinator.group.GroupCoordinator;
import org.joyqueue.broker.kafka.coordinator.transaction.TransactionCoordinator;
import org.joyqueue.broker.kafka.helper.KafkaClientHelper;
import org.joyqueue.broker.kafka.model.KafkaBroker;
import org.joyqueue.domain.Broker;
import org.joyqueue.domain.Subscription;
import org.joyqueue.network.transport.Transport;
import org.joyqueue.network.transport.command.Command;
import org.joyqueue.nsr.NameService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/joyqueue/broker/kafka/handler/FindCoordinatorRequestHandler.class */
public class FindCoordinatorRequestHandler extends AbstractKafkaCommandHandler implements KafkaContextAware {
    protected static final Logger logger = LoggerFactory.getLogger(FindCoordinatorRequestHandler.class);
    private GroupCoordinator groupCoordinator;
    private TransactionCoordinator transactionCoordinator;
    private NameService nameService;

    @Override // org.joyqueue.broker.kafka.KafkaContextAware
    public void setKafkaContext(KafkaContext kafkaContext) {
        this.groupCoordinator = kafkaContext.getGroupCoordinator();
        this.transactionCoordinator = kafkaContext.getTransactionCoordinator();
        this.nameService = kafkaContext.getBrokerContext().getNameService();
    }

    public Command handle(Transport transport, Command command) {
        FindCoordinatorRequest findCoordinatorRequest = (FindCoordinatorRequest) command.getPayload();
        CoordinatorType coordinatorType = (CoordinatorType) ObjectUtils.defaultIfNull(findCoordinatorRequest.getCoordinatorType(), CoordinatorType.GROUP);
        String clientId = findCoordinatorRequest.getClientId();
        if (coordinatorType.equals(CoordinatorType.TRANSACTION)) {
            clientId = findCoordinatorRequest.getClientId();
        }
        String parseClient = KafkaClientHelper.parseClient(clientId);
        if (StringUtils.isBlank(parseClient)) {
            logger.warn("coordinatorKey error, coordinatorKey: {}, transport: {}", parseClient, transport);
            return new Command(new FindCoordinatorResponse(KafkaErrorCode.INVALID_GROUP_ID.getCode(), KafkaBroker.INVALID));
        }
        Broker broker = null;
        if (coordinatorType.equals(CoordinatorType.GROUP)) {
            if (!this.nameService.hasSubscribe(parseClient, Subscription.Type.CONSUMPTION)) {
                logger.warn("find subscribe for coordinatorKey {}, subscribe not exist, {}", parseClient, transport);
                return new Command(new FindCoordinatorResponse(KafkaErrorCode.GROUP_AUTHORIZATION_FAILED.getCode(), KafkaBroker.INVALID));
            }
            broker = this.groupCoordinator.findCoordinator(parseClient);
        } else if (coordinatorType.equals(CoordinatorType.TRANSACTION)) {
            if (!this.nameService.hasSubscribe(parseClient, Subscription.Type.PRODUCTION)) {
                logger.warn("find subscribe for coordinatorKey {}, subscribe not exist, {}", parseClient, transport);
                return new Command(new FindCoordinatorResponse(KafkaErrorCode.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.getCode(), KafkaBroker.INVALID));
            }
            broker = this.transactionCoordinator.findCoordinator(parseClient);
        }
        if (broker == null) {
            logger.error("find coordinator for coordinatorKey {}, coordinator is null, {}", parseClient, transport);
            return new Command(new FindCoordinatorResponse(KafkaErrorCode.COORDINATOR_NOT_AVAILABLE.getCode(), KafkaBroker.INVALID));
        }
        if (logger.isDebugEnabled()) {
            logger.debug("find coordinator for coordinatorKey {}, broker: {id: {}, ip: {}, port: {}}", new Object[]{parseClient, broker.getId(), broker.getIp(), Integer.valueOf(broker.getPort())});
        }
        return new Command(new FindCoordinatorResponse(KafkaErrorCode.NONE.getCode(), new KafkaBroker(broker.getId().intValue(), broker.getIp(), broker.getPort())));
    }

    public int type() {
        return KafkaCommandType.FIND_COORDINATOR.getCode();
    }
}
