package io.camunda.zeebe.gateway.impl.job;

import com.google.rpc.Status;
import io.camunda.zeebe.broker.client.api.BrokerClient;
import io.camunda.zeebe.broker.client.api.BrokerClusterState;
import io.camunda.zeebe.gateway.Loggers;
import io.camunda.zeebe.gateway.grpc.ServerStreamObserver;
import io.camunda.zeebe.gateway.impl.broker.request.BrokerActivateJobsRequest;
import io.camunda.zeebe.gateway.metrics.LongPollingMetrics;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass;
import io.camunda.zeebe.scheduler.ActorControl;
import io.camunda.zeebe.scheduler.clock.ActorClock;
import io.grpc.protobuf.StatusProto;
import java.time.Duration;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;

/* loaded from: input_file:io/camunda/zeebe/gateway/impl/job/LongPollingActivateJobsHandler.class */
public final class LongPollingActivateJobsHandler implements ActivateJobsHandler {
    private static final String JOBS_AVAILABLE_TOPIC = "jobsAvailable";
    private static final Logger LOG = Loggers.LONG_POLLING;
    private static final String ERROR_MSG_ACTIVATED_EXHAUSTED = "Expected to activate jobs of type '%s', but no jobs available and at least one broker returned 'RESOURCE_EXHAUSTED'. Please try again later.";
    private final RoundRobinActivateJobsHandler activateJobsHandler;
    private final BrokerClient brokerClient;
    private final Duration longPollingTimeout;
    private final long probeTimeoutMillis;
    private final int failedAttemptThreshold;
    private ActorControl actor;
    private final Map<String, InFlightLongPollingActivateJobsRequestsState> jobTypeState = new ConcurrentHashMap();
    private final LongPollingMetrics metrics = new LongPollingMetrics();

    /* loaded from: input_file:io/camunda/zeebe/gateway/impl/job/LongPollingActivateJobsHandler$Builder.class */
    public static class Builder {
        private BrokerClient brokerClient;
        private long maxMessageSize;
        private long longPollingTimeout = 10000;
        private long probeTimeoutMillis = 10000;
        private int minEmptyResponses = 3;

        public Builder setBrokerClient(BrokerClient brokerClient) {
            this.brokerClient = brokerClient;
            return this;
        }

        public Builder setMaxMessageSize(long j) {
            this.maxMessageSize = j;
            return this;
        }

        public Builder setLongPollingTimeout(long j) {
            this.longPollingTimeout = j;
            return this;
        }

        public Builder setProbeTimeoutMillis(long j) {
            this.probeTimeoutMillis = j;
            return this;
        }

        public Builder setMinEmptyResponses(int i) {
            this.minEmptyResponses = i;
            return this;
        }

        public LongPollingActivateJobsHandler build() {
            Objects.requireNonNull(this.brokerClient, "brokerClient");
            return new LongPollingActivateJobsHandler(this.brokerClient, this.maxMessageSize, this.longPollingTimeout, this.probeTimeoutMillis, this.minEmptyResponses);
        }
    }

    private LongPollingActivateJobsHandler(BrokerClient brokerClient, long j, long j2, long j3, int i) {
        this.brokerClient = brokerClient;
        this.activateJobsHandler = new RoundRobinActivateJobsHandler(brokerClient, j);
        this.longPollingTimeout = Duration.ofMillis(j2);
        this.probeTimeoutMillis = j3;
        this.failedAttemptThreshold = i;
    }

    @Override // java.util.function.Consumer
    public void accept(ActorControl actorControl) {
        this.actor = actorControl;
        this.activateJobsHandler.accept(actorControl);
        onActorStarted();
    }

    void onActorStarted() {
        this.actor.run(() -> {
            this.brokerClient.subscribeJobAvailableNotification(JOBS_AVAILABLE_TOPIC, this::onJobAvailableNotification);
            this.actor.runAtFixedRate(Duration.ofMillis(this.probeTimeoutMillis), this::probe);
        });
    }

    @Override // io.camunda.zeebe.gateway.impl.job.ActivateJobsHandler
    public void activateJobs(BrokerActivateJobsRequest brokerActivateJobsRequest, ServerStreamObserver<GatewayOuterClass.ActivateJobsResponse> serverStreamObserver, long j) {
        InflightActivateJobsRequest inflightActivateJobsRequest = new InflightActivateJobsRequest(ACTIVATE_JOBS_REQUEST_ID_GENERATOR.getAndIncrement(), brokerActivateJobsRequest, serverStreamObserver, j);
        String type = inflightActivateJobsRequest.getType();
        serverStreamObserver.setOnCancelHandler(() -> {
            onRequestCancel(type, inflightActivateJobsRequest);
        });
        this.actor.run(() -> {
            tryToActivateJobsOnAllPartitions(this.jobTypeState.computeIfAbsent(type, str -> {
                return new InFlightLongPollingActivateJobsRequestsState(str, this.metrics);
            }), inflightActivateJobsRequest);
        });
    }

    private void onRequestCancel(String str, InflightActivateJobsRequest inflightActivateJobsRequest) {
        this.actor.run(() -> {
            InFlightLongPollingActivateJobsRequestsState inFlightLongPollingActivateJobsRequestsState = this.jobTypeState.get(str);
            if (inFlightLongPollingActivateJobsRequestsState != null) {
                inFlightLongPollingActivateJobsRequestsState.removeRequest(inflightActivateJobsRequest);
            }
        });
    }

    private void tryToActivateJobsOnAllPartitions(InFlightLongPollingActivateJobsRequestsState inFlightLongPollingActivateJobsRequestsState, InflightActivateJobsRequest inflightActivateJobsRequest) {
        BrokerClusterState topology = this.brokerClient.getTopologyManager().getTopology();
        if (topology != null) {
            inFlightLongPollingActivateJobsRequestsState.addActiveRequest(inflightActivateJobsRequest);
            this.activateJobsHandler.activateJobs(topology.getPartitionsCount(), inflightActivateJobsRequest, th -> {
                this.actor.submit(() -> {
                    inflightActivateJobsRequest.onError(th);
                    inFlightLongPollingActivateJobsRequestsState.removeActiveRequest(inflightActivateJobsRequest);
                });
            }, (num, bool) -> {
                if (num.intValue() == inflightActivateJobsRequest.getMaxJobsToActivate()) {
                    handleNoReceivedJobsFromAllPartitions(inFlightLongPollingActivateJobsRequestsState, inflightActivateJobsRequest, bool);
                } else {
                    this.actor.submit(() -> {
                        inflightActivateJobsRequest.complete();
                        inFlightLongPollingActivateJobsRequestsState.removeActiveRequest(inflightActivateJobsRequest);
                        inFlightLongPollingActivateJobsRequestsState.resetFailedAttempts();
                        handlePendingRequests(inFlightLongPollingActivateJobsRequestsState, inflightActivateJobsRequest.getType());
                    });
                }
            });
        }
    }

    private void handleNoReceivedJobsFromAllPartitions(InFlightLongPollingActivateJobsRequestsState inFlightLongPollingActivateJobsRequestsState, InflightActivateJobsRequest inflightActivateJobsRequest, Boolean bool) {
        if (bool.booleanValue()) {
            this.actor.submit(() -> {
                inFlightLongPollingActivateJobsRequestsState.removeActiveRequest(inflightActivateJobsRequest);
                inflightActivateJobsRequest.onError(StatusProto.toStatusException(Status.newBuilder().setCode(8).setMessage(String.format(ERROR_MSG_ACTIVATED_EXHAUSTED, inflightActivateJobsRequest.getType())).build()));
            });
        } else {
            this.actor.submit(() -> {
                inFlightLongPollingActivateJobsRequestsState.incrementFailedAttempts(ActorClock.currentTimeMillis());
                boolean shouldBeRepeated = inFlightLongPollingActivateJobsRequestsState.shouldBeRepeated(inflightActivateJobsRequest);
                inFlightLongPollingActivateJobsRequestsState.removeActiveRequest(inflightActivateJobsRequest);
                completeOrResubmitRequest(inflightActivateJobsRequest, shouldBeRepeated);
            });
        }
    }

    private void completeOrResubmitRequest(InflightActivateJobsRequest inflightActivateJobsRequest, boolean z) {
        if (inflightActivateJobsRequest.isLongPollingDisabled()) {
            inflightActivateJobsRequest.complete();
            return;
        }
        if (inflightActivateJobsRequest.isTimedOut()) {
            return;
        }
        InFlightLongPollingActivateJobsRequestsState computeIfAbsent = this.jobTypeState.computeIfAbsent(inflightActivateJobsRequest.getType(), str -> {
            return new InFlightLongPollingActivateJobsRequestsState(str, this.metrics);
        });
        if (!inflightActivateJobsRequest.hasScheduledTimer()) {
            scheduleLongPollingTimeout(computeIfAbsent, inflightActivateJobsRequest);
        }
        if (z) {
            internalActivateJobsRetry(inflightActivateJobsRequest);
        } else {
            markRequestAsPending(computeIfAbsent, inflightActivateJobsRequest);
        }
    }

    void internalActivateJobsRetry(InflightActivateJobsRequest inflightActivateJobsRequest) {
        this.actor.run(() -> {
            InFlightLongPollingActivateJobsRequestsState computeIfAbsent = this.jobTypeState.computeIfAbsent(inflightActivateJobsRequest.getType(), str -> {
                return new InFlightLongPollingActivateJobsRequestsState(str, this.metrics);
            });
            if (computeIfAbsent.shouldAttempt(this.failedAttemptThreshold)) {
                tryToActivateJobsOnAllPartitions(computeIfAbsent, inflightActivateJobsRequest);
            } else {
                completeOrResubmitRequest(inflightActivateJobsRequest, false);
            }
        });
    }

    private void onJobAvailableNotification(String str) {
        LOG.trace("Received jobs available notification for type {}.", str);
        InFlightLongPollingActivateJobsRequestsState inFlightLongPollingActivateJobsRequestsState = this.jobTypeState.get(str);
        if (inFlightLongPollingActivateJobsRequestsState == null || !inFlightLongPollingActivateJobsRequestsState.shouldNotifyAndStartNotification()) {
            LOG.trace("Ignore jobs available notification for type {}.", str);
        } else {
            LOG.trace("Handle jobs available notification for type {}.", str);
            this.actor.run(() -> {
                inFlightLongPollingActivateJobsRequestsState.resetFailedAttempts();
                handlePendingRequests(inFlightLongPollingActivateJobsRequestsState, str);
                inFlightLongPollingActivateJobsRequestsState.completeNotification();
            });
        }
    }

    private void handlePendingRequests(InFlightLongPollingActivateJobsRequestsState inFlightLongPollingActivateJobsRequestsState, String str) {
        Queue<InflightActivateJobsRequest> pendingRequests = inFlightLongPollingActivateJobsRequestsState.getPendingRequests();
        if (!pendingRequests.isEmpty()) {
            pendingRequests.forEach(inflightActivateJobsRequest -> {
                LOG.trace("Unblocking ActivateJobsRequest {}", inflightActivateJobsRequest.getRequest());
                internalActivateJobsRetry(inflightActivateJobsRequest);
            });
        } else {
            if (inFlightLongPollingActivateJobsRequestsState.hasActiveRequests()) {
                return;
            }
            this.jobTypeState.remove(str);
        }
    }

    private void markRequestAsPending(InFlightLongPollingActivateJobsRequestsState inFlightLongPollingActivateJobsRequestsState, InflightActivateJobsRequest inflightActivateJobsRequest) {
        LOG.trace("Worker '{}' asked for '{}' jobs of type '{}', but none are available. This request will be kept open until a new job of this type is created or until timeout of '{}'.", new Object[]{inflightActivateJobsRequest.getWorker(), Integer.valueOf(inflightActivateJobsRequest.getMaxJobsToActivate()), inflightActivateJobsRequest.getType(), inflightActivateJobsRequest.getLongPollingTimeout(this.longPollingTimeout)});
        inFlightLongPollingActivateJobsRequestsState.enqueueRequest(inflightActivateJobsRequest);
    }

    private void scheduleLongPollingTimeout(InFlightLongPollingActivateJobsRequestsState inFlightLongPollingActivateJobsRequestsState, InflightActivateJobsRequest inflightActivateJobsRequest) {
        inflightActivateJobsRequest.setScheduledTimer(this.actor.schedule(inflightActivateJobsRequest.getLongPollingTimeout(this.longPollingTimeout), () -> {
            inflightActivateJobsRequest.timeout();
            inFlightLongPollingActivateJobsRequestsState.removeRequest(inflightActivateJobsRequest);
        }));
    }

    private void probe() {
        long currentTimeMillis = ActorClock.currentTimeMillis();
        this.jobTypeState.forEach((str, inFlightLongPollingActivateJobsRequestsState) -> {
            if (inFlightLongPollingActivateJobsRequestsState.getLastUpdatedTime() < currentTimeMillis - this.probeTimeoutMillis) {
                InflightActivateJobsRequest nextPendingRequest = inFlightLongPollingActivateJobsRequestsState.getNextPendingRequest();
                if (nextPendingRequest != null) {
                    tryToActivateJobsOnAllPartitions(inFlightLongPollingActivateJobsRequestsState, nextPendingRequest);
                } else if (inFlightLongPollingActivateJobsRequestsState.getFailedAttempts() >= this.failedAttemptThreshold) {
                    inFlightLongPollingActivateJobsRequestsState.setFailedAttempts(this.failedAttemptThreshold - 1);
                }
            }
        });
    }

    public static Builder newBuilder() {
        return new Builder();
    }
}
