package io.camunda.zeebe.transport.stream.impl;

import io.atomix.cluster.MemberId;
import io.camunda.zeebe.transport.stream.api.RemoteStreamErrorHandler;
import io.camunda.zeebe.transport.stream.api.RemoteStreamMetrics;
import io.camunda.zeebe.transport.stream.impl.AggregatedRemoteStream;
import io.camunda.zeebe.transport.stream.impl.messages.PushStreamRequest;
import io.camunda.zeebe.util.buffer.BufferWriter;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/camunda/zeebe/transport/stream/impl/RemoteStreamPusher.class */
final class RemoteStreamPusher<P extends BufferWriter> {
    private static final Logger LOG = LoggerFactory.getLogger(RemoteStreamPusher.class);
    private final RemoteStreamMetrics metrics;
    private final Transport transport;
    private final Executor executor;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/camunda/zeebe/transport/stream/impl/RemoteStreamPusher$Transport.class */
    public interface Transport {
        CompletableFuture<Void> send(PushStreamRequest pushStreamRequest, MemberId memberId) throws Exception;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public RemoteStreamPusher(Transport transport, Executor executor, RemoteStreamMetrics remoteStreamMetrics) {
        this.metrics = (RemoteStreamMetrics) Objects.requireNonNull(remoteStreamMetrics, "must specify remote stream metrics");
        this.transport = (Transport) Objects.requireNonNull(transport, "must provide a network transport");
        this.executor = (Executor) Objects.requireNonNull(executor, "must provide an asynchronous executor");
    }

    public void pushAsync(P p, RemoteStreamErrorHandler<P> remoteStreamErrorHandler, AggregatedRemoteStream.StreamId streamId) {
        Objects.requireNonNull(p, "must specify a payload");
        Objects.requireNonNull(remoteStreamErrorHandler, "must specify a error handler");
        this.executor.execute(() -> {
            push(p, instrumentingErrorHandler(remoteStreamErrorHandler, streamId), streamId);
        });
    }

    private RemoteStreamErrorHandler<P> instrumentingErrorHandler(RemoteStreamErrorHandler<P> remoteStreamErrorHandler, AggregatedRemoteStream.StreamId streamId) {
        return (th, bufferWriter) -> {
            if (th != null) {
                this.metrics.pushFailed();
                LOG.debug("Failed to push {} to stream {}", new Object[]{bufferWriter, streamId, th});
                remoteStreamErrorHandler.handleError(th, bufferWriter);
            }
        };
    }

    private void push(P p, RemoteStreamErrorHandler<P> remoteStreamErrorHandler, AggregatedRemoteStream.StreamId streamId) {
        try {
            this.transport.send(new PushStreamRequest().streamId(streamId.streamId()).payload(p), streamId.receiver()).whenCompleteAsync((r8, th) -> {
                onPush(p, remoteStreamErrorHandler, th);
            }, this.executor);
            LOG.trace("Pushed {} to stream {}", p, streamId);
        } catch (Exception e) {
            remoteStreamErrorHandler.handleError(e, p);
        }
    }

    private void onPush(P p, RemoteStreamErrorHandler<P> remoteStreamErrorHandler, Throwable th) {
        if (th != null) {
            remoteStreamErrorHandler.handleError(th, p);
        } else {
            this.metrics.pushSucceeded();
        }
    }
}
