package io.camunda.zeebe.engine.processing.job;

import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecord;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.StateWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedRejectionWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedResponseWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedStreamWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.engine.state.KeyGenerator;
import io.camunda.zeebe.engine.state.immutable.JobState;
import io.camunda.zeebe.engine.state.immutable.VariableState;
import io.camunda.zeebe.engine.state.immutable.ZeebeState;
import io.camunda.zeebe.msgpack.UnpackedObject;
import io.camunda.zeebe.msgpack.value.DocumentValue;
import io.camunda.zeebe.msgpack.value.LongValue;
import io.camunda.zeebe.msgpack.value.ValueArray;
import io.camunda.zeebe.protocol.impl.record.value.incident.IncidentRecord;
import io.camunda.zeebe.protocol.impl.record.value.job.JobBatchRecord;
import io.camunda.zeebe.protocol.impl.record.value.job.JobRecord;
import io.camunda.zeebe.protocol.record.RejectionType;
import io.camunda.zeebe.protocol.record.intent.IncidentIntent;
import io.camunda.zeebe.protocol.record.intent.JobBatchIntent;
import io.camunda.zeebe.protocol.record.value.ErrorType;
import io.camunda.zeebe.util.ByteValue;
import io.camunda.zeebe.util.buffer.BufferUtil;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicInteger;
import org.agrona.DirectBuffer;
import org.agrona.ExpandableArrayBuffer;
import org.agrona.collections.ObjectHashSet;
import org.agrona.concurrent.UnsafeBuffer;

/* loaded from: input_file:io/camunda/zeebe/engine/processing/job/JobBatchActivateProcessor.class */
public final class JobBatchActivateProcessor implements TypedRecordProcessor<JobBatchRecord> {
    private final StateWriter stateWriter;
    private final VariableState variableState;
    private final TypedRejectionWriter rejectionWriter;
    private final TypedResponseWriter responseWriter;
    private final JobState jobState;
    private final KeyGenerator keyGenerator;
    private final long maxRecordLength;
    private final long maxJobBatchLength;
    private final ObjectHashSet<DirectBuffer> variableNames = new ObjectHashSet<>();

    public JobBatchActivateProcessor(Writers writers, ZeebeState zeebeState, KeyGenerator keyGenerator, long j) {
        this.stateWriter = writers.state();
        this.rejectionWriter = writers.rejection();
        this.responseWriter = writers.response();
        this.jobState = zeebeState.getJobState();
        this.variableState = zeebeState.getVariableState();
        this.keyGenerator = keyGenerator;
        this.maxRecordLength = j;
        this.maxJobBatchLength = (j - 8) / 2;
    }

    @Override // io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor
    public void processRecord(TypedRecord<JobBatchRecord> typedRecord, TypedResponseWriter typedResponseWriter, TypedStreamWriter typedStreamWriter) {
        if (isValid(typedRecord.mo23getValue())) {
            activateJobs(typedRecord);
        } else {
            rejectCommand(typedRecord);
        }
    }

    private boolean isValid(JobBatchRecord jobBatchRecord) {
        return jobBatchRecord.getMaxJobsToActivate() > 0 && jobBatchRecord.getTimeout() > 0 && jobBatchRecord.getTypeBuffer().capacity() > 0;
    }

    private void activateJobs(TypedRecord<JobBatchRecord> typedRecord) {
        UnpackedObject unpackedObject = (JobBatchRecord) typedRecord.mo23getValue();
        long nextKey = this.keyGenerator.nextKey();
        collectJobsToActivate(typedRecord, new AtomicInteger(unpackedObject.getMaxJobsToActivate()));
        this.stateWriter.appendFollowUpEvent(nextKey, JobBatchIntent.ACTIVATED, unpackedObject);
        this.responseWriter.writeEventOnCommand(nextKey, JobBatchIntent.ACTIVATED, unpackedObject, typedRecord);
    }

    private void collectJobsToActivate(TypedRecord<JobBatchRecord> typedRecord, AtomicInteger atomicInteger) {
        JobBatchRecord mo23getValue = typedRecord.mo23getValue();
        ValueArray jobs = mo23getValue.jobs();
        ValueArray jobKeys = mo23getValue.jobKeys();
        this.variableNames.clear();
        mo23getValue.variables().forEach(stringValue -> {
            UnsafeBuffer unsafeBuffer = new UnsafeBuffer(new byte[stringValue.getValue().capacity()]);
            unsafeBuffer.putBytes(0, stringValue.getValue(), 0, stringValue.getValue().capacity());
            this.variableNames.add(unsafeBuffer);
        });
        this.jobState.forEachActivatableJobs(mo23getValue.getTypeBuffer(), (l, jobRecord) -> {
            int i = atomicInteger.get();
            jobRecord.setDeadline(typedRecord.getTimestamp() + mo23getValue.getTimeout()).setWorker(mo23getValue.getWorkerBuffer());
            long elementInstanceKey = jobRecord.getElementInstanceKey();
            if (elementInstanceKey >= 0) {
                jobRecord.setVariables(collectVariables(this.variableNames, elementInstanceKey));
            } else {
                jobRecord.setVariables(DocumentValue.EMPTY_DOCUMENT);
            }
            if (i < 0 || typedRecord.getLength() + jobRecord.getLength() > this.maxJobBatchLength) {
                mo23getValue.setTruncated(true);
                if (mo23getValue.getJobs().isEmpty()) {
                    raiseIncidentJobTooLargeForMessageSize(l.longValue(), jobRecord);
                }
                return false;
            }
            int decrementAndGet = atomicInteger.decrementAndGet();
            ((LongValue) jobKeys.add()).setValue(l.longValue());
            JobRecord jobRecord = (JobRecord) jobs.add();
            ExpandableArrayBuffer expandableArrayBuffer = new ExpandableArrayBuffer(jobRecord.getLength());
            jobRecord.write(expandableArrayBuffer, 0);
            jobRecord.wrap(expandableArrayBuffer);
            return Boolean.valueOf(decrementAndGet > 0);
        });
    }

    private DirectBuffer collectVariables(Collection<DirectBuffer> collection, long j) {
        return collection.isEmpty() ? this.variableState.getVariablesAsDocument(j) : this.variableState.getVariablesAsDocument(j, collection);
    }

    private void rejectCommand(TypedRecord<JobBatchRecord> typedRecord) {
        RejectionType rejectionType;
        String format;
        JobBatchRecord mo23getValue = typedRecord.mo23getValue();
        if (mo23getValue.getMaxJobsToActivate() < 1) {
            rejectionType = RejectionType.INVALID_ARGUMENT;
            format = String.format("Expected to activate job batch with %s to be %s, but it was %s", "max jobs to activate", "greater than zero", String.format("'%d'", Integer.valueOf(mo23getValue.getMaxJobsToActivate())));
        } else if (mo23getValue.getTimeout() < 1) {
            rejectionType = RejectionType.INVALID_ARGUMENT;
            format = String.format("Expected to activate job batch with %s to be %s, but it was %s", "timeout", "greater than zero", String.format("'%d'", Long.valueOf(mo23getValue.getTimeout())));
        } else {
            if (mo23getValue.getTypeBuffer().capacity() >= 1) {
                throw new IllegalStateException("Expected to reject an invalid activate job batch command, but it appears to be valid");
            }
            rejectionType = RejectionType.INVALID_ARGUMENT;
            format = String.format("Expected to activate job batch with %s to be %s, but it was %s", "type", "present", "blank");
        }
        this.rejectionWriter.appendRejection(typedRecord, rejectionType, format);
        this.responseWriter.writeRejectionOnCommand(typedRecord, rejectionType, format);
    }

    private void raiseIncidentJobTooLargeForMessageSize(long j, JobRecord jobRecord) {
        this.stateWriter.appendFollowUpEvent(this.keyGenerator.nextKey(), IncidentIntent.CREATED, new IncidentRecord().setErrorType(ErrorType.MESSAGE_SIZE_EXCEEDED).setErrorMessage(BufferUtil.wrapString(String.format("The job with key '%s' can not be activated because it is larger than the configured message size (%s). Try to reduce the size by reducing the number of fetched variables or modifying the variable values.", Long.valueOf(j), ByteValue.prettyPrint(this.maxRecordLength)))).setBpmnProcessId(jobRecord.getBpmnProcessIdBuffer()).setProcessDefinitionKey(jobRecord.getProcessDefinitionKey()).setProcessInstanceKey(jobRecord.getProcessInstanceKey()).setElementId(jobRecord.getElementIdBuffer()).setElementInstanceKey(jobRecord.getElementInstanceKey()).setJobKey(j).setVariableScopeKey(jobRecord.getElementInstanceKey()));
    }
}
