package io.camunda.zeebe.transport.stream.impl;

import io.atomix.cluster.MemberId;
import io.atomix.cluster.messaging.ClusterCommunicationService;
import io.camunda.zeebe.scheduler.Actor;
import io.camunda.zeebe.scheduler.ActorControl;
import io.camunda.zeebe.scheduler.ActorSchedulingService;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.transport.stream.api.ClientStream;
import io.camunda.zeebe.transport.stream.api.ClientStreamConsumer;
import io.camunda.zeebe.transport.stream.api.ClientStreamId;
import io.camunda.zeebe.transport.stream.api.ClientStreamMetrics;
import io.camunda.zeebe.transport.stream.api.ClientStreamService;
import io.camunda.zeebe.transport.stream.api.ClientStreamer;
import io.camunda.zeebe.transport.stream.impl.messages.MessageUtil;
import io.camunda.zeebe.transport.stream.impl.messages.StreamTopics;
import io.camunda.zeebe.util.buffer.BufferUtil;
import io.camunda.zeebe.util.buffer.BufferWriter;
import java.util.Collection;
import java.util.Objects;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.agrona.DirectBuffer;

/* loaded from: input_file:io/camunda/zeebe/transport/stream/impl/ClientStreamServiceImpl.class */
/**
 * Actor-based implementation of both {@link ClientStreamer} and {@link ClientStreamService}.
 *
 * <p>Every public operation is handed to this actor through {@code actor.call(...)} or {@code
 * actor.run(...)}, so the {@link ClientStreamManager} and {@link ClientStreamRegistry} are only
 * ever touched from jobs scheduled on this actor.
 *
 * @param <M> type of the stream metadata; must be serializable as a {@link BufferWriter}
 */
public final class ClientStreamServiceImpl<M extends BufferWriter> extends Actor implements ClientStreamer<M>, ClientStreamService<M> {
    private final ClientStreamManager<M> clientStreamManager;
    private final ClusterCommunicationService communicationService;
    private final ClientStreamRegistry<M> registry;
    private final ClientStreamApiHandler apiHandler;

    /**
     * Wires the registry, stream manager and API handler together.
     *
     * @param clusterCommunicationService used to register the request handlers once the actor has
     *     started, and passed on to the request manager
     * @param clientStreamMetrics metrics sink shared by the registry and the stream manager
     */
    public ClientStreamServiceImpl(ClusterCommunicationService clusterCommunicationService, ClientStreamMetrics clientStreamMetrics) {
        this.communicationService = clusterCommunicationService;
        this.registry = new ClientStreamRegistry<>(clientStreamMetrics);
        this.clientStreamManager = new ClientStreamManager<>(this.registry, new ClientStreamRequestManager(clusterCommunicationService, this.actor), clientStreamMetrics);
        this.apiHandler = new ClientStreamApiHandler(this.clientStreamManager, this.actor);
    }

    /**
     * Registers the handlers for the {@link StreamTopics#PUSH} and {@link
     * StreamTopics#RESTART_STREAMS} topics; {@code actor::run} is passed as the executor for both.
     */
    @Override
    protected void onActorStarted() {
        // Method references are passed directly so that they are target-typed by the handler
        // signatures instead of being stored in raw Function/BiFunction locals. A bound method
        // reference null-checks its receiver when evaluated, so no explicit requireNonNull is
        // needed.
        communicationService.replyToAsync(
                StreamTopics.PUSH.topic(),
                MessageUtil::parsePushRequest,
                apiHandler::handlePushRequest,
                BufferUtil::bufferAsArray,
                actor::run);
        communicationService.replyTo(
                StreamTopics.RESTART_STREAMS.topic(),
                Function.identity(),
                apiHandler::handleRestartRequest,
                Function.identity(),
                actor::run);
    }

    /** Closes the stream manager when the actor is asked to close. */
    @Override
    protected void onActorCloseRequested() {
        clientStreamManager.close();
    }

    /**
     * Adds a client stream via the stream manager, executed on this actor.
     *
     * @param directBuffer the stream type passed through to the manager
     * @param m the metadata associated with the stream
     * @param clientStreamConsumer the consumer associated with the stream
     * @return a future completed with the id returned by the stream manager
     */
    @Override // io.camunda.zeebe.transport.stream.api.ClientStreamer
    public ActorFuture<ClientStreamId> add(DirectBuffer directBuffer, M m, ClientStreamConsumer clientStreamConsumer) {
        return actor.call(() -> clientStreamManager.add(directBuffer, m, clientStreamConsumer));
    }

    /**
     * Removes a client stream via the stream manager, executed on this actor.
     *
     * @param clientStreamId id of the stream to remove
     * @return a future completed once the removal job has run
     */
    @Override // io.camunda.zeebe.transport.stream.api.ClientStreamer
    public ActorFuture<Void> remove(ClientStreamId clientStreamId) {
        // Statement lambda on purpose: it is only void-compatible, which pins the Runnable
        // overload of call(...) and therefore the ActorFuture<Void> result type.
        return actor.call(() -> {
            clientStreamManager.remove(clientStreamId);
        });
    }

    /**
     * Submits this actor to the given scheduler.
     *
     * @param actorSchedulingService the scheduler the actor is submitted to
     * @return the future returned by {@code submitActor}
     */
    @Override // io.camunda.zeebe.transport.stream.api.ClientStreamService
    public ActorFuture<Void> start(ActorSchedulingService actorSchedulingService) {
        return actorSchedulingService.submitActor(this);
    }

    /**
     * Forwards a "server joined" notification to the stream manager on this actor.
     *
     * @param memberId id of the member that joined
     */
    @Override // io.camunda.zeebe.transport.stream.api.ClientStreamService
    public void onServerJoined(MemberId memberId) {
        actor.run(() -> clientStreamManager.onServerJoined(memberId));
    }

    /**
     * Forwards a "server removed" notification to the stream manager on this actor.
     *
     * @param memberId id of the member that was removed
     */
    @Override // io.camunda.zeebe.transport.stream.api.ClientStreamService
    public void onServerRemoved(MemberId memberId) {
        actor.run(() -> clientStreamManager.onServerRemoved(memberId));
    }

    /** Returns this instance, which itself implements {@link ClientStreamer}. */
    @Override // io.camunda.zeebe.transport.stream.api.ClientStreamService
    public ClientStreamer<M> streamer() {
        return this;
    }

    /**
     * Looks up a single client stream in the registry, executed on this actor.
     *
     * @param clientStreamId id of the stream to look up
     * @return a future completed with the registry's result for that id, possibly empty
     */
    @Override // io.camunda.zeebe.transport.stream.api.ClientStreamService
    public ActorFuture<Optional<ClientStream<M>>> streamFor(ClientStreamId clientStreamId) {
        // The identity mapping only widens the Optional's element type to the ClientStream
        // interface; Optional is invariant, so it cannot be returned as-is.
        return actor.call(() -> registry.getClient(clientStreamId).map(stream -> stream));
    }

    /**
     * Lists all client streams by flattening every aggregated stream in the registry, executed on
     * this actor.
     *
     * @return a future completed with the flattened list of client streams
     */
    @Override // io.camunda.zeebe.transport.stream.api.ClientStreamService
    public ActorFuture<Collection<ClientStream<M>>> streams() {
        return actor.call(
                () ->
                        registry.list().stream()
                                .flatMap(aggregated -> aggregated.list().stream())
                                // Explicit cast: toList() gets no target type, so the element
                                // type must be widened to the interface here.
                                .map(stream -> (ClientStream<M>) stream)
                                .toList());
    }
}
