package io.camunda.zeebe.transport.stream.impl;

import io.atomix.cluster.MemberId;
import io.atomix.cluster.messaging.ClusterCommunicationService;
import io.atomix.cluster.messaging.MessagingException;
import io.camunda.zeebe.scheduler.ConcurrencyControl;
import io.camunda.zeebe.transport.stream.impl.ClientStreamRegistration;
import io.camunda.zeebe.transport.stream.impl.messages.AddStreamRequest;
import io.camunda.zeebe.transport.stream.impl.messages.AddStreamResponse;
import io.camunda.zeebe.transport.stream.impl.messages.ErrorResponse;
import io.camunda.zeebe.transport.stream.impl.messages.RemoveStreamRequest;
import io.camunda.zeebe.transport.stream.impl.messages.RemoveStreamResponse;
import io.camunda.zeebe.transport.stream.impl.messages.StreamResponseDecoder;
import io.camunda.zeebe.transport.stream.impl.messages.StreamTopics;
import io.camunda.zeebe.util.Either;
import io.camunda.zeebe.util.buffer.BufferUtil;
import io.camunda.zeebe.util.buffer.BufferWriter;
import io.camunda.zeebe.util.exception.UnrecoverableException;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.SwitchBootstraps;
import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BiConsumer;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/camunda/zeebe/transport/stream/impl/ClientStreamRequestManager.class */
public final class ClientStreamRequestManager<M extends BufferWriter> {
    private static final Logger LOGGER = LoggerFactory.getLogger(ClientStreamRequestManager.class);
    private static final byte[] REMOVE_ALL_REQUEST = new byte[0];
    private static final Duration RETRY_DELAY = Duration.ofSeconds(1);
    private static final Duration REQUEST_TIMEOUT = Duration.ofSeconds(5);
    private final Map<MemberId, Map<UUID, ClientStreamRegistration<M>>> registrations = new HashMap();
    private final StreamResponseDecoder responseDecoder = new StreamResponseDecoder();
    private final ClusterCommunicationService communicationService;
    private final ConcurrencyControl executor;

    /* JADX INFO: Access modifiers changed from: package-private */
    public ClientStreamRequestManager(ClusterCommunicationService clusterCommunicationService, ConcurrencyControl concurrencyControl) {
        this.communicationService = clusterCommunicationService;
        this.executor = concurrencyControl;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void add(AggregatedClientStream<M> aggregatedClientStream, Collection<MemberId> collection) {
        Iterator<MemberId> it = collection.iterator();
        while (it.hasNext()) {
            add(aggregatedClientStream, it.next());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void add(AggregatedClientStream<M> aggregatedClientStream, MemberId memberId) {
        add(registrationFor(aggregatedClientStream, memberId));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void remove(AggregatedClientStream<M> aggregatedClientStream, Collection<MemberId> collection) {
        Iterator<MemberId> it = collection.iterator();
        while (it.hasNext()) {
            remove(aggregatedClientStream, it.next());
        }
    }

    void remove(AggregatedClientStream<M> aggregatedClientStream, MemberId memberId) {
        ClientStreamRegistration<M> clientStreamRegistration;
        Map<UUID, ClientStreamRegistration<M>> map = this.registrations.get(memberId);
        if (map == null || (clientStreamRegistration = map.get(aggregatedClientStream.streamId())) == null) {
            return;
        }
        remove(clientStreamRegistration);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void removeAll(Collection<MemberId> collection) {
        this.registrations.values().stream().flatMap(map -> {
            return map.values().stream();
        }).forEach((v0) -> {
            v0.transitionToClosed();
        });
        this.registrations.clear();
        collection.forEach(this::doRemoveAll);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void removeUnreliable(UUID uuid, Collection<MemberId> collection) {
        byte[] bufferAsArray = BufferUtil.bufferAsArray(new RemoveStreamRequest().streamId(uuid));
        collection.forEach(memberId -> {
            this.communicationService.unicast(StreamTopics.REMOVE.topic(), bufferAsArray, Function.identity(), memberId, true);
            purgeRegistration(uuid, memberId);
        });
    }

    private void add(ClientStreamRegistration<M> clientStreamRegistration) {
        if (clientStreamRegistration.state() == ClientStreamRegistration.State.ADDING || !clientStreamRegistration.transitionToAdding()) {
            return;
        }
        AddStreamRequest metadata = new AddStreamRequest().streamId(clientStreamRegistration.streamId()).streamType(clientStreamRegistration.logicalId().streamType()).metadata(clientStreamRegistration.logicalId().metadata());
        if (clientStreamRegistration.pendingRequest() != null) {
            throw new IllegalStateException("Failed to add remote client stream %s to %s; there is an incomplete pending request".formatted(clientStreamRegistration.streamId(), clientStreamRegistration.serverId()));
        }
        sendAddRequest(clientStreamRegistration, BufferUtil.bufferAsArray(metadata));
    }

    private void remove(ClientStreamRegistration<M> clientStreamRegistration) {
        if (clientStreamRegistration.state() == ClientStreamRegistration.State.INITIAL) {
            clientStreamRegistration.transitionToRemoved();
            return;
        }
        if (clientStreamRegistration.state() == ClientStreamRegistration.State.REMOVING || !clientStreamRegistration.transitionToRemoving()) {
            return;
        }
        byte[] bufferAsArray = BufferUtil.bufferAsArray(new RemoveStreamRequest().streamId(clientStreamRegistration.streamId()));
        CompletionStage<byte[]> pendingRequest = clientStreamRegistration.pendingRequest();
        if (pendingRequest == null) {
            sendRemoveRequest(clientStreamRegistration, bufferAsArray);
            return;
        }
        BiConsumer<? super byte[], ? super Throwable> biConsumer = (bArr, th) -> {
            sendRemoveRequest(clientStreamRegistration, bufferAsArray);
        };
        ConcurrencyControl concurrencyControl = this.executor;
        Objects.requireNonNull(concurrencyControl);
        pendingRequest.whenCompleteAsync(biConsumer, concurrencyControl::run);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void onServerRemoved(MemberId memberId) {
        Map<UUID, ClientStreamRegistration<M>> remove = this.registrations.remove(memberId);
        if (remove == null) {
            return;
        }
        LOGGER.trace("Closing all registrations for server {}", memberId);
        remove.values().forEach((v0) -> {
            v0.transitionToClosed();
        });
    }

    ClientStreamRegistration<M> registrationFor(AggregatedClientStream<M> aggregatedClientStream, MemberId memberId) {
        return this.registrations.computeIfAbsent(memberId, memberId2 -> {
            return new HashMap();
        }).computeIfAbsent(aggregatedClientStream.streamId(), uuid -> {
            return new ClientStreamRegistration(aggregatedClientStream, memberId);
        });
    }

    private void sendAddRequest(ClientStreamRegistration<M> clientStreamRegistration, byte[] bArr) {
        if (clientStreamRegistration.state() != ClientStreamRegistration.State.ADDING) {
            return;
        }
        CompletableFuture send = this.communicationService.send(StreamTopics.ADD.topic(), bArr, Function.identity(), Function.identity(), clientStreamRegistration.serverId(), REQUEST_TIMEOUT);
        clientStreamRegistration.setPendingRequest(send);
        BiConsumer biConsumer = (bArr2, th) -> {
            handleAddResponse(clientStreamRegistration, bArr, bArr2, th);
        };
        ConcurrencyControl concurrencyControl = this.executor;
        Objects.requireNonNull(concurrencyControl);
        send.whenCompleteAsync(biConsumer, concurrencyControl::run);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v17, types: [io.camunda.zeebe.transport.stream.api.StreamResponseException] */
    private void handleAddResponse(ClientStreamRegistration<M> clientStreamRegistration, byte[] bArr, byte[] bArr2, Throwable th) {
        Throwable th2;
        ClientStreamRegistration.State state = clientStreamRegistration.state();
        if (state != ClientStreamRegistration.State.ADDING) {
            LOGGER.trace("Skip handling ADD response since the state is {}", state, th);
            return;
        }
        if (th == null) {
            Either decode = this.responseDecoder.decode(bArr2, new AddStreamResponse());
            if (decode.isRight()) {
                clientStreamRegistration.transitionToAdded();
                return;
            }
            th2 = ((ErrorResponse) decode.getLeft()).asException();
        } else {
            th2 = th;
        }
        LOGGER.warn("Failed to add stream {} on {}; will retry in {}", new Object[]{clientStreamRegistration.streamId(), clientStreamRegistration.serverId(), RETRY_DELAY, th2});
        this.executor.schedule(RETRY_DELAY, () -> {
            sendAddRequest(clientStreamRegistration, bArr);
        });
    }

    private void sendRemoveRequest(ClientStreamRegistration<M> clientStreamRegistration, byte[] bArr) {
        if (clientStreamRegistration.state() != ClientStreamRegistration.State.REMOVING) {
            return;
        }
        CompletableFuture send = this.communicationService.send(StreamTopics.REMOVE.topic(), bArr, Function.identity(), Function.identity(), clientStreamRegistration.serverId(), REQUEST_TIMEOUT);
        clientStreamRegistration.setPendingRequest(send);
        BiConsumer biConsumer = (bArr2, th) -> {
            handleRemoveResponse(clientStreamRegistration, bArr, bArr2, th);
        };
        ConcurrencyControl concurrencyControl = this.executor;
        Objects.requireNonNull(concurrencyControl);
        send.whenCompleteAsync(biConsumer, concurrencyControl::run);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v33, types: [io.camunda.zeebe.transport.stream.api.StreamResponseException] */
    private void handleRemoveResponse(ClientStreamRegistration<M> clientStreamRegistration, byte[] bArr, byte[] bArr2, Throwable th) {
        Throwable th2;
        ClientStreamRegistration.State state = clientStreamRegistration.state();
        if (state != ClientStreamRegistration.State.REMOVING) {
            LOGGER.trace("Skip handling REMOVE response since the state is {}", state, th);
            return;
        }
        if (th == null) {
            Either decode = this.responseDecoder.decode(bArr2, new RemoveStreamResponse());
            if (decode.isRight()) {
                clientStreamRegistration.transitionToRemoved();
                purgeRegistration(clientStreamRegistration.streamId(), clientStreamRegistration.serverId());
                return;
            }
            th2 = ((ErrorResponse) decode.getLeft()).asException();
        } else {
            th2 = th;
        }
        Throwable th3 = th2;
        Objects.requireNonNull(th3);
        switch ((int) SwitchBootstraps.typeSwitch(MethodHandles.lookup(), "typeSwitch", MethodType.methodType(Integer.TYPE, Object.class, Integer.TYPE), UnrecoverableException.class, MessagingException.RemoteHandlerFailure.class, MessagingException.NoSuchMemberException.class, MessagingException.ProtocolException.class).dynamicInvoker().invoke(th3, 0) /* invoke-custom */) {
            case 0:
                handleUnrecoverableExceptionOnRemove(clientStreamRegistration, (UnrecoverableException) th3);
                return;
            case 1:
                handleUnrecoverableExceptionOnRemove(clientStreamRegistration, (MessagingException.RemoteHandlerFailure) th3);
                return;
            case 2:
                handleUnrecoverableExceptionOnRemove(clientStreamRegistration, (MessagingException.NoSuchMemberException) th3);
                return;
            case 3:
                handleUnrecoverableExceptionOnRemove(clientStreamRegistration, (MessagingException.ProtocolException) th3);
                return;
            default:
                LOGGER.debug("Failed to remove remote stream {} on {}, will retry in {}", new Object[]{clientStreamRegistration.streamId(), clientStreamRegistration.serverId(), RETRY_DELAY, th2});
                this.executor.schedule(RETRY_DELAY, () -> {
                    sendRemoveRequest(clientStreamRegistration, bArr);
                });
                return;
        }
    }

    private void handleUnrecoverableExceptionOnRemove(ClientStreamRegistration<M> clientStreamRegistration, Throwable th) {
        LOGGER.debug("Failed to remove stream '{}' for member '{}'; unrecoverable error occurred on recipient\nside, will not retry.", new Object[]{clientStreamRegistration.streamId(), clientStreamRegistration.serverId(), th});
        clientStreamRegistration.transitionToRemoved();
        purgeRegistration(clientStreamRegistration.streamId(), clientStreamRegistration.serverId());
    }

    private void purgeRegistration(UUID uuid, MemberId memberId) {
        Map<UUID, ClientStreamRegistration<M>> map = this.registrations.get(memberId);
        if (map != null) {
            ClientStreamRegistration<M> remove = map.remove(uuid);
            if (remove != null) {
                remove.transitionToClosed();
            }
            if (map.isEmpty()) {
                this.registrations.remove(memberId);
            }
        }
    }

    private void doRemoveAll(MemberId memberId) {
        this.communicationService.unicast(StreamTopics.REMOVE_ALL.topic(), REMOVE_ALL_REQUEST, Function.identity(), memberId, true);
    }
}
