package io.camunda.zeebe.transport.stream.impl;

import io.atomix.cluster.MemberId;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.transport.stream.api.ClientStreamMetrics;
import io.camunda.zeebe.transport.stream.api.NoSuchStreamException;
import io.camunda.zeebe.transport.stream.api.StreamExhaustedException;
import io.camunda.zeebe.util.buffer.BufferWriter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import org.agrona.DirectBuffer;
import org.agrona.collections.Int2ObjectHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/camunda/zeebe/transport/stream/impl/AggregatedClientStream.class */
public final class AggregatedClientStream<M extends BufferWriter> {
    private static final Logger LOGGER = LoggerFactory.getLogger(AggregatedClientStream.class);
    private final UUID streamId;
    private final LogicalId<M> logicalId;
    private final Set<MemberId> liveConnections;
    private final ClientStreamMetrics metrics;
    private final Int2ObjectHashMap<ClientStreamImpl<M>> clientStreams;
    private boolean isOpened;
    private int nextLocalId;

    /* JADX INFO: Access modifiers changed from: package-private */
    public AggregatedClientStream(UUID uuid, LogicalId<M> logicalId) {
        this(uuid, logicalId, ClientStreamMetrics.noop());
    }

    AggregatedClientStream(UUID uuid, LogicalId<M> logicalId, ClientStreamMetrics clientStreamMetrics) {
        this.liveConnections = new HashSet();
        this.clientStreams = new Int2ObjectHashMap<>();
        this.streamId = uuid;
        this.logicalId = logicalId;
        this.metrics = clientStreamMetrics;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void addClient(ClientStreamImpl<M> clientStreamImpl) {
        this.clientStreams.put(clientStreamImpl.streamId().localId(), clientStreamImpl);
        this.metrics.observeAggregatedClientCount(this.clientStreams.size());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public UUID getStreamId() {
        return this.streamId;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Collection<ClientStreamImpl<M>> list() {
        return this.clientStreams.values();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public int nextLocalId() {
        int i = this.nextLocalId;
        this.nextLocalId++;
        return i;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void add(MemberId memberId) {
        this.liveConnections.add(memberId);
    }

    boolean isConnected(MemberId memberId) {
        return this.liveConnections.contains(memberId);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void remove(MemberId memberId) {
        this.liveConnections.remove(memberId);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void close() {
        this.isOpened = false;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void removeClient(ClientStreamIdImpl clientStreamIdImpl) {
        this.clientStreams.remove(clientStreamIdImpl.localId());
        this.metrics.observeAggregatedClientCount(this.clientStreams.size());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean isEmpty() {
        return this.clientStreams.isEmpty();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public LogicalId<M> logicalId() {
        return this.logicalId;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Set<MemberId> liveConnections() {
        return this.liveConnections;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void push(DirectBuffer directBuffer, ActorFuture<Void> actorFuture) {
        Int2ObjectHashMap.ValueCollection values = this.clientStreams.values();
        if (values.isEmpty()) {
            throw new NoSuchStreamException("Cannot forward remote payload as there is no known client streams for aggregated stream %s".formatted(this.logicalId));
        }
        tryPush(new ArrayList<>((Collection) values), ThreadLocalRandom.current().nextInt(values.size()), 1, directBuffer, actorFuture);
    }

    private void tryPush(ArrayList<ClientStreamImpl<M>> arrayList, int i, int i2, DirectBuffer directBuffer, ActorFuture<Void> actorFuture) {
        ClientStreamImpl<M> clientStreamImpl = arrayList.get(i);
        LOGGER.trace("Pushing data from stream [{}] to client [{}]", this.streamId, clientStreamImpl.streamId());
        clientStreamImpl.push(directBuffer).onComplete((r14, th) -> {
            if (th == null) {
                actorFuture.complete((Object) null);
                return;
            }
            if (i2 < arrayList.size()) {
                LOGGER.warn("Failed to push data to client [{}], retrying with next client.", clientStreamImpl.streamId(), th);
                tryPush(arrayList, (i + 1) % arrayList.size(), i2 + 1, directBuffer, actorFuture);
            } else {
                StreamExhaustedException streamExhaustedException = new StreamExhaustedException("Failed to push data to all available clients. No more clients left to retry.");
                streamExhaustedException.addSuppressed(th);
                actorFuture.completeExceptionally(streamExhaustedException);
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void open(ClientStreamRequestManager<M> clientStreamRequestManager, Set<MemberId> set) {
        if (this.isOpened) {
            return;
        }
        clientStreamRequestManager.add(this, set);
        this.isOpened = true;
    }

    public String toString() {
        return "AggregatedClientStream{streamId=" + this.streamId + ", logicalId=" + this.logicalId + ", liveConnections=" + this.liveConnections + ", clientStreams=" + this.clientStreams.size() + ", isOpened=" + this.isOpened + ", nextLocalId=" + this.nextLocalId + "}";
    }
}
