package io.camunda.zeebe.transport.stream.impl;

import io.atomix.cluster.MemberId;
import io.camunda.zeebe.transport.stream.api.RemoteStreamErrorHandler;
import io.camunda.zeebe.transport.stream.api.RemoteStreamMetrics;
import io.camunda.zeebe.transport.stream.api.StreamResponseException;
import io.camunda.zeebe.transport.stream.impl.AggregatedRemoteStream;
import io.camunda.zeebe.transport.stream.impl.messages.PushStreamRequest;
import io.camunda.zeebe.transport.stream.impl.messages.PushStreamResponse;
import io.camunda.zeebe.transport.stream.impl.messages.StreamResponseDecoder;
import io.camunda.zeebe.util.buffer.BufferWriter;
import io.camunda.zeebe.util.logging.ThrottledLogger;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/camunda/zeebe/transport/stream/impl/RemoteStreamPusher.class */
public final class RemoteStreamPusher<P extends BufferWriter> {
    private static final Logger LOG = LoggerFactory.getLogger(RemoteStreamPusher.class);
    private final StreamResponseDecoder responseDecoder = new StreamResponseDecoder();
    private final ThrottledLogger pushErrorLogger = new ThrottledLogger(LOG, Duration.ofSeconds(5));
    private final ThrottledLogger pushWarnLogger = new ThrottledLogger(LOG, Duration.ofSeconds(5));
    private final RemoteStreamMetrics metrics;
    private final Transport transport;
    private final Executor executor;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/camunda/zeebe/transport/stream/impl/RemoteStreamPusher$Transport.class */
    public interface Transport {
        CompletableFuture<byte[]> send(PushStreamRequest pushStreamRequest, MemberId memberId) throws Exception;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public RemoteStreamPusher(Transport transport, Executor executor, RemoteStreamMetrics remoteStreamMetrics) {
        this.metrics = (RemoteStreamMetrics) Objects.requireNonNull(remoteStreamMetrics, "must specify remote stream metrics");
        this.transport = (Transport) Objects.requireNonNull(transport, "must provide a network transport");
        this.executor = (Executor) Objects.requireNonNull(executor, "must provide an asynchronous executor");
    }

    public void pushAsync(P p, RemoteStreamErrorHandler<P> remoteStreamErrorHandler, AggregatedRemoteStream.StreamId streamId) {
        Objects.requireNonNull(remoteStreamErrorHandler, "must specify a error handler");
        try {
            Objects.requireNonNull(p, "must specify a payload");
            this.executor.execute(() -> {
                push(p, instrumentingErrorHandler(remoteStreamErrorHandler, streamId), streamId);
            });
        } catch (Exception e) {
            remoteStreamErrorHandler.handleError(e, p);
        }
    }

    private RemoteStreamErrorHandler<P> instrumentingErrorHandler(RemoteStreamErrorHandler<P> remoteStreamErrorHandler, AggregatedRemoteStream.StreamId streamId) {
        return (th, bufferWriter) -> {
            if (th == 0) {
                return;
            }
            if (th instanceof StreamResponseException) {
                StreamResponseException streamResponseException = (StreamResponseException) th;
                logResponseError(streamId, bufferWriter, streamResponseException);
                streamResponseException.details().forEach(errorDetail -> {
                    this.metrics.pushTryFailed(errorDetail.code());
                });
            } else {
                this.pushWarnLogger.warn("Failed to push (size = {}) to stream {}", new Object[]{Integer.valueOf(bufferWriter.getLength()), streamId, th});
            }
            this.metrics.pushFailed();
            remoteStreamErrorHandler.handleError(th, bufferWriter);
        };
    }

    private void logResponseError(AggregatedRemoteStream.StreamId streamId, P p, StreamResponseException streamResponseException) {
        switch (streamResponseException.code()) {
            case INVALID:
            case MALFORMED:
                this.pushErrorLogger.error("Failed to push (size = {}) to stream {}, request could not be parsed", new Object[]{Integer.valueOf(p.getLength()), streamId, streamResponseException});
                return;
            case EXHAUSTED:
                LOG.trace("Failed to push (size = {}) to stream {} after trying all clients", new Object[]{Integer.valueOf(p.getLength()), streamId, streamResponseException});
                return;
            default:
                this.pushWarnLogger.warn("Failed to push (size = {}) to stream {}", new Object[]{Integer.valueOf(p.getLength()), streamId, streamResponseException});
                return;
        }
    }

    private void push(P p, RemoteStreamErrorHandler<P> remoteStreamErrorHandler, AggregatedRemoteStream.StreamId streamId) {
        try {
            this.transport.send(new PushStreamRequest().streamId(streamId.streamId()).payload(p), streamId.receiver()).whenCompleteAsync((bArr, th) -> {
                onPush(p, remoteStreamErrorHandler, bArr, th);
            }, this.executor);
            LOG.trace("Pushed {} to stream {}", p, streamId);
        } catch (Exception e) {
            remoteStreamErrorHandler.handleError(e, p);
        }
    }

    private void onPush(P p, RemoteStreamErrorHandler<P> remoteStreamErrorHandler, byte[] bArr, Throwable th) {
        if (th != null) {
            remoteStreamErrorHandler.handleError(th, p);
        } else {
            this.responseDecoder.decode(bArr, new PushStreamResponse()).mapLeft((v0) -> {
                return v0.asException();
            }).ifRightOrLeft(pushStreamResponse -> {
                this.metrics.pushSucceeded();
            }, streamResponseException -> {
                remoteStreamErrorHandler.handleError(streamResponseException, p);
            });
        }
    }
}
