package io.camunda.zeebe.transport.stream.impl;

import io.atomix.cluster.MemberId;
import io.atomix.cluster.messaging.ClusterCommunicationService;
import io.camunda.zeebe.scheduler.Actor;
import io.camunda.zeebe.scheduler.ActorControl;
import io.camunda.zeebe.transport.stream.api.RemoteStream;
import io.camunda.zeebe.transport.stream.api.RemoteStreamer;
import io.camunda.zeebe.transport.stream.impl.ImmutableStreamRegistry;
import io.camunda.zeebe.transport.stream.impl.RemoteStreamPusher;
import io.camunda.zeebe.transport.stream.impl.messages.PushStreamRequest;
import io.camunda.zeebe.transport.stream.impl.messages.StreamTopics;
import io.camunda.zeebe.util.buffer.BufferReader;
import io.camunda.zeebe.util.buffer.BufferWriter;
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import org.agrona.DirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;

/* loaded from: input_file:io/camunda/zeebe/transport/stream/impl/RemoteStreamerImpl.class */
public final class RemoteStreamerImpl<M extends BufferReader, P extends BufferWriter> extends Actor implements RemoteStreamer<M, P> {
    private static final Duration REQUEST_TIMEOUT = Duration.ofSeconds(5);
    private final RemoteStreamPicker<M> streamPicker = new RandomStreamPicker();
    private final ClusterCommunicationService transport;
    private final ImmutableStreamRegistry<M> registry;

    public RemoteStreamerImpl(ClusterCommunicationService clusterCommunicationService, ImmutableStreamRegistry<M> immutableStreamRegistry) {
        this.transport = (ClusterCommunicationService) Objects.requireNonNull(clusterCommunicationService, "must specify a network transport");
        this.registry = (ImmutableStreamRegistry) Objects.requireNonNull(immutableStreamRegistry, "must specify a job stream registry");
    }

    @Override // io.camunda.zeebe.transport.stream.api.RemoteStreamer
    public Optional<RemoteStream<M, P>> streamFor(DirectBuffer directBuffer) {
        Set<ImmutableStreamRegistry.StreamConsumer<M>> set = this.registry.get(new UnsafeBuffer(directBuffer));
        if (set.isEmpty()) {
            return Optional.empty();
        }
        ImmutableStreamRegistry.StreamConsumer<M> pickStream = this.streamPicker.pickStream(set);
        M properties = pickStream.properties();
        ImmutableStreamRegistry.StreamId id = pickStream.id();
        RemoteStreamPusher.Transport transport = this::send;
        ActorControl actorControl = this.actor;
        Objects.requireNonNull(actorControl);
        return Optional.of(new RemoteStreamImpl(properties, new RemoteStreamPusher(id, transport, actorControl::run)));
    }

    private CompletableFuture<Void> send(PushStreamRequest pushStreamRequest, MemberId memberId) {
        return this.transport.send(StreamTopics.PUSH.topic(), pushStreamRequest, (v1) -> {
            return serialize(v1);
        }, Function.identity(), memberId, REQUEST_TIMEOUT).thenApply(bArr -> {
            return null;
        });
    }

    private byte[] serialize(BufferWriter bufferWriter) {
        byte[] bArr = new byte[bufferWriter.getLength()];
        UnsafeBuffer unsafeBuffer = new UnsafeBuffer();
        unsafeBuffer.wrap(bArr);
        bufferWriter.write(unsafeBuffer, 0);
        return bArr;
    }
}
