package io.camunda.zeebe.transport.stream.impl;

import io.atomix.cluster.MemberId;
import io.atomix.cluster.messaging.ClusterCommunicationService;
import io.camunda.zeebe.scheduler.Actor;
import io.camunda.zeebe.scheduler.ActorControl;
import io.camunda.zeebe.transport.stream.api.RemoteStream;
import io.camunda.zeebe.transport.stream.api.RemoteStreamErrorHandler;
import io.camunda.zeebe.transport.stream.api.RemoteStreamMetrics;
import io.camunda.zeebe.transport.stream.api.RemoteStreamer;
import io.camunda.zeebe.transport.stream.impl.RemoteStreamPusher;
import io.camunda.zeebe.transport.stream.impl.messages.PushStreamRequest;
import io.camunda.zeebe.transport.stream.impl.messages.StreamTopics;
import io.camunda.zeebe.util.buffer.BufferUtil;
import io.camunda.zeebe.util.buffer.BufferWriter;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.agrona.DirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;

/* loaded from: input_file:io/camunda/zeebe/transport/stream/impl/RemoteStreamerImpl.class */
public final class RemoteStreamerImpl<M, P extends BufferWriter> extends Actor implements RemoteStreamer<M, P> {
    private static final Duration REQUEST_TIMEOUT = Duration.ofSeconds(5);
    private final ClusterCommunicationService transport;
    private final ImmutableStreamRegistry<M> registry;
    private final RemoteStreamPusher<P> remoteStreamPusher;
    private final RemoteStreamErrorHandler<P> errorHandler;

    public RemoteStreamerImpl(ClusterCommunicationService clusterCommunicationService, ImmutableStreamRegistry<M> immutableStreamRegistry, RemoteStreamErrorHandler<P> remoteStreamErrorHandler, RemoteStreamMetrics remoteStreamMetrics) {
        this.transport = (ClusterCommunicationService) Objects.requireNonNull(clusterCommunicationService, "must specify a network transport");
        this.registry = (ImmutableStreamRegistry) Objects.requireNonNull(immutableStreamRegistry, "must specify a job stream registry");
        this.errorHandler = (RemoteStreamErrorHandler) Objects.requireNonNull(remoteStreamErrorHandler, "must specify an error handler");
        RemoteStreamPusher.Transport transport = this::send;
        ActorControl actorControl = this.actor;
        Objects.requireNonNull(actorControl);
        this.remoteStreamPusher = new RemoteStreamPusher<>(transport, actorControl::run, remoteStreamMetrics);
    }

    @Override // io.camunda.zeebe.transport.stream.api.RemoteStreamer
    public Optional<RemoteStream<M, P>> streamFor(DirectBuffer directBuffer, Predicate<M> predicate) {
        Set<AggregatedRemoteStream<M>> set = (Set) this.registry.get(new UnsafeBuffer(directBuffer)).stream().filter(aggregatedRemoteStream -> {
            return predicate.test(aggregatedRemoteStream.metadata());
        }).collect(Collectors.toSet());
        return set.isEmpty() ? Optional.empty() : (Optional<RemoteStream<M, P>>) pickStream(set).map(aggregatedRemoteStream2 -> {
            return new RemoteStreamImpl(aggregatedRemoteStream2, this.remoteStreamPusher, this.errorHandler);
        });
    }

    private Optional<AggregatedRemoteStream<M>> pickStream(Set<AggregatedRemoteStream<M>> set) {
        ArrayList arrayList = new ArrayList(set);
        Collections.shuffle(arrayList);
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            AggregatedRemoteStream aggregatedRemoteStream = (AggregatedRemoteStream) it.next();
            if (!aggregatedRemoteStream.streamConsumers().isEmpty()) {
                return Optional.of(aggregatedRemoteStream);
            }
        }
        return Optional.empty();
    }

    private CompletableFuture<Void> send(PushStreamRequest pushStreamRequest, MemberId memberId) {
        return this.transport.send(StreamTopics.PUSH.topic(), pushStreamRequest, (v0) -> {
            return BufferUtil.bufferAsArray(v0);
        }, Function.identity(), memberId, REQUEST_TIMEOUT).thenApply(bArr -> {
            return null;
        });
    }
}
