package io.camunda.zeebe.transport.impl;

import io.atomix.cluster.messaging.MessagingService;
import io.camunda.zeebe.scheduler.Actor;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.transport.RequestHandler;
import io.camunda.zeebe.transport.RequestType;
import io.camunda.zeebe.transport.ServerResponse;
import io.camunda.zeebe.transport.ServerTransport;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import org.agrona.collections.Int2ObjectHashMap;
import org.agrona.collections.Long2ObjectHashMap;
import org.agrona.concurrent.UnsafeBuffer;
import org.slf4j.Logger;

/* loaded from: input_file:io/camunda/zeebe/transport/impl/AtomixServerTransport.class */
/**
 * {@link ServerTransport} that serves partition APIs through a {@link MessagingService}.
 *
 * <p>For every subscribed partition/request-type pair a handler is registered on the messaging
 * service under the topic produced by {@link #topicName(int, RequestType)}. Each incoming request
 * gets a request id and a {@link CompletableFuture} that is stored per partition until {@link
 * #sendResponse(ServerResponse)} completes it with the serialized response.
 *
 * <p>{@code partitionsRequestMap} is only read and modified from within jobs submitted to this
 * actor (see the {@code actor.call}/{@code actor.run} usages below).
 */
public class AtomixServerTransport extends Actor implements ServerTransport {
    private static final Logger LOG = Loggers.TRANSPORT_LOGGER;

    /** Topic pattern: request type id, the literal {@code -api-}, then the partition id. */
    private static final String API_TOPIC_FORMAT = "%s-api-%d";

    private static final String ERROR_MSG_MISSING_PARTITION_MAP = "Node already unsubscribed from partition %d, this can only happen when atomix does not cleanly remove its handlers.";

    /** Pending response futures, keyed by partition id and then by request id. */
    private final Int2ObjectHashMap<Long2ObjectHashMap<CompletableFuture<byte[]>>> partitionsRequestMap = new Int2ObjectHashMap<>();

    /** Source of request ids; incremented once per incoming request. */
    private final AtomicLong requestCount = new AtomicLong(0);

    private final MessagingService messagingService;

    /**
     * Creates a transport on top of the given messaging service.
     *
     * @param messagingService service used to register and unregister the request handlers
     */
    public AtomixServerTransport(MessagingService messagingService) {
        this.messagingService = messagingService;
    }

    /**
     * Returns the constant name {@code "ServerTransport"}.
     *
     * @return the name of this transport
     */
    public String getName() {
        return "ServerTransport";
    }

    /**
     * Unregisters the handlers of every known partition, drops their pending requests and closes
     * the actor. Blocks the calling thread until that job has finished.
     */
    @Override // java.lang.AutoCloseable
    public void close() {
        this.actor.call(() -> {
            // removePartition() removes entries from partitionsRequestMap, so iterate over a
            // snapshot of the keys instead of the live key set.
            final Integer[] partitionIds = this.partitionsRequestMap.keySet().toArray(new Integer[0]);
            for (final Integer partitionId : partitionIds) {
                removePartition(partitionId);
            }
            this.actor.close();
        }).join();
    }

    /**
     * Registers {@code requestHandler} for requests of the given type on the given partition.
     *
     * @param i the partition id
     * @param requestType the kind of request to subscribe to
     * @param requestHandler handler invoked for every incoming request on the resulting topic
     * @return a future completed once the handler has been registered
     */
    @Override // io.camunda.zeebe.transport.ServerTransport
    public ActorFuture<Void> subscribe(int i, RequestType requestType, RequestHandler requestHandler) {
        return this.actor.call(() -> {
            final String topic = topicName(i, requestType);
            LOG.trace("Subscribe for topic {}", topic);
            // Make sure there is a place to park the response futures of this partition.
            this.partitionsRequestMap.computeIfAbsent(i, id -> new Long2ObjectHashMap<>());
            this.messagingService.registerHandler(
                    topic, (sender, request) -> handleAtomixRequest(request, i, requestType, requestHandler));
        });
    }

    /**
     * Unregisters the handler for the given partition and request type. The pending requests of
     * the partition are left untouched.
     *
     * @param i the partition id
     * @param requestType the kind of request to unsubscribe from
     * @return a future completed once the handler has been unregistered
     */
    @Override // io.camunda.zeebe.transport.ServerTransport
    public ActorFuture<Void> unsubscribe(int i, RequestType requestType) {
        return this.actor.call(() -> removeRequestHandlers(i, requestType));
    }

    /**
     * Unregisters the handlers of all request types for the partition and forgets its pending
     * requests. The pending futures are discarded without being completed.
     */
    private void removePartition(final int partitionId) {
        Arrays.stream(RequestType.values()).forEach(type -> removeRequestHandlers(partitionId, type));
        final Long2ObjectHashMap<CompletableFuture<byte[]>> pendingRequests = this.partitionsRequestMap.remove(partitionId);
        if (pendingRequests != null) {
            pendingRequests.clear();
        }
    }

    /** Unregisters the messaging handler of the topic for the given partition and request type. */
    private void removeRequestHandlers(final int partitionId, final RequestType requestType) {
        final String topic = topicName(partitionId, requestType);
        LOG.trace("Unsubscribe from topic {}", topic);
        this.messagingService.unregisterHandler(topic);
    }

    /**
     * Hands an incoming request to {@code requestHandler} on the actor and returns the future that
     * will carry the response bytes.
     *
     * <p>The future fails with an {@link IllegalStateException} when the partition has no request
     * map, and with the thrown exception when the handler itself fails.
     */
    private CompletableFuture<byte[]> handleAtomixRequest(
            final byte[] requestBytes,
            final int partitionId,
            final RequestType requestType,
            final RequestHandler requestHandler) {
        final CompletableFuture<byte[]> responseFuture = new CompletableFuture<>();
        this.actor.call(() -> {
            final long requestId = this.requestCount.getAndIncrement();
            final Long2ObjectHashMap<CompletableFuture<byte[]>> pendingRequests = this.partitionsRequestMap.get(partitionId);
            if (pendingRequests == null) {
                final String errorMsg = String.format(ERROR_MSG_MISSING_PARTITION_MAP, partitionId);
                LOG.trace(errorMsg);
                responseFuture.completeExceptionally(new IllegalStateException(errorMsg));
                return;
            }
            try {
                requestHandler.onRequest(this, partitionId, requestId, new UnsafeBuffer(requestBytes), 0, requestBytes.length);
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Handled request {} for topic {}", requestId, topicName(partitionId, requestType));
                }
                // Only remember the request once the handler returned normally; a failing
                // handler completes the future below and must not leave an entry behind.
                pendingRequests.put(requestId, responseFuture);
            } catch (final Exception e) {
                LOG.error("Unexpected exception on handling request for partition {}.", partitionId, e);
                responseFuture.completeExceptionally(e);
            }
        });
        return responseFuture;
    }

    /**
     * Serializes the response and completes the pending request it belongs to.
     *
     * <p>The response is written into a byte array on the calling thread; looking up and
     * completing the request future is done in a job on the actor. A response for an unknown
     * partition or request id is only logged.
     *
     * @param serverResponse the response to send; provides request id, partition id and payload
     */
    @Override // io.camunda.zeebe.transport.ServerOutput
    public void sendResponse(ServerResponse serverResponse) {
        final long requestId = serverResponse.getRequestId();
        final int partitionId = serverResponse.getPartitionId();
        final byte[] responseBytes = new byte[serverResponse.getLength()];
        serverResponse.write(new UnsafeBuffer(responseBytes), 0);
        this.actor.run(() -> {
            final Long2ObjectHashMap<CompletableFuture<byte[]>> pendingRequests = this.partitionsRequestMap.get(partitionId);
            if (pendingRequests == null) {
                LOG.warn("Node is no longer leader for partition {}, tried to respond on request with id {}", partitionId, requestId);
                return;
            }
            final CompletableFuture<byte[]> responseFuture = pendingRequests.remove(requestId);
            if (responseFuture != null) {
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Send response to request {}", requestId);
                }
                responseFuture.complete(responseBytes);
            } else if (LOG.isTraceEnabled()) {
                LOG.trace("Wasn't able to send response to request {}", requestId);
            }
        });
    }

    /**
     * Builds the messaging topic for a partition and request type, formatted as
     * {@code <request type id>-api-<partition id>}.
     *
     * @param i the partition id
     * @param requestType the request type whose id prefixes the topic
     * @return the topic name
     */
    public static String topicName(int i, RequestType requestType) {
        return String.format(API_TOPIC_FORMAT, requestType.getId(), i);
    }
}
