package io.camunda.zeebe.transport.stream.impl;

import io.atomix.cluster.MemberId;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.transport.stream.api.ClientStreamConsumer;
import io.camunda.zeebe.transport.stream.api.ClientStreamId;
import io.camunda.zeebe.transport.stream.api.ClientStreamMetrics;
import io.camunda.zeebe.transport.stream.api.NoSuchStreamException;
import io.camunda.zeebe.transport.stream.impl.messages.PushStreamRequest;
import io.camunda.zeebe.util.buffer.BufferWriter;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import org.agrona.DirectBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/camunda/zeebe/transport/stream/impl/ClientStreamManager.class */
/**
 * Coordinates client streams with the set of currently known servers.
 *
 * <p>The manager keeps a local set of server {@link MemberId}s, delegates stream bookkeeping to
 * the {@link ClientStreamRegistry}, forwards add/remove operations to the
 * {@link ClientStreamRequestManager}, and hands payloads received for a stream to a
 * {@link ClientStreamPusher}.
 *
 * <p>NOTE(review): {@code servers} is a plain {@link HashSet} and nothing in this class
 * synchronizes access to it; presumably all methods are invoked from a single thread/actor -
 * confirm against the caller.
 *
 * @param <M> type of the stream metadata
 */
public final class ClientStreamManager<M extends BufferWriter> {
    private static final Logger LOG = LoggerFactory.getLogger(ClientStreamManager.class);

    /** Servers currently known to this manager; updated on join/removal notifications. */
    private final Set<MemberId> servers = new HashSet<>();

    private final ClientStreamRegistry<M> registry;
    private final ClientStreamRequestManager<M> requestManager;
    private final ClientStreamMetrics metrics;
    private final ClientStreamPusher streamPusher;

    /**
     * Creates a manager on top of the given collaborators. A {@link ClientStreamPusher} is created
     * internally from the supplied metrics.
     *
     * @param clientStreamRegistry registry holding the client streams and their aggregated streams
     * @param clientStreamRequestManager manager used to send add/remove requests to servers
     * @param clientStreamMetrics metrics updated with the server count and push results
     */
    public ClientStreamManager(ClientStreamRegistry<M> clientStreamRegistry, ClientStreamRequestManager<M> clientStreamRequestManager, ClientStreamMetrics clientStreamMetrics) {
        this.registry = clientStreamRegistry;
        this.requestManager = clientStreamRequestManager;
        this.metrics = clientStreamMetrics;
        this.streamPusher = new ClientStreamPusher(clientStreamMetrics);
    }

    /**
     * Records a newly joined server, updates the server-count metric, and asks the request manager
     * to add every stream currently listed by the registry to that server.
     *
     * @param memberId id of the server that joined
     */
    public void onServerJoined(MemberId memberId) {
        servers.add(memberId);
        metrics.serverCount(servers.size());
        registry.list().forEach(stream -> requestManager.add(stream, memberId));
    }

    /**
     * Forgets a server, updates the server-count metric, and notifies the request manager of the
     * removal.
     *
     * @param memberId id of the server that was removed
     */
    public void onServerRemoved(MemberId memberId) {
        servers.remove(memberId);
        metrics.serverCount(servers.size());
        requestManager.onServerRemoved(memberId);
    }

    /**
     * Registers a new client stream in the registry and opens its server stream against the
     * currently known servers.
     *
     * @param directBuffer buffer passed to the registry as the first argument of {@code addClient}
     *     (presumably the stream type - confirm against {@link ClientStreamRegistry})
     * @param m metadata of the stream
     * @param clientStreamConsumer consumer associated with the new client stream
     * @return the id of the newly registered client stream
     */
    public ClientStreamId add(DirectBuffer directBuffer, M m, ClientStreamConsumer clientStreamConsumer) {
        final ClientStreamImpl<M> clientStream = registry.addClient(directBuffer, m, clientStreamConsumer);
        final ClientStreamId streamId = clientStream.streamId();
        LOG.debug("Added new client stream [{}]", streamId);
        clientStream.serverStream().open(requestManager, servers);
        return streamId;
    }

    /**
     * Removes a client stream from the registry. If the registry returns an aggregated stream for
     * the removal, that stream is closed and a remove request is issued for the known servers.
     *
     * @param clientStreamId id of the client stream to remove
     */
    public void remove(ClientStreamId clientStreamId) {
        LOG.debug("Removing client stream [{}]", clientStreamId);
        // NOTE(review): presumably the registry only yields an aggregated stream once it has no
        // remaining clients - confirm against ClientStreamRegistry#removeClient.
        registry.removeClient(clientStreamId).ifPresent(stream -> {
            LOG.debug("Removing aggregated stream [{}]", stream.streamId());
            stream.close();
            requestManager.remove(stream, servers);
        });
    }

    /** Clears the registry and asks the request manager to remove all streams from the known servers. */
    public void close() {
        registry.clear();
        requestManager.removeAll(servers);
    }

    /**
     * Handles a payload pushed for a stream.
     *
     * <p>The push outcome is recorded in the metrics once {@code actorFuture} completes. If the
     * registry knows the stream, the payload is handed to the stream pusher; any exception thrown
     * by the pusher completes the future exceptionally. If the stream is unknown, a remove request
     * is sent via {@code removeUnreliable}, a warning is logged, and the future is completed
     * exceptionally with a {@link NoSuchStreamException}.
     *
     * @param pushStreamRequest request carrying the target stream id and the payload
     * @param actorFuture future completed with the result of the push
     */
    public void onPayloadReceived(PushStreamRequest pushStreamRequest, ActorFuture<Void> actorFuture) {
        final UUID streamId = pushStreamRequest.streamId();
        final DirectBuffer payload = pushStreamRequest.payload();

        // Register the metrics callback before attempting the push so every outcome is counted.
        actorFuture.onComplete((ignored, error) -> {
            if (error == null) {
                metrics.pushSucceeded();
            } else {
                metrics.pushFailed();
            }
        });

        registry.get(streamId).ifPresentOrElse(stream -> {
            try {
                streamPusher.push(stream, payload, actorFuture);
            } catch (Exception e) {
                // Surface any push failure through the future instead of propagating it to the caller.
                actorFuture.completeExceptionally(e);
            }
        }, () -> {
            // The stream is not registered locally, so ask the servers to drop it.
            requestManager.removeUnreliable(streamId, servers);
            LOG.warn("Expected to push payload to stream {}, but no stream found.", streamId);
            actorFuture.completeExceptionally(new NoSuchStreamException("Cannot forward pushed payload as chosen client stream %s was already closed".formatted(streamId)));
        });
    }
}
