package io.camunda.zeebe.transport.impl;

import io.atomix.cluster.messaging.MessagingException;
import io.atomix.cluster.messaging.MessagingService;
import io.atomix.utils.net.Address;
import io.camunda.zeebe.scheduler.Actor;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.scheduler.future.CompletableActorFuture;
import io.camunda.zeebe.transport.ClientRequest;
import io.camunda.zeebe.transport.ClientTransport;
import io.camunda.zeebe.transport.RequestType;
import java.net.ConnectException;
import java.time.Duration;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.agrona.DirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/camunda/zeebe/transport/impl/AtomixClientTransportAdapter.class */
public final class AtomixClientTransportAdapter extends Actor implements ClientTransport {
    private static final Logger LOG = LoggerFactory.getLogger(AtomixClientTransportAdapter.class);
    private static final Duration RETRY_DELAY = Duration.ofMillis(10);
    private static final String NO_REMOTE_ADDRESS_FOUND_ERROR_MESSAGE = "Failed to send request to %s, no remote address found.";
    private final MessagingService messagingService;

    public AtomixClientTransportAdapter(MessagingService messagingService) {
        this.messagingService = messagingService;
    }

    @Override // io.camunda.zeebe.transport.ClientTransport
    public ActorFuture<DirectBuffer> sendRequestWithRetry(Supplier<String> supplier, Predicate<DirectBuffer> predicate, ClientRequest clientRequest, Duration duration) {
        return sendRequestInternal(supplier, predicate, clientRequest, true, duration);
    }

    @Override // io.camunda.zeebe.transport.ClientTransport
    public ActorFuture<DirectBuffer> sendRequest(Supplier<String> supplier, ClientRequest clientRequest, Duration duration) {
        return sendRequestInternal(supplier, directBuffer -> {
            return true;
        }, clientRequest, false, duration);
    }

    private ActorFuture<DirectBuffer> sendRequestInternal(Supplier<String> supplier, Predicate<DirectBuffer> predicate, ClientRequest clientRequest, boolean z, Duration duration) {
        byte[] bArr = new byte[clientRequest.getLength()];
        clientRequest.write(new UnsafeBuffer(bArr), 0);
        int partitionId = clientRequest.getPartitionId();
        RequestType requestType = clientRequest.getRequestType();
        CompletableActorFuture completableActorFuture = new CompletableActorFuture();
        RequestContext requestContext = new RequestContext(completableActorFuture, supplier, partitionId, requestType, bArr, predicate, z, duration);
        this.actor.call(() -> {
            requestContext.setScheduledTimer(this.actor.schedule(duration, () -> {
                timeoutFuture(requestContext);
            }));
            tryToSend(requestContext);
        });
        return completableActorFuture;
    }

    private void tryToSend(RequestContext requestContext) {
        if (requestContext.isDone()) {
            if (LOG.isTraceEnabled()) {
                LOG.trace("Request {} is already done", Integer.valueOf(requestContext.hashCode()));
                return;
            }
            return;
        }
        if (!this.messagingService.isRunning()) {
            if (LOG.isTraceEnabled()) {
                LOG.trace("Messaging service is not running.");
            }
            requestContext.completeExceptionally(new IllegalStateException("Messaging service is not running."));
            return;
        }
        Duration calculateTimeout = requestContext.calculateTimeout();
        if (calculateTimeout.toMillis() <= 0) {
            if (LOG.isTraceEnabled()) {
                LOG.trace("Request {} reached timeout of {}, current calculation {}", new Object[]{Integer.valueOf(requestContext.hashCode()), requestContext.getTimeout(), calculateTimeout});
                return;
            }
            return;
        }
        Address nodeAddress = requestContext.getNodeAddress();
        if (nodeAddress != null) {
            if (LOG.isTraceEnabled()) {
                LOG.trace("Send request {} to {} with topic {}", new Object[]{Integer.valueOf(requestContext.hashCode()), requestContext.getNodeAddress(), requestContext.getTopicName()});
            }
            this.messagingService.sendAndReceive(nodeAddress, requestContext.getTopicName(), requestContext.getRequestBytes(), calculateTimeout).whenComplete((bArr, th) -> {
                this.actor.run(() -> {
                    handleResponse(requestContext, bArr, th);
                });
            });
        } else if (requestContext.shouldRetry()) {
            if (LOG.isTraceEnabled()) {
                LOG.trace("No target address for request {}, retry after {}.", Integer.valueOf(requestContext.hashCode()), RETRY_DELAY);
            }
            this.actor.schedule(RETRY_DELAY, () -> {
                tryToSend(requestContext);
            });
        } else {
            if (LOG.isTraceEnabled()) {
                LOG.trace("No target address for request {}, will fail request.", Integer.valueOf(requestContext.hashCode()));
            }
            requestContext.completeExceptionally(new ConnectException(String.format(NO_REMOTE_ADDRESS_FOUND_ERROR_MESSAGE, requestContext.getTopicName())));
        }
    }

    private void handleResponse(RequestContext requestContext, byte[] bArr, Throwable th) {
        if (requestContext.isDone()) {
            if (LOG.isTraceEnabled()) {
                LOG.trace("Handle response, but request {} is already done", Integer.valueOf(requestContext.hashCode()));
                return;
            }
            return;
        }
        if (th == null) {
            UnsafeBuffer unsafeBuffer = new UnsafeBuffer(bArr);
            if (requestContext.verifyResponse(unsafeBuffer)) {
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Got valid response for request {}.", Integer.valueOf(requestContext.hashCode()));
                }
                requestContext.complete(unsafeBuffer);
                return;
            } else {
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Got invalid response for request {}, retry in {}.", Integer.valueOf(requestContext.hashCode()), RETRY_DELAY);
                }
                this.actor.schedule(RETRY_DELAY, () -> {
                    tryToSend(requestContext);
                });
                return;
            }
        }
        Throwable cause = th.getCause();
        if ((exceptionShowsConnectionIssue(th) || exceptionShowsConnectionIssue(cause)) && requestContext.shouldRetry()) {
            if (LOG.isTraceEnabled()) {
                LOG.trace("Request {} failed, but will retry after delay {}", new Object[]{Integer.valueOf(requestContext.hashCode()), RETRY_DELAY, th});
            }
            this.actor.schedule(RETRY_DELAY, () -> {
                tryToSend(requestContext);
            });
        } else {
            if (LOG.isTraceEnabled()) {
                LOG.trace("Request {} failed, will not retry!", Integer.valueOf(requestContext.hashCode()), th);
            }
            requestContext.completeExceptionally(th);
        }
    }

    private boolean exceptionShowsConnectionIssue(Throwable th) {
        return (th instanceof ConnectException) || (th instanceof MessagingException.NoRemoteHandler);
    }

    private void timeoutFuture(RequestContext requestContext) {
        if (requestContext.isDone()) {
            return;
        }
        requestContext.timeout();
    }
}
