/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.client.impl.worker;

import io.camunda.zeebe.client.api.JsonMapper;
import io.camunda.zeebe.client.api.response.ActivatedJob;
import io.camunda.zeebe.client.impl.Loggers;
import io.camunda.zeebe.client.impl.response.ActivatedJobImpl;
import io.camunda.zeebe.gateway.protocol.GatewayGrpc;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.IntConsumer;
import java.util.function.Predicate;
import org.slf4j.Logger;

public final class JobPoller
implements StreamObserver<GatewayOuterClass.ActivateJobsResponse> {
    private static final Logger LOG = Loggers.JOB_POLLER_LOGGER;
    private final GatewayGrpc.GatewayStub gatewayStub;
    private final GatewayOuterClass.ActivateJobsRequest.Builder requestBuilder;
    private final JsonMapper jsonMapper;
    private final long requestTimeout;
    private final Predicate<Throwable> retryPredicate;
    private Consumer<ActivatedJob> jobConsumer;
    private IntConsumer doneCallback;
    private Consumer<Throwable> errorCallback;
    private int activatedJobs;
    private BooleanSupplier openSupplier;

    public JobPoller(GatewayGrpc.GatewayStub gatewayStub, GatewayOuterClass.ActivateJobsRequest.Builder requestBuilder, JsonMapper jsonMapper, Duration requestTimeout, Predicate<Throwable> retryPredicate) {
        this.gatewayStub = gatewayStub;
        this.requestBuilder = requestBuilder;
        this.jsonMapper = jsonMapper;
        this.requestTimeout = requestTimeout.toMillis();
        this.retryPredicate = retryPredicate;
    }

    private void reset() {
        this.activatedJobs = 0;
    }

    public void poll(int maxJobsToActivate, Consumer<ActivatedJob> jobConsumer, IntConsumer doneCallback, Consumer<Throwable> errorCallback, BooleanSupplier openSupplier) {
        this.reset();
        this.requestBuilder.setMaxJobsToActivate(maxJobsToActivate);
        this.jobConsumer = jobConsumer;
        this.doneCallback = doneCallback;
        this.errorCallback = errorCallback;
        this.openSupplier = openSupplier;
        this.poll();
    }

    private void poll() {
        LOG.trace("Polling at max {} jobs for worker {} and job type {}", new Object[]{this.requestBuilder.getMaxJobsToActivate(), this.requestBuilder.getWorker(), this.requestBuilder.getType()});
        ((GatewayGrpc.GatewayStub)this.gatewayStub.withDeadlineAfter(this.requestTimeout, TimeUnit.MILLISECONDS)).activateJobs(this.requestBuilder.build(), (StreamObserver)this);
    }

    public void onNext(GatewayOuterClass.ActivateJobsResponse activateJobsResponse) {
        this.activatedJobs += activateJobsResponse.getJobsCount();
        activateJobsResponse.getJobsList().stream().map(job -> new ActivatedJobImpl(this.jsonMapper, (GatewayOuterClass.ActivatedJob)job)).forEach(this.jobConsumer);
    }

    public void onError(Throwable throwable) {
        if (this.retryPredicate.test(throwable)) {
            this.poll();
        } else if (this.openSupplier.getAsBoolean()) {
            try {
                this.logFailure(throwable);
            }
            finally {
                this.errorCallback.accept(throwable);
            }
        }
    }

    public void onCompleted() {
        this.pollingDone();
    }

    private void logFailure(Throwable throwable) {
        StatusRuntimeException statusRuntimeException;
        String errorMsg = "Failed to activate jobs for worker {} and job type {}";
        if (throwable instanceof StatusRuntimeException && (statusRuntimeException = (StatusRuntimeException)throwable).getStatus().getCode() == Status.RESOURCE_EXHAUSTED.getCode()) {
            LOG.trace("Failed to activate jobs for worker {} and job type {}", new Object[]{this.requestBuilder.getWorker(), this.requestBuilder.getType(), throwable});
            return;
        }
        LOG.warn("Failed to activate jobs for worker {} and job type {}", new Object[]{this.requestBuilder.getWorker(), this.requestBuilder.getType(), throwable});
    }

    private void pollingDone() {
        if (this.activatedJobs > 0) {
            LOG.debug("Activated {} jobs for worker {} and job type {}", new Object[]{this.activatedJobs, this.requestBuilder.getWorker(), this.requestBuilder.getType()});
        } else {
            LOG.trace("No jobs activated for worker {} and job type {}", (Object)this.requestBuilder.getWorker(), (Object)this.requestBuilder.getType());
        }
        this.doneCallback.accept(this.activatedJobs);
    }
}

