package io.camunda.zeebe.transport.stream.impl;

import io.atomix.cluster.MemberId;
import io.atomix.cluster.messaging.ClusterCommunicationService;
import io.camunda.zeebe.scheduler.Actor;
import io.camunda.zeebe.scheduler.ActorControl;
import io.camunda.zeebe.transport.stream.impl.messages.AddStreamRequest;
import io.camunda.zeebe.transport.stream.impl.messages.RemoveStreamRequest;
import io.camunda.zeebe.transport.stream.impl.messages.StreamTopics;
import io.camunda.zeebe.util.buffer.BufferReader;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.Function;
import org.agrona.concurrent.UnsafeBuffer;

/* loaded from: input_file:io/camunda/zeebe/transport/stream/impl/RemoteStreamEndpoint.class */
public final class RemoteStreamEndpoint<M extends BufferReader> extends Actor {
    private final ClusterCommunicationService transport;
    private final RemoteStreamApiHandler<M> requestHandler;

    public RemoteStreamEndpoint(ClusterCommunicationService clusterCommunicationService, RemoteStreamApiHandler<M> remoteStreamApiHandler) {
        this.transport = clusterCommunicationService;
        this.requestHandler = remoteStreamApiHandler;
    }

    public void removeAll(MemberId memberId) {
        this.actor.run(() -> {
            this.requestHandler.removeAll(memberId);
        });
    }

    protected void onActorStarting() {
        ClusterCommunicationService clusterCommunicationService = this.transport;
        String str = StreamTopics.ADD.topic();
        Function function = this::parseAddRequest;
        RemoteStreamApiHandler<M> remoteStreamApiHandler = this.requestHandler;
        Objects.requireNonNull(remoteStreamApiHandler);
        BiConsumer biConsumer = remoteStreamApiHandler::add;
        ActorControl actorControl = this.actor;
        Objects.requireNonNull(actorControl);
        clusterCommunicationService.subscribe(str, function, biConsumer, actorControl::run);
        ClusterCommunicationService clusterCommunicationService2 = this.transport;
        String str2 = StreamTopics.REMOVE.topic();
        Function function2 = this::parseRemoveRequest;
        RemoteStreamApiHandler<M> remoteStreamApiHandler2 = this.requestHandler;
        Objects.requireNonNull(remoteStreamApiHandler2);
        BiConsumer biConsumer2 = remoteStreamApiHandler2::remove;
        ActorControl actorControl2 = this.actor;
        Objects.requireNonNull(actorControl2);
        clusterCommunicationService2.subscribe(str2, function2, biConsumer2, actorControl2::run);
        ClusterCommunicationService clusterCommunicationService3 = this.transport;
        String str3 = StreamTopics.REMOVE_ALL.topic();
        Function identity = Function.identity();
        BiConsumer biConsumer3 = this::onRemoveAll;
        ActorControl actorControl3 = this.actor;
        Objects.requireNonNull(actorControl3);
        clusterCommunicationService3.subscribe(str3, identity, biConsumer3, actorControl3::run);
    }

    protected void onActorClosing() {
        this.transport.unsubscribe(StreamTopics.ADD.topic());
        this.transport.unsubscribe(StreamTopics.REMOVE.topic());
        this.transport.unsubscribe(StreamTopics.REMOVE_ALL.topic());
        this.requestHandler.close();
    }

    private void onRemoveAll(MemberId memberId, byte[] bArr) {
        this.requestHandler.removeAll(memberId);
    }

    private RemoveStreamRequest parseRemoveRequest(byte[] bArr) {
        return (RemoveStreamRequest) parseRequest(bArr, new RemoveStreamRequest());
    }

    private AddStreamRequest parseAddRequest(byte[] bArr) {
        return (AddStreamRequest) parseRequest(bArr, new AddStreamRequest());
    }

    private <R extends BufferReader> R parseRequest(byte[] bArr, R r) {
        UnsafeBuffer unsafeBuffer = new UnsafeBuffer(bArr);
        r.wrap(unsafeBuffer, 0, unsafeBuffer.capacity());
        return r;
    }
}
