package io.camunda.zeebe.transport.stream.impl;

import io.atomix.cluster.MemberId;
import io.atomix.cluster.messaging.ClusterCommunicationService;
import io.atomix.cluster.messaging.MessagingException;
import io.camunda.zeebe.scheduler.Actor;
import io.camunda.zeebe.scheduler.ActorControl;
import io.camunda.zeebe.transport.stream.impl.messages.MessageUtil;
import io.camunda.zeebe.transport.stream.impl.messages.StreamTopics;
import io.camunda.zeebe.util.ExponentialBackoff;
import io.camunda.zeebe.util.buffer.BufferUtil;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.SwitchBootstraps;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.LongUnaryOperator;
import org.agrona.collections.ArrayUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/camunda/zeebe/transport/stream/impl/RemoteStreamTransport.class */
public final class RemoteStreamTransport<M> extends Actor {
    private static final Duration REQUEST_TIMEOUT = Duration.ofSeconds(5);
    private static final Logger LOG = LoggerFactory.getLogger(RemoteStreamTransport.class);
    private static final int INITIAL_RETRY_DELAY_MS = 100;
    private final ClusterCommunicationService transport;
    private final RemoteStreamApiHandler<M> requestHandler;
    private final LongUnaryOperator retryDelaySupplier;

    public RemoteStreamTransport(ClusterCommunicationService clusterCommunicationService, RemoteStreamApiHandler<M> remoteStreamApiHandler) {
        this(clusterCommunicationService, remoteStreamApiHandler, new ExponentialBackoff());
    }

    RemoteStreamTransport(ClusterCommunicationService clusterCommunicationService, RemoteStreamApiHandler<M> remoteStreamApiHandler, LongUnaryOperator longUnaryOperator) {
        this.transport = clusterCommunicationService;
        this.requestHandler = remoteStreamApiHandler;
        this.retryDelaySupplier = longUnaryOperator;
    }

    protected void onActorStarting() {
        ClusterCommunicationService clusterCommunicationService = this.transport;
        String str = StreamTopics.ADD.topic();
        Function function = MessageUtil::parseAddRequest;
        RemoteStreamApiHandler<M> remoteStreamApiHandler = this.requestHandler;
        Objects.requireNonNull(remoteStreamApiHandler);
        BiFunction biFunction = remoteStreamApiHandler::add;
        Function function2 = (v0) -> {
            return BufferUtil.bufferAsArray(v0);
        };
        ActorControl actorControl = this.actor;
        Objects.requireNonNull(actorControl);
        clusterCommunicationService.replyTo(str, function, biFunction, function2, actorControl::run);
        ClusterCommunicationService clusterCommunicationService2 = this.transport;
        String str2 = StreamTopics.REMOVE.topic();
        Function function3 = MessageUtil::parseRemoveRequest;
        RemoteStreamApiHandler<M> remoteStreamApiHandler2 = this.requestHandler;
        Objects.requireNonNull(remoteStreamApiHandler2);
        BiFunction biFunction2 = remoteStreamApiHandler2::remove;
        Function function4 = (v0) -> {
            return BufferUtil.bufferAsArray(v0);
        };
        ActorControl actorControl2 = this.actor;
        Objects.requireNonNull(actorControl2);
        clusterCommunicationService2.replyTo(str2, function3, biFunction2, function4, actorControl2::run);
        ClusterCommunicationService clusterCommunicationService3 = this.transport;
        String str3 = StreamTopics.REMOVE_ALL.topic();
        Function identity = Function.identity();
        BiFunction biFunction3 = this::onRemoveAll;
        Function identity2 = Function.identity();
        ActorControl actorControl3 = this.actor;
        Objects.requireNonNull(actorControl3);
        clusterCommunicationService3.replyTo(str3, identity, biFunction3, identity2, actorControl3::run);
    }

    protected void onActorClosing() {
        this.transport.unsubscribe(StreamTopics.ADD.topic());
        this.transport.unsubscribe(StreamTopics.REMOVE.topic());
        this.transport.unsubscribe(StreamTopics.REMOVE_ALL.topic());
        this.requestHandler.close();
    }

    public void removeAll(MemberId memberId) {
        this.actor.run(() -> {
            this.requestHandler.removeAll(memberId);
        });
    }

    private byte[] onRemoveAll(MemberId memberId, byte[] bArr) {
        this.requestHandler.removeAll(memberId);
        return ArrayUtil.EMPTY_BYTE_ARRAY;
    }

    public CompletableFuture<Void> restartStreams(MemberId memberId) {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        sendRestartStreamsRequest(memberId, completableFuture, 100L);
        return completableFuture;
    }

    private void sendRestartStreamsRequest(MemberId memberId, CompletableFuture<Void> completableFuture, long j) {
        try {
            sendRestartStreamsRequest(memberId).whenCompleteAsync((r12, th) -> {
                onRestartStreamsResponse(memberId, th, completableFuture, j);
            }, (Executor) this.actor);
        } catch (Exception e) {
            LOG.warn("Failed to restart streams for member '{}'", memberId, e);
        }
    }

    private CompletableFuture<Void> sendRestartStreamsRequest(MemberId memberId) {
        return this.transport.send(StreamTopics.RESTART_STREAMS.topic(), ArrayUtil.EMPTY_BYTE_ARRAY, Function.identity(), Function.identity(), memberId, REQUEST_TIMEOUT).thenApply(bArr -> {
            return null;
        });
    }

    private void onRestartStreamsResponse(MemberId memberId, Throwable th, CompletableFuture<Void> completableFuture, long j) {
        if (th == null) {
            LOG.debug("Requested streams from client service member '{}'", memberId);
            completableFuture.complete(null);
            return;
        }
        MessagingException.NoSuchMemberException cause = th.getCause();
        Objects.requireNonNull(cause);
        switch ((int) SwitchBootstraps.typeSwitch(MethodHandles.lookup(), "typeSwitch", MethodType.methodType(Integer.TYPE, Object.class, Integer.TYPE), MessagingException.NoSuchMemberException.class, MessagingException.NoRemoteHandler.class, MessagingException.RemoteHandlerFailure.class).dynamicInvoker().invoke(cause, 0) /* invoke-custom */) {
            case 0:
                LOG.trace("Failed to restart streams for member '{}', which has been removed from the\nmembership protocol; can be safely ignored.", memberId, cause);
                completableFuture.complete(null);
                return;
            case 1:
                LOG.trace("Failed to restart streams for member '{}'; either it's not a client\nstream service, or it's still starting up. Can be safely ignored.", memberId, (MessagingException.NoRemoteHandler) cause);
                completableFuture.complete(null);
                return;
            case 2:
                MessagingException.RemoteHandlerFailure remoteHandlerFailure = (MessagingException.RemoteHandlerFailure) cause;
                LOG.warn("Failed to restart streams for member '{}'; unrecoverable error occurred on recipient\nside, will not retry.", memberId, remoteHandlerFailure);
                completableFuture.completeExceptionally(remoteHandlerFailure);
                return;
            default:
                LOG.warn("Failed to restart streams for member '{}', retrying in {}ms", new Object[]{memberId, Long.valueOf(j), th});
                long applyAsLong = this.retryDelaySupplier.applyAsLong(j);
                this.actor.schedule(Duration.ofMillis(applyAsLong), () -> {
                    sendRestartStreamsRequest(memberId, completableFuture, applyAsLong);
                });
                return;
        }
    }
}
