package io.camunda.zeebe.gateway.impl.broker;

import io.camunda.zeebe.gateway.cmd.BrokerErrorException;
import io.camunda.zeebe.gateway.cmd.BrokerRejectionException;
import io.camunda.zeebe.gateway.cmd.BrokerResponseException;
import io.camunda.zeebe.gateway.cmd.ClientResponseException;
import io.camunda.zeebe.gateway.cmd.IllegalBrokerResponseException;
import io.camunda.zeebe.gateway.cmd.NoTopologyAvailableException;
import io.camunda.zeebe.gateway.cmd.PartitionNotFoundException;
import io.camunda.zeebe.gateway.impl.ErrorResponseHandler;
import io.camunda.zeebe.gateway.impl.broker.cluster.BrokerClusterState;
import io.camunda.zeebe.gateway.impl.broker.cluster.BrokerTopologyManagerImpl;
import io.camunda.zeebe.gateway.impl.broker.request.BrokerPublishMessageRequest;
import io.camunda.zeebe.gateway.impl.broker.request.BrokerRequest;
import io.camunda.zeebe.gateway.impl.broker.response.BrokerResponse;
import io.camunda.zeebe.gateway.metrics.GatewayMetrics;
import io.camunda.zeebe.protocol.impl.SubscriptionUtil;
import io.camunda.zeebe.protocol.record.ErrorCode;
import io.camunda.zeebe.protocol.record.MessageHeaderDecoder;
import io.camunda.zeebe.scheduler.Actor;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.transport.ClientRequest;
import io.camunda.zeebe.transport.ClientTransport;
import io.camunda.zeebe.util.buffer.BufferUtil;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import java.util.function.ToIntFunction;
import org.agrona.DirectBuffer;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/camunda/zeebe/gateway/impl/broker/BrokerRequestManager.class */
public final class BrokerRequestManager extends Actor {
    /**
     * Sender that delegates to {@code ClientTransport#sendRequestWithRetry}, passing
     * {@link #responseValidation(DirectBuffer)} as the response check.
     */
    private static final TransportRequestSender SENDER_WITH_RETRY =
            (transport, addressSupplier, request, timeout) ->
                    transport.sendRequestWithRetry(addressSupplier, BrokerRequestManager::responseValidation, request, timeout);

    /** Sender that delegates to {@code ClientTransport#sendRequest}; no response check is passed. */
    private static final TransportRequestSender SENDER_WITHOUT_RETRY =
            (transport, addressSupplier, request, timeout) -> transport.sendRequest(addressSupplier, request, timeout);

    /** Transport handed to the senders for every outgoing request. */
    private final ClientTransport clientTransport;

    /** Consulted for a partition id when a request needs one but does not carry it. */
    private final RequestDispatchStrategy dispatchStrategy;

    /** Source of the cluster topology used to resolve partitions and broker addresses. */
    private final BrokerTopologyManagerImpl topologyManager;

    /** Timeout applied by the overloads that do not take an explicit duration. */
    private final Duration requestTimeout;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/camunda/zeebe/gateway/impl/broker/BrokerRequestManager$BrokerAddressProvider.class */
    /**
     * Supplies a broker address by applying a node-id selector to the topology currently held by
     * the enclosing manager's topology manager.
     */
    public class BrokerAddressProvider implements Supplier<String> {
        private final ToIntFunction<BrokerClusterState> nodeIdSelector;

        /**
         * Creates a provider whose node id comes from {@code BrokerClusterState#getRandomBroker}.
         *
         * @param brokerRequestManager not read by this constructor
         */
        BrokerAddressProvider(BrokerRequestManager brokerRequestManager) {
            this(BrokerClusterState::getRandomBroker);
        }

        /**
         * Creates a provider whose node id is the leader of the given partition.
         *
         * @param brokerRequestManager not read by this constructor
         * @param i the partition id passed to {@code getLeaderForPartition}
         */
        BrokerAddressProvider(BrokerRequestManager brokerRequestManager, int i) {
            this(topology -> topology.getLeaderForPartition(i));
        }

        /**
         * Creates a provider backed by an arbitrary node-id selector.
         *
         * @param toIntFunction maps a topology to the node id whose address should be returned
         */
        BrokerAddressProvider(ToIntFunction<BrokerClusterState> toIntFunction) {
            this.nodeIdSelector = toIntFunction;
        }

        /**
         * Looks up the address of the selected node in the current topology.
         *
         * @return the address reported by the topology for the selected node id, or {@code null}
         *     when the topology manager currently has no topology
         */
        @Override
        public String get() {
            final BrokerClusterState currentTopology = BrokerRequestManager.this.topologyManager.getTopology();
            if (currentTopology == null) {
                return null;
            }
            final int nodeId = this.nodeIdSelector.applyAsInt(currentTopology);
            return currentTopology.getBrokerAddress(nodeId);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/camunda/zeebe/gateway/impl/broker/BrokerRequestManager$RequestResult.class */
    /**
     * Immutable outcome of handling a single broker response: a processed flag plus the error code
     * recorded for a failed result.
     */
    public static class RequestResult {
        private final boolean processed;
        private final ErrorCode errorCode;

        /**
         * @return a result flagged as processed, carrying a {@code null} error code
         */
        static RequestResult processed() {
            return new RequestResult(true, null);
        }

        /**
         * @param errorCode the error code to record
         * @return a result flagged as not processed, carrying the given error code
         */
        static RequestResult failed(ErrorCode errorCode) {
            return new RequestResult(false, errorCode);
        }

        /**
         * @param processed whether the response counts as processed
         * @param errorCode the error code to record; {@code null} is what {@link #processed()} passes
         */
        RequestResult(boolean processed, ErrorCode errorCode) {
            this.errorCode = errorCode;
            this.processed = processed;
        }

        /**
         * @return {@code true} when this result was created as processed
         */
        boolean wasProcessed() {
            return this.processed;
        }

        /**
         * @return the recorded error code; {@code null} for results created via {@link #processed()}
         */
        public ErrorCode getErrorCode() {
            return this.errorCode;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/camunda/zeebe/gateway/impl/broker/BrokerRequestManager$TransportRequestSender.class */
    /**
     * Strategy for handing a request to the client transport. It is implemented with lambdas in
     * this class, hence the {@link FunctionalInterface} marker: adding a second abstract method
     * would then fail at compile time instead of breaking those lambdas.
     */
    @FunctionalInterface
    public interface TransportRequestSender {
        /**
         * Sends a request over the given transport.
         *
         * @param clientTransport the transport to send with
         * @param supplier supplies the address of the broker to send to
         * @param clientRequest the request to send
         * @param duration the timeout forwarded to the transport
         * @return a future yielding the raw response buffer
         */
        ActorFuture<DirectBuffer> send(ClientTransport clientTransport, Supplier<String> supplier, ClientRequest clientRequest, Duration duration);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a request manager; each argument is stored as-is without validation.
     *
     * @param clientTransport transport used to send requests
     * @param brokerTopologyManagerImpl source of the cluster topology
     * @param requestDispatchStrategy used to pick a partition for requests that need one
     * @param duration timeout used by the send overloads that take no explicit timeout
     */
    public BrokerRequestManager(ClientTransport clientTransport, BrokerTopologyManagerImpl brokerTopologyManagerImpl, RequestDispatchStrategy requestDispatchStrategy, Duration duration) {
        this.clientTransport = clientTransport;
        this.topologyManager = brokerTopologyManagerImpl;
        this.dispatchStrategy = requestDispatchStrategy;
        this.requestTimeout = duration;
    }

    /**
     * Response check passed to the retrying transport send.
     *
     * @param directBuffer the raw response, with the message header starting at offset 0
     * @return {@code false} only when the buffer holds an error response whose code is
     *     {@code PARTITION_LEADER_MISMATCH}; {@code true} for every other response
     */
    private static boolean responseValidation(DirectBuffer directBuffer) {
        final ErrorResponseHandler errorHandler = new ErrorResponseHandler();
        final MessageHeaderDecoder header = new MessageHeaderDecoder();
        header.wrap(directBuffer, 0);

        if (errorHandler.handlesResponse(header)) {
            // The body follows the header, so decoding starts at the header's encoded length.
            errorHandler.wrap(directBuffer, header.encodedLength(), header.blockLength(), header.version());
            final ErrorCode code = errorHandler.getErrorCode();
            return code != ErrorCode.PARTITION_LEADER_MISMATCH;
        }
        // Not an error response: nothing to reject.
        return true;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Sends the request through the retrying sender using the timeout configured on this manager.
     *
     * @param brokerRequest the request to send
     * @param <T> the response payload type
     * @return the future returned by the two-argument overload
     */
    public <T> CompletableFuture<BrokerResponse<T>> sendRequestWithRetry(BrokerRequest<T> brokerRequest) {
        final Duration configuredTimeout = this.requestTimeout;
        return sendRequestWithRetry(brokerRequest, configuredTimeout);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Sends the request through the non-retrying sender using the timeout configured on this
     * manager.
     *
     * @param brokerRequest the request to send
     * @param <T> the response payload type
     * @return the future returned by the two-argument overload
     */
    public <T> CompletableFuture<BrokerResponse<T>> sendRequest(BrokerRequest<T> brokerRequest) {
        final Duration configuredTimeout = this.requestTimeout;
        return sendRequest(brokerRequest, configuredTimeout);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Sends the request with {@code SENDER_WITHOUT_RETRY} and an explicit timeout.
     *
     * @param brokerRequest the request to send
     * @param duration the timeout forwarded to the transport
     * @param <T> the response payload type
     * @return a future completed with the broker response or completed exceptionally on failure
     */
    public <T> CompletableFuture<BrokerResponse<T>> sendRequest(BrokerRequest<T> brokerRequest, Duration duration) {
        final TransportRequestSender sender = SENDER_WITHOUT_RETRY;
        return sendRequestInternal(brokerRequest, sender, duration);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Sends the request with {@code SENDER_WITH_RETRY} and an explicit timeout.
     *
     * @param brokerRequest the request to send
     * @param duration the timeout forwarded to the transport
     * @param <T> the response payload type
     * @return a future completed with the broker response or completed exceptionally on failure
     */
    public <T> CompletableFuture<BrokerResponse<T>> sendRequestWithRetry(BrokerRequest<T> brokerRequest, Duration duration) {
        final TransportRequestSender sender = SENDER_WITH_RETRY;
        return sendRequestInternal(brokerRequest, sender, duration);
    }

    /**
     * Serializes the request value, then submits the actual send to the actor.
     *
     * <p>{@code serializeValue()} is invoked directly in this method, before the send is handed to
     * {@code actor.run}.
     *
     * @param brokerRequest the request to send
     * @param transportRequestSender the sender that hands the request to the transport
     * @param duration the timeout forwarded to the sender
     * @param <T> the response payload type
     * @return the future that the submitted send completes
     */
    private <T> CompletableFuture<BrokerResponse<T>> sendRequestInternal(BrokerRequest<T> brokerRequest, TransportRequestSender transportRequestSender, Duration duration) {
        final CompletableFuture<BrokerResponse<T>> resultFuture = new CompletableFuture<>();
        brokerRequest.serializeValue();
        this.actor.run(() -> sendRequestInternal(brokerRequest, resultFuture, transportRequestSender, duration));
        return resultFuture;
    }

    /**
     * Resolves the target broker, sends the request and wires the transport's reply into the
     * caller's future and the gateway metrics.
     *
     * <p>A {@link NoTopologyAvailableException} or {@link PartitionNotFoundException} raised while
     * setting up the send fails the future with that exception and is recorded as a failed request
     * ({@code "NO_TOPOLOGY"} / {@code "PARTITION_NOT_FOUND"}).
     *
     * @param brokerRequest the request to send
     * @param completableFuture the future to complete with the outcome
     * @param transportRequestSender the sender that hands the request to the transport
     * @param duration the timeout forwarded to the sender
     * @param <T> the response payload type
     */
    private <T> void sendRequestInternal(BrokerRequest<T> brokerRequest, CompletableFuture<BrokerResponse<T>> completableFuture, TransportRequestSender transportRequestSender, Duration duration) {
        try {
            final Supplier<String> brokerAddress = determineBrokerNodeIdProvider(brokerRequest);
            final ActorFuture<DirectBuffer> pendingReply = transportRequestSender.send(this.clientTransport, brokerAddress, brokerRequest, duration);
            // Latency is measured from the moment the send call has returned.
            final long sentAtMillis = System.currentTimeMillis();
            this.actor.runOnCompletion(pendingReply, (replyBuffer, transportError) -> {
                RequestResult outcome = null;
                try {
                    if (transportError != null) {
                        completableFuture.completeExceptionally(transportError);
                    } else {
                        outcome = handleResponse(brokerRequest.getResponse(replyBuffer), completableFuture);
                        if (outcome.wasProcessed()) {
                            GatewayMetrics.registerSuccessfulRequest(brokerRequest.getPartitionId(), brokerRequest.getType(), System.currentTimeMillis() - sentAtMillis);
                            return;
                        }
                    }
                } catch (RuntimeException decodeFailure) {
                    completableFuture.completeExceptionally(new ClientResponseException(decodeFailure));
                }
                // Reached for transport errors, unprocessed results and decode failures alike.
                registerFailure(brokerRequest, outcome, transportError);
            });
        } catch (NoTopologyAvailableException noTopology) {
            completableFuture.completeExceptionally(noTopology);
            GatewayMetrics.registerFailedRequest(brokerRequest.getPartitionId(), brokerRequest.getType(), "NO_TOPOLOGY");
        } catch (PartitionNotFoundException unknownPartition) {
            completableFuture.completeExceptionally(unknownPartition);
            GatewayMetrics.registerFailedRequest(brokerRequest.getPartitionId(), brokerRequest.getType(), "PARTITION_NOT_FOUND");
        }
    }

    /**
     * Records a failed request in the gateway metrics, unless the result carries
     * {@code RESOURCE_EXHAUSTED}, in which case nothing is recorded.
     *
     * <p>The recorded reason is the result's error code when one other than {@code NULL_VAL} is
     * present; otherwise {@code "TIMEOUT"} when the throwable's class is exactly
     * {@link TimeoutException}, and {@code "UNKNOWN"} in all remaining cases.
     *
     * @param brokerRequest the request that failed
     * @param requestResult the handling result, or {@code null} when none was produced
     * @param th the transport error, or {@code null} when the transport call succeeded
     * @param <T> the response payload type
     */
    private <T> void registerFailure(BrokerRequest<T> brokerRequest, RequestResult requestResult, Throwable th) {
        if (requestResult != null && requestResult.getErrorCode() == ErrorCode.RESOURCE_EXHAUSTED) {
            return;
        }

        final String reason;
        if (requestResult != null && requestResult.getErrorCode() != ErrorCode.NULL_VAL) {
            reason = requestResult.getErrorCode().toString();
        } else if (th != null && th.getClass().equals(TimeoutException.class)) {
            reason = "TIMEOUT";
        } else {
            reason = "UNKNOWN";
        }

        GatewayMetrics.registerFailedRequest(brokerRequest.getPartitionId(), brokerRequest.getType(), reason);
    }

    /**
     * Classifies a broker response and completes the caller's future accordingly.
     *
     * <ul>
     *   <li>response: the future completes normally; the result is processed</li>
     *   <li>rejection: the future fails with {@link BrokerRejectionException}; the result is
     *       still processed</li>
     *   <li>error: the future fails with {@link BrokerErrorException}; the result is failed with
     *       the error's code</li>
     *   <li>none of the above: the future fails with {@link IllegalBrokerResponseException}; the
     *       result is failed with {@code NULL_VAL}</li>
     * </ul>
     *
     * <p>The classification runs inside the {@code try}, so a {@link RuntimeException} thrown while
     * inspecting the response fails the future with a {@link BrokerResponseException}. With an
     * empty {@code try} that handler could never run.
     *
     * @param brokerResponse the decoded broker response
     * @param completableFuture the future to complete
     * @param <T> the response payload type
     * @return the handling result used for metrics
     */
    private <T> RequestResult handleResponse(BrokerResponse<T> brokerResponse, CompletableFuture<BrokerResponse<T>> completableFuture) {
        try {
            if (brokerResponse.isResponse()) {
                completableFuture.complete(brokerResponse);
                return RequestResult.processed();
            }
            if (brokerResponse.isRejection()) {
                completableFuture.completeExceptionally(new BrokerRejectionException(brokerResponse.getRejection()));
                return RequestResult.processed();
            }
            if (brokerResponse.isError()) {
                completableFuture.completeExceptionally(new BrokerErrorException(brokerResponse.getError()));
                return RequestResult.failed(brokerResponse.getError().getCode());
            }
            completableFuture.completeExceptionally(new IllegalBrokerResponseException("Expected broker response to be either response, rejection, or error, but is neither of them"));
        } catch (RuntimeException e) {
            // No effect if the future was already completed before the exception was thrown.
            completableFuture.completeExceptionally(new BrokerResponseException(e));
        }
        return RequestResult.failed(ErrorCode.NULL_VAL);
    }

    /**
     * Chooses how the target broker for a request is resolved, assigning a partition id to the
     * request first when it needs one.
     *
     * <p>Checks run in this order: explicit broker id, explicitly addressed partition, request
     * that requires a partition id, and finally a random broker.
     *
     * @param brokerRequest the request to route; its partition id may be set by this method
     * @return the address provider to hand to the transport
     * @throws PartitionNotFoundException if the request addresses a partition that the current
     *     topology does not list
     * @throws NoTopologyAvailableException propagated from partition selection for publish-message
     *     requests
     */
    private BrokerAddressProvider determineBrokerNodeIdProvider(BrokerRequest<?> brokerRequest) {
        if (brokerRequest.getBrokerId().isPresent()) {
            // The broker id is re-read from the request each time the provider is asked.
            return new BrokerAddressProvider((ToIntFunction<BrokerClusterState>) ignoredTopology -> brokerRequest.getBrokerId().orElseThrow().intValue());
        }

        if (brokerRequest.addressesSpecificPartition()) {
            final BrokerClusterState topology = this.topologyManager.getTopology();
            // Without a topology the partition cannot be checked, so the request is let through.
            final boolean partitionUnknown = topology != null && !topology.getPartitions().contains(Integer.valueOf(brokerRequest.getPartitionId()));
            if (partitionUnknown) {
                throw new PartitionNotFoundException(brokerRequest.getPartitionId());
            }
            return new BrokerAddressProvider(this, brokerRequest.getPartitionId());
        }

        if (brokerRequest.requiresPartitionId()) {
            if (brokerRequest instanceof BrokerPublishMessageRequest) {
                determinePartitionIdForPublishMessageRequest((BrokerPublishMessageRequest) brokerRequest);
            } else {
                final int suggested = this.dispatchStrategy.determinePartition();
                // NOTE(review): -3 looks like the strategy's "no partition" sentinel and 1 the
                // fallback partition; confirm against the constants these literals came from.
                brokerRequest.setPartitionId(suggested == -3 ? 1 : suggested);
            }
            return new BrokerAddressProvider(this, brokerRequest.getPartitionId());
        }

        return new BrokerAddressProvider(this);
    }

    /**
     * Sets the partition id of a publish-message request from its correlation key and the
     * topology's partition count.
     *
     * @param brokerPublishMessageRequest the request whose partition id is assigned
     * @throws NoTopologyAvailableException if there is no topology or it reports zero partitions
     */
    private void determinePartitionIdForPublishMessageRequest(BrokerPublishMessageRequest brokerPublishMessageRequest) {
        final BrokerClusterState topology = this.topologyManager.getTopology();
        final boolean hasPartitions = topology != null && topology.getPartitionsCount() != 0;
        if (!hasPartitions) {
            throw new NoTopologyAvailableException(String.format("Expected to pick partition for message with correlation key '%s', but no topology is available", BufferUtil.bufferAsString(brokerPublishMessageRequest.getCorrelationKey())));
        }
        final int partitionId = SubscriptionUtil.getSubscriptionPartitionId(brokerPublishMessageRequest.getCorrelationKey(), topology.getPartitionsCount());
        brokerPublishMessageRequest.setPartitionId(partitionId);
    }
}
