package io.camunda.zeebe.gateway.impl.broker;

import io.camunda.zeebe.gateway.Loggers;
import io.camunda.zeebe.gateway.cmd.BrokerErrorException;
import io.camunda.zeebe.gateway.cmd.NoTopologyAvailableException;
import io.camunda.zeebe.gateway.impl.broker.cluster.BrokerClusterState;
import io.camunda.zeebe.gateway.impl.broker.cluster.BrokerTopologyManager;
import io.camunda.zeebe.gateway.impl.broker.request.BrokerRequest;
import io.camunda.zeebe.gateway.impl.broker.response.BrokerResponse;
import io.camunda.zeebe.protocol.record.ErrorCode;
import java.net.ConnectException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;

/* loaded from: input_file:io/camunda/zeebe/gateway/impl/broker/RequestRetryHandler.class */
public final class RequestRetryHandler {
    private final BrokerClient brokerClient;
    private final RequestDispatchStrategy roundRobinDispatchStrategy;
    private final BrokerTopologyManager topologyManager;

    public RequestRetryHandler(BrokerClient brokerClient, BrokerTopologyManager brokerTopologyManager) {
        this.brokerClient = brokerClient;
        this.roundRobinDispatchStrategy = new RoundRobinDispatchStrategy(brokerTopologyManager);
        this.topologyManager = brokerTopologyManager;
    }

    public <BrokerResponseT> void sendRequest(BrokerRequest<BrokerResponseT> brokerRequest, BrokerResponseConsumer<BrokerResponseT> brokerResponseConsumer, Consumer<Throwable> consumer) {
        BrokerClient brokerClient = this.brokerClient;
        Objects.requireNonNull(brokerClient);
        sendRequestInternal(brokerRequest, brokerClient::sendRequest, brokerResponseConsumer, consumer);
    }

    public <BrokerResponseT> void sendRequest(BrokerRequest<BrokerResponseT> brokerRequest, BrokerResponseConsumer<BrokerResponseT> brokerResponseConsumer, Consumer<Throwable> consumer, Duration duration) {
        sendRequestInternal(brokerRequest, brokerRequest2 -> {
            return this.brokerClient.sendRequest(brokerRequest2, duration);
        }, brokerResponseConsumer, consumer);
    }

    private <BrokerResponseT> void sendRequestInternal(BrokerRequest<BrokerResponseT> brokerRequest, Function<BrokerRequest<BrokerResponseT>, CompletableFuture<BrokerResponse<BrokerResponseT>>> function, BrokerResponseConsumer<BrokerResponseT> brokerResponseConsumer, Consumer<Throwable> consumer) {
        BrokerClusterState topology = this.topologyManager.getTopology();
        if (topology == null || topology.getPartitionsCount() == 0) {
            consumer.accept(new NoTopologyAvailableException());
        } else {
            sendRequestWithRetry(brokerRequest, function, partitionIdIteratorForType(topology.getPartitionsCount()), brokerResponseConsumer, consumer, new ArrayList());
        }
    }

    private <BrokerResponseT> void sendRequestWithRetry(BrokerRequest<BrokerResponseT> brokerRequest, Function<BrokerRequest<BrokerResponseT>, CompletableFuture<BrokerResponse<BrokerResponseT>>> function, PartitionIdIterator partitionIdIterator, BrokerResponseConsumer<BrokerResponseT> brokerResponseConsumer, Consumer<Throwable> consumer, Collection<Throwable> collection) {
        if (partitionIdIterator.hasNext()) {
            brokerRequest.setPartitionId(partitionIdIterator.next().intValue());
            function.apply(brokerRequest).whenComplete((brokerResponse, th) -> {
                if (th == null) {
                    brokerResponseConsumer.accept(brokerResponse.getKey(), brokerResponse.getResponse());
                } else {
                    if (!shouldRetryWithNextPartition(th)) {
                        consumer.accept(th);
                        return;
                    }
                    Loggers.GATEWAY_LOGGER.trace("Failed to create process on partition {}", Integer.valueOf(partitionIdIterator.getCurrentPartitionId()), th);
                    collection.add(th);
                    sendRequestWithRetry(brokerRequest, function, partitionIdIterator, brokerResponseConsumer, consumer, collection);
                }
            });
        } else {
            RequestRetriesExhaustedException requestRetriesExhaustedException = new RequestRetriesExhaustedException();
            Objects.requireNonNull(requestRetriesExhaustedException);
            collection.forEach(requestRetriesExhaustedException::addSuppressed);
            consumer.accept(requestRetriesExhaustedException);
        }
    }

    private boolean shouldRetryWithNextPartition(Throwable th) {
        if (th instanceof ConnectException) {
            return true;
        }
        if (!(th instanceof BrokerErrorException)) {
            return false;
        }
        ErrorCode code = ((BrokerErrorException) th).getError().getCode();
        return code == ErrorCode.PARTITION_LEADER_MISMATCH || code == ErrorCode.RESOURCE_EXHAUSTED;
    }

    private PartitionIdIterator partitionIdIteratorForType(int i) {
        return new PartitionIdIterator(this.roundRobinDispatchStrategy.determinePartition(), i, this.topologyManager);
    }
}
