package io.camunda.zeebe.transport.stream.impl;

import io.atomix.cluster.ClusterMembershipEvent;
import io.atomix.cluster.Member;
import io.camunda.zeebe.scheduler.ActorSchedulingService;
import io.camunda.zeebe.scheduler.ConcurrencyControl;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.scheduler.future.ActorFutureCollector;
import io.camunda.zeebe.scheduler.future.CompletableActorFuture;
import io.camunda.zeebe.transport.stream.api.RemoteStreamInfo;
import io.camunda.zeebe.transport.stream.api.RemoteStreamService;
import io.camunda.zeebe.transport.stream.api.RemoteStreamer;
import io.camunda.zeebe.util.buffer.BufferWriter;
import java.util.Collection;
import java.util.HashSet;
import java.util.stream.Stream;

/* loaded from: input_file:io/camunda/zeebe/transport/stream/impl/RemoteStreamServiceImpl.class */
/**
 * {@link RemoteStreamService} implementation that ties together a {@link RemoteStreamerImpl}, a
 * {@link RemoteStreamTransport} and a {@link RemoteStreamRegistry}.
 *
 * <p>Starting and closing the service means starting/closing both the streamer and the transport
 * and reporting a single combined result. The class also reacts to cluster membership changes by
 * forwarding them to the transport.
 *
 * @param <M> type parameter shared with the streamer, transport and registry (presumably the
 *     stream metadata type; confirm against {@link RemoteStreamService})
 * @param <P> payload type, which must be a {@link BufferWriter}
 */
public class RemoteStreamServiceImpl<M, P extends BufferWriter> implements RemoteStreamService<M, P> {
    private final RemoteStreamerImpl<M, P> streamer;
    private final RemoteStreamTransport<M> apiServer;
    private final RemoteStreamRegistry<M> registry;

    /**
     * Creates a service around already constructed collaborators; nothing is started here.
     *
     * @param remoteStreamerImpl the streamer handed out to callers once {@link #start} succeeds
     * @param remoteStreamTransport the transport that is started/closed together with the streamer
     *     and that receives membership notifications
     * @param remoteStreamRegistry the registry queried by {@link #streams()}
     */
    public RemoteStreamServiceImpl(RemoteStreamerImpl<M, P> remoteStreamerImpl, RemoteStreamTransport<M> remoteStreamTransport, RemoteStreamRegistry<M> remoteStreamRegistry) {
        this.streamer = remoteStreamerImpl;
        this.apiServer = remoteStreamTransport;
        this.registry = remoteStreamRegistry;
    }

    /**
     * Submits the streamer and then the transport to the given scheduling service.
     *
     * @param actorSchedulingService scheduler both actors are submitted to
     * @param concurrencyControl context on which the combined completion callback is registered
     * @return a future completed with the streamer once both submissions have completed, or
     *     completed exceptionally with the error reported for the combined submission
     */
    @Override // io.camunda.zeebe.transport.stream.api.RemoteStreamService
    public ActorFuture<RemoteStreamer<M, P>> start(ActorSchedulingService actorSchedulingService, ConcurrencyControl concurrencyControl) {
        // Argument evaluation order keeps the original submission order: streamer first, then transport.
        return completeAfterBoth(
                concurrencyControl,
                actorSchedulingService.submitActor(this.streamer),
                actorSchedulingService.submitActor(this.apiServer),
                this.streamer);
    }

    /**
     * Requests closing of the streamer and then of the transport.
     *
     * @param concurrencyControl context on which the combined completion callback is registered
     * @return a future completed with {@code null} once both close requests have completed, or
     *     completed exceptionally with the error reported for the combined close
     */
    @Override // io.camunda.zeebe.transport.stream.api.RemoteStreamService
    public ActorFuture<Void> closeAsync(ConcurrencyControl concurrencyControl) {
        return this.<Void>completeAfterBoth(
                concurrencyControl,
                this.streamer.closeAsync(),
                this.apiServer.closeAsync(),
                null);
    }

    /**
     * Returns the streams currently listed by the registry.
     *
     * @return a new {@link HashSet} copied from the registry listing, so later modifications of the
     *     returned collection do not touch the registry's own collection
     */
    @Override // io.camunda.zeebe.transport.stream.api.RemoteStreamService
    public Collection<RemoteStreamInfo<M>> streams() {
        return new HashSet<>(this.registry.list());
    }

    /**
     * Tells whether a membership event is one this service reacts to.
     *
     * @param clusterMembershipEvent the event to inspect
     * @return {@code true} only for {@code MEMBER_REMOVED} and {@code MEMBER_ADDED} events
     */
    public boolean isRelevant(ClusterMembershipEvent clusterMembershipEvent) {
        final ClusterMembershipEvent.Type type = clusterMembershipEvent.type();
        return type == ClusterMembershipEvent.Type.MEMBER_REMOVED || type == ClusterMembershipEvent.Type.MEMBER_ADDED;
    }

    /**
     * Forwards a membership change to the transport: a removed member has all of its streams
     * removed, an added member has its streams restarted. Any other event type is ignored.
     *
     * @param clusterMembershipEvent the membership event to handle
     */
    public void event(ClusterMembershipEvent clusterMembershipEvent) {
        final ClusterMembershipEvent.Type type = clusterMembershipEvent.type();
        // The member id is resolved inside each branch so unhandled event types never touch the subject.
        if (type == ClusterMembershipEvent.Type.MEMBER_REMOVED) {
            this.apiServer.removeAll(clusterMembershipEvent.subject().id());
        } else if (type == ClusterMembershipEvent.Type.MEMBER_ADDED) {
            this.apiServer.restartStreams(clusterMembershipEvent.subject().id());
        }
    }

    /**
     * Builds a future that mirrors the combined outcome of two futures.
     *
     * @param executor context used both by the collector and to register the completion callback
     * @param first first future to wait for
     * @param second second future to wait for
     * @param result value the returned future is completed with when no error is reported
     * @param <T> type of the resulting value
     * @return a future completed with {@code result}, or completed exceptionally with the error
     *     reported for the combined future
     */
    private <T> ActorFuture<T> completeAfterBoth(ConcurrencyControl executor, ActorFuture<Void> first, ActorFuture<Void> second, T result) {
        final CompletableActorFuture<T> combined = new CompletableActorFuture<>();
        executor.runOnCompletion(
                Stream.of(first, second).collect(new ActorFutureCollector<>(executor)),
                (ignored, error) -> {
                    if (error != null) {
                        combined.completeExceptionally(error);
                    } else {
                        combined.complete(result);
                    }
                });
        return combined;
    }
}
