package io.camunda.zeebe.transport.stream.impl;

import io.atomix.cluster.MemberId;
import io.atomix.cluster.messaging.ClusterCommunicationService;
import io.camunda.zeebe.scheduler.Actor;
import io.camunda.zeebe.scheduler.ActorControl;
import io.camunda.zeebe.scheduler.ActorSchedulingService;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.scheduler.future.CompletableActorFuture;
import io.camunda.zeebe.transport.stream.api.ClientStream;
import io.camunda.zeebe.transport.stream.api.ClientStreamConsumer;
import io.camunda.zeebe.transport.stream.api.ClientStreamId;
import io.camunda.zeebe.transport.stream.api.ClientStreamMetrics;
import io.camunda.zeebe.transport.stream.api.ClientStreamService;
import io.camunda.zeebe.transport.stream.api.ClientStreamer;
import io.camunda.zeebe.transport.stream.impl.messages.MessageUtil;
import io.camunda.zeebe.transport.stream.impl.messages.StreamTopics;
import io.camunda.zeebe.util.buffer.BufferWriter;
import java.util.Collection;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.agrona.DirectBuffer;

/* loaded from: input_file:io/camunda/zeebe/transport/stream/impl/ClientStreamServiceImpl.class */
public final class ClientStreamServiceImpl<M extends BufferWriter> extends Actor implements ClientStreamer<M>, ClientStreamService<M> {
    private static final byte[] SUCCESS_RESPONSE = new byte[0];
    private final ClientStreamManager<M> clientStreamManager;
    private final ClusterCommunicationService communicationService;
    private final ClientStreamRegistry<M> registry;

    public ClientStreamServiceImpl(ClusterCommunicationService clusterCommunicationService, ClientStreamMetrics clientStreamMetrics) {
        this.communicationService = clusterCommunicationService;
        this.registry = new ClientStreamRegistry<>(clientStreamMetrics);
        this.clientStreamManager = new ClientStreamManager<>(this.registry, new ClientStreamRequestManager(clusterCommunicationService, this.actor), clientStreamMetrics);
    }

    protected void onActorStarted() {
        this.communicationService.replyTo(StreamTopics.PUSH.topic(), MessageUtil::parsePushRequest, pushStreamRequest -> {
            CompletableFuture completableFuture = new CompletableFuture();
            this.actor.run(() -> {
                try {
                    ActorFuture<Void> completableActorFuture = new CompletableActorFuture<>();
                    this.clientStreamManager.onPayloadReceived(pushStreamRequest, completableActorFuture);
                    completableActorFuture.onComplete((r4, th) -> {
                        if (th == null) {
                            completableFuture.complete(null);
                        } else {
                            completableFuture.completeExceptionally(th);
                        }
                    });
                } catch (Exception e) {
                    completableFuture.completeExceptionally(e);
                }
            });
            return completableFuture;
        }, r2 -> {
            return SUCCESS_RESPONSE;
        });
        ClusterCommunicationService clusterCommunicationService = this.communicationService;
        String str = StreamTopics.RESTART_STREAMS.topic();
        Function identity = Function.identity();
        BiFunction biFunction = (memberId, bArr) -> {
            this.clientStreamManager.onServerRemoved(MemberId.from((String) memberId.id()));
            this.clientStreamManager.onServerJoined(MemberId.from((String) memberId.id()));
            return SUCCESS_RESPONSE;
        };
        Function identity2 = Function.identity();
        ActorControl actorControl = this.actor;
        Objects.requireNonNull(actorControl);
        clusterCommunicationService.replyTo(str, identity, biFunction, identity2, actorControl::run);
    }

    protected void onActorCloseRequested() {
        this.clientStreamManager.close();
    }

    @Override // io.camunda.zeebe.transport.stream.api.ClientStreamer
    public ActorFuture<ClientStreamId> add(DirectBuffer directBuffer, M m, ClientStreamConsumer clientStreamConsumer) {
        return this.actor.call(() -> {
            return this.clientStreamManager.add(directBuffer, m, clientStreamConsumer);
        });
    }

    @Override // io.camunda.zeebe.transport.stream.api.ClientStreamer
    public ActorFuture<Void> remove(ClientStreamId clientStreamId) {
        return this.actor.call(() -> {
            this.clientStreamManager.remove(clientStreamId);
        });
    }

    @Override // io.camunda.zeebe.transport.stream.api.ClientStreamService
    public ActorFuture<Void> start(ActorSchedulingService actorSchedulingService) {
        return actorSchedulingService.submitActor(this);
    }

    @Override // io.camunda.zeebe.transport.stream.api.ClientStreamService
    public void onServerJoined(MemberId memberId) {
        this.actor.run(() -> {
            this.clientStreamManager.onServerJoined(memberId);
        });
    }

    @Override // io.camunda.zeebe.transport.stream.api.ClientStreamService
    public void onServerRemoved(MemberId memberId) {
        this.actor.run(() -> {
            this.clientStreamManager.onServerRemoved(memberId);
        });
    }

    @Override // io.camunda.zeebe.transport.stream.api.ClientStreamService
    public ClientStreamer<M> streamer() {
        return this;
    }

    @Override // io.camunda.zeebe.transport.stream.api.ClientStreamService
    public ActorFuture<Optional<ClientStream<M>>> streamFor(ClientStreamId clientStreamId) {
        return this.actor.call(() -> {
            return this.registry.getClient(clientStreamId).map(clientStreamImpl -> {
                return clientStreamImpl;
            });
        });
    }

    @Override // io.camunda.zeebe.transport.stream.api.ClientStreamService
    public ActorFuture<Collection<ClientStream<M>>> streams() {
        return this.actor.call(() -> {
            return this.registry.list().stream().flatMap(aggregatedClientStream -> {
                return aggregatedClientStream.list().stream();
            }).map(clientStreamImpl -> {
                return clientStreamImpl;
            }).toList();
        });
    }
}
