package io.camunda.zeebe.gateway.impl.stream;

import io.camunda.zeebe.gateway.ResponseMapper;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass;
import io.camunda.zeebe.protocol.impl.stream.job.ActivatedJobImpl;
import io.camunda.zeebe.protocol.impl.stream.job.JobActivationProperties;
import io.camunda.zeebe.scheduler.Actor;
import io.camunda.zeebe.scheduler.ConcurrencyControl;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.scheduler.future.CompletableActorFuture;
import io.camunda.zeebe.transport.stream.api.ClientStreamConsumer;
import io.camunda.zeebe.transport.stream.api.ClientStreamId;
import io.camunda.zeebe.transport.stream.api.ClientStreamer;
import io.camunda.zeebe.util.LockUtil;
import io.camunda.zeebe.util.buffer.BufferUtil;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import net.jcip.annotations.GuardedBy;
import org.agrona.DirectBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/camunda/zeebe/gateway/impl/stream/StreamJobsHandler.class */
public class StreamJobsHandler extends Actor {
    private static final Logger LOGGER = LoggerFactory.getLogger(StreamJobsHandler.class);
    private final ClientStreamer<JobActivationProperties> jobStreamer;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/camunda/zeebe/gateway/impl/stream/StreamJobsHandler$AsyncJobStreamRemover.class */
    public static final class AsyncJobStreamRemover implements Runnable {
        private final Lock lock = new ReentrantLock();
        private final ClientStreamer<JobActivationProperties> jobStreamer;

        @GuardedBy("lock")
        private boolean isRemoved;

        @GuardedBy("lock")
        private ClientStreamId streamId;

        AsyncJobStreamRemover(ClientStreamer<JobActivationProperties> clientStreamer) {
            this.jobStreamer = clientStreamer;
        }

        @Override // java.lang.Runnable
        public void run() {
            LockUtil.withLock(this.lock, this::lockedRemove);
        }

        void setStreamId(ClientStreamId clientStreamId) {
            LockUtil.withLock(this.lock, () -> {
                lockedSetStreamId(clientStreamId);
            });
        }

        @GuardedBy("lock")
        private void lockedRemove() {
            this.isRemoved = true;
            if (this.streamId != null) {
                this.jobStreamer.remove(this.streamId);
            }
        }

        @GuardedBy("lock")
        private void lockedSetStreamId(ClientStreamId clientStreamId) {
            if (this.isRemoved) {
                this.jobStreamer.remove(clientStreamId);
            } else {
                this.streamId = clientStreamId;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/camunda/zeebe/gateway/impl/stream/StreamJobsHandler$JobStreamConsumer.class */
    public static final class JobStreamConsumer implements ClientStreamConsumer {
        private final StreamObserver<GatewayOuterClass.ActivatedJob> responseObserver;
        private final ConcurrencyControl executor;

        JobStreamConsumer(StreamObserver<GatewayOuterClass.ActivatedJob> streamObserver, ConcurrencyControl concurrencyControl) {
            this.responseObserver = streamObserver;
            this.executor = concurrencyControl;
        }

        public ActorFuture<Void> push(DirectBuffer directBuffer) {
            try {
                return this.executor.call(() -> {
                    handlePushedJob(directBuffer);
                    return null;
                });
            } catch (Exception e) {
                this.responseObserver.onError(e);
                return CompletableActorFuture.completedExceptionally(e);
            }
        }

        private void handlePushedJob(DirectBuffer directBuffer) {
            ActivatedJobImpl activatedJobImpl = new ActivatedJobImpl();
            activatedJobImpl.wrap(directBuffer);
            try {
                this.responseObserver.onNext(ResponseMapper.toActivatedJob(activatedJobImpl));
            } catch (Exception e) {
                this.responseObserver.onError(e);
                throw e;
            }
        }
    }

    public StreamJobsHandler(ClientStreamer<JobActivationProperties> clientStreamer) {
        this.jobStreamer = clientStreamer;
    }

    public void handle(String str, JobActivationProperties jobActivationProperties, ServerCallStreamObserver<GatewayOuterClass.ActivatedJob> serverCallStreamObserver) {
        if (str.isBlank()) {
            handleError(serverCallStreamObserver, "type", "present", "blank");
        } else if (jobActivationProperties.timeout() < 1) {
            handleError(serverCallStreamObserver, "timeout", "greater than zero", Long.toString(jobActivationProperties.timeout()));
        } else {
            handleInternal(str, jobActivationProperties, serverCallStreamObserver);
        }
    }

    private void handleInternal(String str, JobActivationProperties jobActivationProperties, ServerCallStreamObserver<GatewayOuterClass.ActivatedJob> serverCallStreamObserver) {
        DirectBuffer wrapString = BufferUtil.wrapString(str);
        JobStreamConsumer jobStreamConsumer = new JobStreamConsumer(serverCallStreamObserver, this.actor);
        AsyncJobStreamRemover asyncJobStreamRemover = new AsyncJobStreamRemover(this.jobStreamer);
        serverCallStreamObserver.setOnCloseHandler(asyncJobStreamRemover);
        serverCallStreamObserver.setOnCancelHandler(asyncJobStreamRemover);
        this.actor.run(() -> {
            this.actor.runOnCompletion(this.jobStreamer.add(wrapString, jobActivationProperties, jobStreamConsumer), (clientStreamId, th) -> {
                onStreamAdded(serverCallStreamObserver, asyncJobStreamRemover, clientStreamId, th);
            });
        });
    }

    private void onStreamAdded(StreamObserver<GatewayOuterClass.ActivatedJob> streamObserver, AsyncJobStreamRemover asyncJobStreamRemover, ClientStreamId clientStreamId, Throwable th) {
        if (th == null) {
            asyncJobStreamRemover.setStreamId(clientStreamId);
        } else {
            LOGGER.warn("Failed to register new job stream", th);
            streamObserver.onError(Status.UNAVAILABLE.withDescription("Failed to register new job stream").withCause(th).augmentDescription("Cause: " + th.getMessage()).asRuntimeException());
        }
    }

    private void handleError(ServerCallStreamObserver<GatewayOuterClass.ActivatedJob> serverCallStreamObserver, String str, String str2, String str3) {
        serverCallStreamObserver.onError(new StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription("Expected to stream activated jobs with %s to be %s, but it was %s".formatted(str, str2, str3))));
    }
}
