package io.camunda.zeebe.transport.stream.impl;

import io.camunda.zeebe.transport.stream.api.RemoteStream;
import io.camunda.zeebe.transport.stream.api.RemoteStreamErrorHandler;
import io.camunda.zeebe.transport.stream.api.StreamExhaustedException;
import io.camunda.zeebe.transport.stream.impl.AggregatedRemoteStream;
import io.camunda.zeebe.util.buffer.BufferWriter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/camunda/zeebe/transport/stream/impl/RemoteStreamImpl.class */
public final class RemoteStreamImpl<M, P extends BufferWriter> implements RemoteStream<M, P> {
    private static final Logger LOGGER = LoggerFactory.getLogger(RemoteStreamImpl.class);
    private final AggregatedRemoteStream<M> stream;
    private final RemoteStreamPusher<P> streamer;
    private final RemoteStreamErrorHandler<P> errorHandler;

    /* loaded from: input_file:io/camunda/zeebe/transport/stream/impl/RemoteStreamImpl$RetryHandler.class */
    private final class RetryHandler implements RemoteStreamErrorHandler<P> {
        private final RemoteStreamErrorHandler<P> errorHandler;
        private final AggregatedRemoteStream.StreamConsumer<M> initialConsumer;

        private RetryHandler(RemoteStreamErrorHandler<P> remoteStreamErrorHandler, AggregatedRemoteStream.StreamConsumer<M> streamConsumer) {
            this.errorHandler = remoteStreamErrorHandler;
            this.initialConsumer = streamConsumer;
        }

        @Override // io.camunda.zeebe.transport.stream.api.RemoteStreamErrorHandler
        public void handleError(Throwable th, P p) {
            ArrayList arrayList = new ArrayList(RemoteStreamImpl.this.stream.streamConsumers());
            if (arrayList.isEmpty()) {
                onConsumersExhausted(th, p);
                return;
            }
            arrayList.remove(this.initialConsumer);
            Collections.shuffle(arrayList);
            retry(th, p, arrayList.iterator());
        }

        private void retry(Throwable th, P p, Iterator<AggregatedRemoteStream.StreamConsumer<M>> it) {
            if (!it.hasNext()) {
                onConsumersExhausted(th, p);
                return;
            }
            AggregatedRemoteStream.StreamConsumer<M> next = it.next();
            RemoteStreamImpl.LOGGER.debug("Failed to push payload {}, retrying with next stream", p);
            RemoteStreamImpl.this.streamer.pushAsync(p, (th2, bufferWriter) -> {
                retry(th2, bufferWriter, it);
            }, next.id());
        }

        private void onConsumersExhausted(Throwable th, P p) {
            RemoteStreamImpl.LOGGER.debug("Failed to push payload {}, no more streams to retry", p);
            this.errorHandler.handleError(th, p);
        }
    }

    public RemoteStreamImpl(AggregatedRemoteStream<M> aggregatedRemoteStream, RemoteStreamPusher<P> remoteStreamPusher, RemoteStreamErrorHandler<P> remoteStreamErrorHandler) {
        this.stream = aggregatedRemoteStream;
        this.streamer = remoteStreamPusher;
        this.errorHandler = remoteStreamErrorHandler;
    }

    @Override // io.camunda.zeebe.transport.stream.api.RemoteStream
    public M metadata() {
        return this.stream.logicalId().metadata();
    }

    @Override // io.camunda.zeebe.transport.stream.api.RemoteStream
    public void push(P p) {
        AggregatedRemoteStream.StreamConsumer<M> pickInitialConsumer = pickInitialConsumer();
        if (pickInitialConsumer == null) {
            this.errorHandler.handleError(new StreamExhaustedException("Failed to push to stream %s, all consumers were removed since it was picked".formatted(this.stream.logicalId())), p);
        } else {
            this.streamer.pushAsync(p, new RetryHandler(this.errorHandler, pickInitialConsumer), pickInitialConsumer.id());
        }
    }

    private AggregatedRemoteStream.StreamConsumer<M> pickInitialConsumer() {
        List<AggregatedRemoteStream.StreamConsumer<M>> streamConsumers = this.stream.streamConsumers();
        int size = streamConsumers.size();
        while (true) {
            int i = size;
            if (i <= 0) {
                return null;
            }
            try {
                return streamConsumers.get(ThreadLocalRandom.current().nextInt(i));
            } catch (IndexOutOfBoundsException e) {
                LOGGER.trace("Stream consumer list concurrently modified while picking consumer; retrying", e);
                size = streamConsumers.size();
            }
        }
    }
}
