package org.apache.kafka.clients.consumer.internals;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.internals.MembershipManager;
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import org.apache.kafka.clients.consumer.internals.metrics.HeartbeatMetricsManager;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.slf4j.Logger;

/* loaded from: input_file:kafka-clients-3.9.0.jar:org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.class */
public class HeartbeatRequestManager implements RequestManager {
    // Logger obtained from the LogContext handed to the constructor.
    private final Logger logger;
    // Value of "max.poll.interval.ms" read from the consumer config; used to re-arm pollTimer.
    private final int maxPollIntervalMs;
    // Consulted for the current coordinator and told when the coordinator must be rediscovered.
    private final CoordinatorRequestManager coordinatorRequestManager;
    // Heartbeat timer plus the retry/backoff bookkeeping inherited from RequestState.
    private final HeartbeatRequestState heartbeatRequestState;
    // Builds the heartbeat payload and remembers which field values were last sent.
    private final HeartbeatState heartbeatState;
    // Member state machine that is notified of heartbeat successes, failures and skips.
    private final MembershipManager membershipManager;
    // Receives an ErrorEvent whenever a fatal heartbeat failure is handled.
    private final BackgroundEventHandler backgroundEventHandler;
    // Re-armed by resetPollTimer(); its expiry makes poll() send a leave-group heartbeat.
    private final Timer pollTimer;
    // Records heartbeat send times and request latencies.
    private final HeartbeatMetricsManager metricsManager;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:kafka-clients-3.9.0.jar:org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager$HeartbeatRequestState.class */
    /**
     * Request state for the heartbeat: combines the retry/backoff tracking inherited from
     * {@link RequestState} with a timer that counts down to the next scheduled heartbeat.
     */
    public static class HeartbeatRequestState extends RequestState {
        /** Counts down the time left until the next heartbeat is due. */
        private final Timer heartbeatTimer;
        /** Interval the timer is re-armed with; replaced by {@link #updateHeartbeatIntervalMs(long)}. */
        private long heartbeatIntervalMs;

        /**
         * @param logContext          log context forwarded to {@link RequestState}
         * @param time                clock used to create the heartbeat timer
         * @param heartbeatIntervalMs initial heartbeat interval and initial timer duration
         * @param retryBackoffMs      forwarded to {@link RequestState} as its third argument
         * @param retryBackoffMaxMs   forwarded to {@link RequestState} as its fifth argument
         * @param jitter              forwarded unchanged to {@link RequestState} as its last argument
         */
        public HeartbeatRequestState(LogContext logContext, Time time, long heartbeatIntervalMs, long retryBackoffMs, long retryBackoffMaxMs, double jitter) {
            super(logContext, HeartbeatRequestState.class.getName(), retryBackoffMs, 2, retryBackoffMaxMs, jitter);
            this.heartbeatTimer = time.timer(heartbeatIntervalMs);
            this.heartbeatIntervalMs = heartbeatIntervalMs;
        }

        /** Re-arms the heartbeat timer with the current heartbeat interval. */
        public void resetTimer() {
            heartbeatTimer.reset(heartbeatIntervalMs);
        }

        /** Extends the base description with the timer's remaining time and the current interval. */
        @Override // org.apache.kafka.clients.consumer.internals.RequestState
        public String toStringBase() {
            String base = super.toStringBase();
            return base
                    + ", remainingMs=" + heartbeatTimer.remainingMs()
                    + ", heartbeatIntervalMs=" + heartbeatIntervalMs;
        }

        /**
         * Advances the heartbeat timer to {@code currentTimeMs} and reports whether a heartbeat may
         * be sent: the timer must have expired and the base request state must allow a request too.
         */
        @Override // org.apache.kafka.clients.consumer.internals.RequestState
        public boolean canSendRequest(long currentTimeMs) {
            heartbeatTimer.update(currentTimeMs);
            if (!heartbeatTimer.isExpired()) {
                return false;
            }
            return super.canSendRequest(currentTimeMs);
        }

        /**
         * @return the remaining backoff when the heartbeat timer has already expired, otherwise the
         *         time left on the heartbeat timer
         */
        public long timeToNextHeartbeatMs(long currentTimeMs) {
            if (heartbeatTimer.isExpired()) {
                return remainingBackoffMs(currentTimeMs);
            }
            return heartbeatTimer.remainingMs();
        }

        /** Zeroes the heartbeat timer before recording the failed attempt in the base state. */
        @Override // org.apache.kafka.clients.consumer.internals.RequestState
        public void onFailedAttempt(long currentTimeMs) {
            heartbeatTimer.reset(0L);
            super.onFailedAttempt(currentTimeMs);
        }

        /**
         * Adopts a new heartbeat interval and restarts the timer with it. Does nothing when the
         * interval is unchanged, so a running timer is not disturbed.
         */
        public void updateHeartbeatIntervalMs(long newIntervalMs) {
            if (heartbeatIntervalMs != newIntervalMs) {
                heartbeatIntervalMs = newIntervalMs;
                heartbeatTimer.updateAndReset(newIntervalMs);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:kafka-clients-3.9.0.jar:org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager$HeartbeatState.class */
    /**
     * Builds the payload of each heartbeat request and remembers the values most recently put
     * into a request, so that fields which have not changed can be left out of later heartbeats.
     */
    public static class HeartbeatState {
        private final SubscriptionState subscriptions;
        private final MembershipManager membershipManager;
        private final int rebalanceTimeoutMs;
        private final SentFields sentFields = new SentFields();

        /** Values most recently written into a heartbeat request; {@code null} / -1 mean "nothing recorded". */
        public static class SentFields {
            private int rebalanceTimeoutMs = -1;
            private TreeSet<String> subscribedTopicNames = null;
            private String serverAssignor = null;
            private MembershipManager.LocalAssignment localAssignment = null;

            SentFields() {
            }

            /** Forgets every recorded value so the next request carries all fields again. */
            void reset() {
                this.subscribedTopicNames = null;
                this.rebalanceTimeoutMs = -1;
                this.serverAssignor = null;
                this.localAssignment = null;
            }
        }

        /**
         * @param subscriptionState  source of the currently subscribed topic names
         * @param membershipManager  source of group id, member id/epoch, instance id, assignor and assignment
         * @param i                  rebalance timeout (ms) to advertise in heartbeats
         */
        public HeartbeatState(SubscriptionState subscriptionState, MembershipManager membershipManager, int i) {
            this.subscriptions = subscriptionState;
            this.membershipManager = membershipManager;
            this.rebalanceTimeoutMs = i;
        }

        /** Clears the record of previously sent fields. */
        public void reset() {
            this.sentFields.reset();
        }

        /**
         * Creates the data for the next heartbeat request.
         *
         * <p>Group id, member id and member epoch are always set, and the instance id is set when the
         * member has one. Rebalance timeout, subscribed topic names, server assignor and owned
         * partitions are only set when the member is in the {@code JOINING} state or when the value
         * differs from the one recorded in {@link SentFields}; each value that is set is recorded.
         *
         * @return the populated request data
         */
        public ConsumerGroupHeartbeatRequestData buildRequestData() {
            ConsumerGroupHeartbeatRequestData data = new ConsumerGroupHeartbeatRequestData();
            data.setGroupId(this.membershipManager.groupId());
            data.setMemberId(this.membershipManager.memberId());
            data.setMemberEpoch(this.membershipManager.memberEpoch());
            Optional<String> groupInstanceId = this.membershipManager.groupInstanceId();
            groupInstanceId.ifPresent(data::setInstanceId);

            // While joining, every field is sent regardless of what was recorded before.
            boolean sendAllFields = this.membershipManager.state() == MemberState.JOINING;

            if (sendAllFields || this.sentFields.rebalanceTimeoutMs != this.rebalanceTimeoutMs) {
                data.setRebalanceTimeoutMs(this.rebalanceTimeoutMs);
                this.sentFields.rebalanceTimeoutMs = this.rebalanceTimeoutMs;
            }

            // A sorted copy gives an order-independent comparison with the recorded subscription.
            TreeSet<String> sortedTopicNames = new TreeSet<>(this.subscriptions.subscription());
            if (sendAllFields || !sortedTopicNames.equals(this.sentFields.subscribedTopicNames)) {
                data.setSubscribedTopicNames(new ArrayList<>(this.subscriptions.subscription()));
                this.sentFields.subscribedTopicNames = sortedTopicNames;
            }

            this.membershipManager.serverAssignor().ifPresent(assignor -> {
                if (sendAllFields || !assignor.equals(this.sentFields.serverAssignor)) {
                    data.setServerAssignor(assignor);
                    this.sentFields.serverAssignor = assignor;
                }
            });

            MembershipManager.LocalAssignment currentAssignment = this.membershipManager.currentAssignment();
            if (sendAllFields || !currentAssignment.equals(this.sentFields.localAssignment)) {
                data.setTopicPartitions(buildTopicPartitionsList(currentAssignment.partitions));
                this.sentFields.localAssignment = currentAssignment;
            }
            return data;
        }

        /** Converts a topic-id to partitions map into the request's list-of-TopicPartitions form. */
        private List<ConsumerGroupHeartbeatRequestData.TopicPartitions> buildTopicPartitionsList(Map<Uuid, SortedSet<Integer>> map) {
            return map.entrySet().stream()
                    .map(entry -> new ConsumerGroupHeartbeatRequestData.TopicPartitions()
                            .setTopicId(entry.getKey())
                            .setPartitions(new ArrayList<>(entry.getValue())))
                    .collect(Collectors.toList());
        }
    }

    /**
     * Creates a manager that builds its own heartbeat state, request state and poll timer from
     * the supplied configuration.
     *
     * <p>Reads {@code max.poll.interval.ms}, {@code retry.backoff.ms} and
     * {@code retry.backoff.max.ms} from {@code consumerConfig}. The heartbeat interval starts at 0.
     */
    public HeartbeatRequestManager(LogContext logContext, Time time, ConsumerConfig consumerConfig, CoordinatorRequestManager coordinatorRequestManager, SubscriptionState subscriptionState, MembershipManager membershipManager, BackgroundEventHandler backgroundEventHandler, Metrics metrics) {
        this.logger = logContext.logger(getClass());
        this.coordinatorRequestManager = coordinatorRequestManager;
        this.membershipManager = membershipManager;
        this.backgroundEventHandler = backgroundEventHandler;

        final int configuredMaxPollIntervalMs = consumerConfig.getInt("max.poll.interval.ms");
        final long retryBackoffMs = consumerConfig.getLong("retry.backoff.ms");
        final long retryBackoffMaxMs = consumerConfig.getLong("retry.backoff.max.ms");
        this.maxPollIntervalMs = configuredMaxPollIntervalMs;

        this.heartbeatState = new HeartbeatState(subscriptionState, membershipManager, configuredMaxPollIntervalMs);
        // NOTE(review): the last argument is declared as a double by HeartbeatRequestState and is
        // forwarded to RequestState; passing the poll interval here looks unintended - confirm.
        this.heartbeatRequestState = new HeartbeatRequestState(
                logContext, time, 0L, retryBackoffMs, retryBackoffMaxMs, configuredMaxPollIntervalMs);
        this.pollTimer = time.timer(configuredMaxPollIntervalMs);
        this.metricsManager = new HeartbeatMetricsManager(metrics);
    }

    /**
     * Package-private constructor that accepts ready-made collaborators (poll timer, heartbeat
     * state and heartbeat request state) instead of creating them; only
     * {@code max.poll.interval.ms} is read from {@code consumerConfig}.
     */
    HeartbeatRequestManager(LogContext logContext, Timer timer, ConsumerConfig consumerConfig, CoordinatorRequestManager coordinatorRequestManager, MembershipManager membershipManager, HeartbeatState heartbeatState, HeartbeatRequestState heartbeatRequestState, BackgroundEventHandler backgroundEventHandler, Metrics metrics) {
        this.logger = logContext.logger(getClass());
        this.maxPollIntervalMs = consumerConfig.getInt("max.poll.interval.ms");

        // Injected collaborators.
        this.pollTimer = timer;
        this.heartbeatState = heartbeatState;
        this.heartbeatRequestState = heartbeatRequestState;
        this.coordinatorRequestManager = coordinatorRequestManager;
        this.membershipManager = membershipManager;
        this.backgroundEventHandler = backgroundEventHandler;

        this.metricsManager = new HeartbeatMetricsManager(metrics);
    }

    /**
     * Decides whether a heartbeat has to go out now.
     *
     * <ul>
     *   <li>No known coordinator, or the membership manager asks to skip: notifies the membership
     *       manager and returns {@code PollResult.EMPTY}.</li>
     *   <li>Poll timer expired while the member is not already leaving: logs a warning, moves the
     *       member to sending a leave-group, and returns that heartbeat (its response is only
     *       logged), then resets the request state and the sent-fields record.</li>
     *   <li>Otherwise: sends a regular heartbeat when the request state allows it or the member
     *       needs to heartbeat immediately; else returns only the time until the next heartbeat.</li>
     * </ul>
     *
     * @param currentTimeMs current time in milliseconds
     */
    @Override // org.apache.kafka.clients.consumer.internals.RequestManager
    public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
        if (!coordinatorRequestManager.coordinator().isPresent() || membershipManager.shouldSkipHeartbeat()) {
            membershipManager.onHeartbeatRequestSkipped();
            return NetworkClientDelegate.PollResult.EMPTY;
        }

        pollTimer.update(currentTimeMs);
        if (pollTimer.isExpired() && !membershipManager.isLeavingGroup()) {
            logger.warn("Consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.");
            membershipManager.transitionToSendingLeaveGroup(true);
            NetworkClientDelegate.UnsentRequest leaveHeartbeat = makeHeartbeatRequest(currentTimeMs, true);
            heartbeatRequestState.reset();
            heartbeatState.reset();
            return new NetworkClientDelegate.PollResult(
                    heartbeatRequestState.heartbeatIntervalMs, Collections.singletonList(leaveHeartbeat));
        }

        // Immediate heartbeat: the member is leaving, or it asked for one and none is in flight.
        boolean heartbeatNow = membershipManager.state() == MemberState.LEAVING
                || (membershipManager.shouldHeartbeatNow() && !heartbeatRequestState.requestInFlight());

        // canSendRequest is always evaluated because it also advances the heartbeat timer.
        if (!heartbeatRequestState.canSendRequest(currentTimeMs) && !heartbeatNow) {
            return new NetworkClientDelegate.PollResult(heartbeatRequestState.timeToNextHeartbeatMs(currentTimeMs));
        }

        long intervalMs = heartbeatRequestState.heartbeatIntervalMs;
        NetworkClientDelegate.UnsentRequest heartbeat = makeHeartbeatRequest(currentTimeMs, false);
        return new NetworkClientDelegate.PollResult(intervalMs, Collections.singletonList(heartbeat));
    }

    /**
     * @return the membership manager this heartbeat manager reports to
     */
    public MembershipManager membershipManager() {
        return membershipManager;
    }

    /**
     * Computes how long the caller may wait before this manager needs to be polled again.
     *
     * @param currentTimeMs current time in milliseconds
     * @return 0 when the poll timer has expired or an immediate heartbeat is wanted and none is in
     *         flight; otherwise the smaller of half the remaining poll time and the time to the
     *         next heartbeat
     */
    @Override // org.apache.kafka.clients.consumer.internals.RequestManager
    public long maximumTimeToWait(long currentTimeMs) {
        pollTimer.update(currentTimeMs);
        if (pollTimer.isExpired()) {
            return 0L;
        }

        boolean immediateHeartbeatPending =
                membershipManager.shouldHeartbeatNow() && !heartbeatRequestState.requestInFlight();
        if (immediateHeartbeatPending) {
            return 0L;
        }

        long halfRemainingPollMs = pollTimer.remainingMs() / 2;
        return Math.min(halfRemainingPollMs, heartbeatRequestState.timeToNextHeartbeatMs(currentTimeMs));
    }

    /**
     * Re-arms the poll timer with {@code max.poll.interval.ms}. If the timer had already expired
     * at {@code currentTimeMs}, a warning is logged and the membership manager is asked to rejoin
     * the stale member before the timer is reset.
     *
     * @param currentTimeMs current time in milliseconds
     */
    public void resetPollTimer(long currentTimeMs) {
        pollTimer.update(currentTimeMs);
        boolean exceededPollInterval = pollTimer.isExpired();
        if (exceededPollInterval) {
            long exceededByMs = pollTimer.isExpiredBy();
            logger.warn("Time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, exceeded approximately by {} ms. Member {} will rejoin the group now.", exceededByMs, membershipManager.memberId());
            membershipManager.maybeRejoinStaleMember();
        }
        pollTimer.reset(maxPollIntervalMs);
    }

    /**
     * Builds a heartbeat request and performs the bookkeeping for sending it: records the send
     * attempt, notifies the membership manager, records the send time metric and re-arms the
     * heartbeat timer.
     *
     * @param currentTimeMs  current time in milliseconds
     * @param ignoreResponse when true the response is only logged and not processed
     */
    private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(long currentTimeMs, boolean ignoreResponse) {
        final NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequest(ignoreResponse);

        heartbeatRequestState.onSendAttempt(currentTimeMs);
        membershipManager.onHeartbeatRequestGenerated();
        metricsManager.recordHeartbeatSentMs(currentTimeMs);
        heartbeatRequestState.resetTimer();

        return request;
    }

    /**
     * Creates the unsent heartbeat request addressed to the current coordinator and attaches its
     * completion handler.
     *
     * @param ignoreResponse when true the completion is only logged; when false a missing response
     *                       is routed to {@code onFailure} and a received one to {@code onResponse}
     *                       after its latency is recorded
     */
    private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(boolean ignoreResponse) {
        final NetworkClientDelegate.UnsentRequest request = new NetworkClientDelegate.UnsentRequest(
                new ConsumerGroupHeartbeatRequest.Builder(heartbeatState.buildRequestData()),
                coordinatorRequestManager.coordinator());

        if (ignoreResponse) {
            return logResponse(request);
        }

        return request.whenComplete((response, exception) -> {
            final long completionTimeMs = request.handler().completionTimeMs();
            if (response != null) {
                metricsManager.recordRequestLatency(response.requestLatencyMs());
                onResponse((ConsumerGroupHeartbeatResponse) response.responseBody(), completionTimeMs);
            } else {
                onFailure(exception, completionTimeMs);
            }
        });
    }

    /**
     * Attaches a completion handler that only logs the outcome: an error when no response was
     * received, otherwise the request latency is recorded and the response is logged at debug
     * (error code NONE) or error level (any other code).
     */
    private NetworkClientDelegate.UnsentRequest logResponse(NetworkClientDelegate.UnsentRequest unsentRequest) {
        return unsentRequest.whenComplete((response, exception) -> {
            if (response != null) {
                metricsManager.recordRequestLatency(response.requestLatencyMs());
                ConsumerGroupHeartbeatResponse heartbeatResponse = (ConsumerGroupHeartbeatResponse) response.responseBody();
                Errors error = Errors.forCode(heartbeatResponse.data().errorCode());
                if (error != Errors.NONE) {
                    logger.error("GroupHeartbeat failed because of {}: {}", error, response);
                } else {
                    logger.debug("GroupHeartbeat responded successfully: {}", response);
                }
            } else {
                logger.error("GroupHeartbeat failed because of unexpected exception.", exception);
            }
        });
    }

    /**
     * Handles a heartbeat that completed without a response. Records the failed attempt, clears
     * the sent-fields record and informs the membership manager whether the failure is retriable.
     * Retriable exceptions are logged at debug level; anything else is logged as an error and
     * treated as fatal.
     *
     * @param exception      the failure cause
     * @param responseTimeMs completion time of the request in milliseconds
     */
    private void onFailure(Throwable exception, long responseTimeMs) {
        heartbeatRequestState.onFailedAttempt(responseTimeMs);
        heartbeatState.reset();

        final boolean retriable = exception instanceof RetriableException;
        membershipManager.onHeartbeatFailure(retriable);

        if (!retriable) {
            logger.error("GroupHeartbeatRequest failed due to fatal error: " + exception.getMessage());
            handleFatalFailure(exception);
            return;
        }

        long backoffMs = heartbeatRequestState.remainingBackoffMs(responseTimeMs);
        logger.debug(String.format("GroupHeartbeatRequest failed because of the retriable exception. Will retry in %s ms: %s", backoffMs, exception.getMessage()));
    }

    /**
     * Processes a received heartbeat response. A response with error code NONE updates the
     * heartbeat interval, marks the attempt successful and is passed to the membership manager;
     * any other error code is delegated to {@code onErrorResponse}.
     *
     * @param response      the heartbeat response
     * @param currentTimeMs completion time of the request in milliseconds
     */
    private void onResponse(ConsumerGroupHeartbeatResponse response, long currentTimeMs) {
        boolean succeeded = Errors.forCode(response.data().errorCode()) == Errors.NONE;
        if (succeeded) {
            heartbeatRequestState.updateHeartbeatIntervalMs(response.data().heartbeatIntervalMs());
            heartbeatRequestState.onSuccessfulAttempt(currentTimeMs);
            membershipManager.onHeartbeatSuccess(response.data());
        } else {
            onErrorResponse(response, currentTimeMs);
        }
    }

    /**
     * Handles a heartbeat response that carries an error code.
     *
     * <p>Always clears the sent-fields record, records a failed attempt and notifies the membership
     * manager of a non-retriable heartbeat failure. The error code then selects the reaction:
     * <ul>
     *   <li>{@code NOT_COORDINATOR}, {@code COORDINATOR_NOT_AVAILABLE}: mark the coordinator unknown
     *       and reset the request state.</li>
     *   <li>{@code COORDINATOR_LOAD_IN_PROGRESS}: log only.</li>
     *   <li>{@code FENCED_MEMBER_EPOCH}, {@code UNKNOWN_MEMBER_ID}: transition the member to fenced
     *       and reset the request state.</li>
     *   <li>Every other code: log an error and handle it as a fatal failure.</li>
     * </ul>
     *
     * @param response      the heartbeat response holding the error
     * @param currentTimeMs completion time of the request in milliseconds
     */
    private void onErrorResponse(ConsumerGroupHeartbeatResponse response, long currentTimeMs) {
        Errors error = Errors.forCode(response.data().errorCode());
        String errorMessage = response.data().errorMessage();
        this.heartbeatState.reset();
        this.heartbeatRequestState.onFailedAttempt(currentTimeMs);
        this.membershipManager.onHeartbeatFailure(false);
        switch (error) {
            case NOT_COORDINATOR:
                logInfo(String.format("GroupHeartbeatRequest failed because the group coordinator %s is incorrect. Will attempt to find the coordinator again and retry", this.coordinatorRequestManager.coordinator()), response, currentTimeMs);
                this.coordinatorRequestManager.markCoordinatorUnknown(errorMessage, currentTimeMs);
                this.heartbeatRequestState.reset();
                return;
            case COORDINATOR_NOT_AVAILABLE:
                logInfo(String.format("GroupHeartbeatRequest failed because the group coordinator %s is not available. Will attempt to find the coordinator again and retry", this.coordinatorRequestManager.coordinator()), response, currentTimeMs);
                this.coordinatorRequestManager.markCoordinatorUnknown(errorMessage, currentTimeMs);
                this.heartbeatRequestState.reset();
                return;
            case COORDINATOR_LOAD_IN_PROGRESS:
                // The original message read "...is still loading.Will retry"; the missing space is restored.
                logInfo(String.format("GroupHeartbeatRequest failed because the group coordinator %s is still loading. Will retry", this.coordinatorRequestManager.coordinator()), response, currentTimeMs);
                return;
            case GROUP_AUTHORIZATION_FAILED:
                // The message is taken from a group-specific exception instead of the response text.
                GroupAuthorizationException authorizationException = GroupAuthorizationException.forGroupId(this.membershipManager.groupId());
                this.logger.error("GroupHeartbeatRequest failed due to group authorization failure: {}", authorizationException.getMessage());
                handleFatalFailure(error.exception(authorizationException.getMessage()));
                return;
            case UNRELEASED_INSTANCE_ID:
                this.logger.error("GroupHeartbeatRequest failed due to unreleased instance id {}: {}", this.membershipManager.groupInstanceId().orElse("null"), errorMessage);
                handleFatalFailure(error.exception(errorMessage));
                return;
            case FENCED_INSTANCE_ID:
                this.logger.error("GroupHeartbeatRequest failed due to fenced instance id {}: {}. This is expected in the case that the member was removed from the group by an admin client, and another member joined using the same group instance id.", this.membershipManager.groupInstanceId().orElse("null"), errorMessage);
                handleFatalFailure(error.exception(errorMessage));
                return;
            case INVALID_REQUEST:
            case GROUP_MAX_SIZE_REACHED:
            case UNSUPPORTED_ASSIGNOR:
            case UNSUPPORTED_VERSION:
                this.logger.error("GroupHeartbeatRequest failed due to {}: {}", error, errorMessage);
                handleFatalFailure(error.exception(errorMessage));
                return;
            case FENCED_MEMBER_EPOCH:
                logInfo(String.format("GroupHeartbeatRequest failed for member %s because epoch %s is fenced.", this.membershipManager.memberId(), Integer.valueOf(this.membershipManager.memberEpoch())), response, currentTimeMs);
                this.membershipManager.transitionToFenced();
                this.heartbeatRequestState.reset();
                return;
            case UNKNOWN_MEMBER_ID:
                logInfo(String.format("GroupHeartbeatRequest failed because member %s is unknown.", this.membershipManager.memberId()), response, currentTimeMs);
                this.membershipManager.transitionToFenced();
                this.heartbeatRequestState.reset();
                return;
            default:
                this.logger.error("GroupHeartbeatRequest failed due to unexpected error {}: {}", error, errorMessage);
                handleFatalFailure(error.exception(errorMessage));
                return;
        }
    }

    /**
     * Logs {@code message} at info level, followed by the remaining backoff in milliseconds and
     * the error message carried by the response.
     */
    private void logInfo(String message, ConsumerGroupHeartbeatResponse response, long currentTimeMs) {
        long backoffMs = heartbeatRequestState.remainingBackoffMs(currentTimeMs);
        String responseErrorMessage = response.data().errorMessage();
        logger.info("{} in {}ms: {}", message, backoffMs, responseErrorMessage);
    }

    /**
     * Reports a fatal heartbeat failure: queues an {@link ErrorEvent} wrapping {@code cause} on
     * the background event handler, then moves the member to the fatal state.
     */
    private void handleFatalFailure(Throwable cause) {
        final ErrorEvent errorEvent = new ErrorEvent(cause);
        backgroundEventHandler.add(errorEvent);
        membershipManager.transitionToFatal();
    }
}
