package io.zeebe.broker.job;

import io.zeebe.broker.logstreams.processor.KeyGenerator;
import io.zeebe.broker.logstreams.processor.TypedRecord;
import io.zeebe.broker.logstreams.processor.TypedRecordProcessor;
import io.zeebe.broker.logstreams.processor.TypedResponseWriter;
import io.zeebe.broker.logstreams.processor.TypedStreamWriter;
import io.zeebe.broker.workflow.state.VariablesState;
import io.zeebe.msgpack.value.DocumentValue;
import io.zeebe.msgpack.value.LongValue;
import io.zeebe.msgpack.value.ValueArray;
import io.zeebe.protocol.clientapi.RejectionType;
import io.zeebe.protocol.impl.record.value.job.JobBatchRecord;
import io.zeebe.protocol.impl.record.value.job.JobRecord;
import io.zeebe.protocol.intent.JobBatchIntent;
import io.zeebe.protocol.intent.JobIntent;
import io.zeebe.util.sched.clock.ActorClock;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;
import org.agrona.DirectBuffer;
import org.agrona.ExpandableArrayBuffer;
import org.agrona.collections.ObjectHashSet;
import org.agrona.concurrent.UnsafeBuffer;

/* loaded from: input_file:io/zeebe/broker/job/JobBatchActivateProcessor.class */
/**
 * Processes job batch records: a valid record has activatable jobs of the requested type collected
 * into it and is answered with a {@link JobBatchIntent#ACTIVATED} event; an invalid record is
 * rejected with {@link RejectionType#INVALID_ARGUMENT}.
 *
 * <p>The instance keeps a reusable set of variable names that is cleared and refilled on every
 * processed record. NOTE(review): this presumably relies on single-threaded use by the stream
 * processor - confirm before sharing an instance.
 */
public class JobBatchActivateProcessor implements TypedRecordProcessor<JobBatchRecord> {
    /** Template shared by every rejection reason; arguments are: property, expectation, actual. */
    private static final String REJECTION_FORMAT = "Expected to activate job batch with %s to be %s, but it was %s";

    private final JobState jobState;
    private final VariablesState variablesState;
    private final KeyGenerator keyGenerator;

    /** Copies of the variable names requested by the record currently being processed. */
    private final ObjectHashSet<DirectBuffer> variableNames = new ObjectHashSet<>();

    /**
     * @param jobState source of activatable jobs and target of the activation state change
     * @param variablesState used to look up the variables document of a job's element instance
     * @param keyGenerator supplies the key under which the ACTIVATED batch event is written
     */
    public JobBatchActivateProcessor(JobState jobState, VariablesState variablesState, KeyGenerator keyGenerator) {
        this.jobState = jobState;
        this.variablesState = variablesState;
        this.keyGenerator = keyGenerator;
    }

    /**
     * Activates jobs for the given batch record if it is valid, otherwise rejects it.
     *
     * @param typedRecord the job batch record to process
     * @param typedResponseWriter receives the response (event or rejection) for the record
     * @param typedStreamWriter receives the follow-up events or the rejection
     */
    @Override
    public void processRecord(TypedRecord<JobBatchRecord> typedRecord, TypedResponseWriter typedResponseWriter, TypedStreamWriter typedStreamWriter) {
        if (isValid(typedRecord.getValue())) {
            activateJobs(typedRecord, typedResponseWriter, typedStreamWriter);
        } else {
            rejectCommand(typedRecord, typedResponseWriter, typedStreamWriter);
        }
    }

    /**
     * Returns {@code true} when max-jobs and timeout are positive and both type and worker are
     * non-empty. Must stay in sync with the branches of {@link #rejectCommand}.
     */
    private boolean isValid(JobBatchRecord jobBatchRecord) {
        return jobBatchRecord.getMaxJobsToActivate() > 0
            && jobBatchRecord.getTimeout() > 0
            && jobBatchRecord.getType().capacity() > 0
            && jobBatchRecord.getWorker().capacity() > 0;
    }

    /**
     * Fills the batch record with jobs, writes one ACTIVATED event per collected job, then writes
     * the ACTIVATED batch event to the stream and as the response.
     */
    private void activateJobs(TypedRecord<JobBatchRecord> typedRecord, TypedResponseWriter typedResponseWriter, TypedStreamWriter typedStreamWriter) {
        final JobBatchRecord batch = typedRecord.getValue();
        // The key is generated before any job is collected, as in the original statement order.
        final long batchKey = this.keyGenerator.nextKey();

        collectJobsToActivate(typedRecord, new AtomicInteger(batch.getMaxJobsToActivate()));
        activateJobs(typedStreamWriter, batch);

        typedStreamWriter.appendFollowUpEvent(batchKey, JobBatchIntent.ACTIVATED, batch);
        typedResponseWriter.writeEventOnCommand(batchKey, JobBatchIntent.ACTIVATED, batch, typedRecord);
    }

    /**
     * Visits the activatable jobs of the batch's type and appends each one (key and record copy)
     * to the batch until the requested amount is reached or the next job no longer fits, in which
     * case the batch is flagged as truncated.
     *
     * @param typedRecord the record whose value is filled and whose max value length bounds it
     * @param atomicInteger number of jobs that may still be added; decremented per added job
     */
    private void collectJobsToActivate(TypedRecord<JobBatchRecord> typedRecord, AtomicInteger atomicInteger) {
        final JobBatchRecord batch = typedRecord.getValue();
        final ValueArray<JobRecord> jobs = batch.jobs();
        final ValueArray<LongValue> jobKeys = batch.jobKeys();

        // Copy every requested variable name into its own buffer before storing it in the set.
        this.variableNames.clear();
        batch.variables().forEach(name -> {
            final UnsafeBuffer nameCopy = new UnsafeBuffer(new byte[name.getValue().capacity()]);
            nameCopy.putBytes(0, name.getValue(), 0, name.getValue().capacity());
            this.variableNames.add(nameCopy);
        });

        this.jobState.forEachActivatableJobs(batch.getType(), (key, jobRecord) -> {
            final int remainingBefore = atomicInteger.get();

            jobRecord.setDeadline(ActorClock.currentTimeMillis() + batch.getTimeout()).setWorker(batch.getWorker());

            // Variables are set before the size check so that jobRecord.getLength() includes them.
            final long elementInstanceKey = jobRecord.getHeaders().getElementInstanceKey();
            if (elementInstanceKey >= 0) {
                jobRecord.setVariables(collectVariables(this.variableNames, elementInstanceKey));
            } else {
                jobRecord.setVariables(DocumentValue.EMPTY_DOCUMENT);
            }

            // NOTE(review): the extra 8 presumably accounts for the long job key added to
            // jobKeys - confirm against the record encoding.
            final boolean exceedsRecordSize =
                batch.getLength() + 8 + jobRecord.getLength() > typedRecord.getMaxValueLength();
            if (remainingBefore < 0 || exceedsRecordSize) {
                batch.setTruncated(true);
                return false;
            }

            final int remainingAfter = atomicInteger.decrementAndGet();
            jobKeys.add().setValue(key);

            // Serialize the visited job into a buffer of its own and let the new batch entry wrap
            // it. The entry needs a name distinct from the lambda parameter `jobRecord`.
            final JobRecord batchEntry = jobs.add();
            final ExpandableArrayBuffer jobCopy = new ExpandableArrayBuffer(jobRecord.getLength());
            jobRecord.write(jobCopy, 0);
            batchEntry.wrap(jobCopy);

            // Keep iterating only while more jobs may be added.
            return remainingAfter > 0;
        });
    }

    /**
     * Walks the batch's jobs and job keys pairwise; for every pair a fresh copy of the job is
     * appended as a {@link JobIntent#ACTIVATED} follow-up event and activated in the job state.
     */
    private void activateJobs(TypedStreamWriter typedStreamWriter, JobBatchRecord jobBatchRecord) {
        final Iterator<JobRecord> jobs = jobBatchRecord.jobs().iterator();
        final Iterator<LongValue> jobKeys = jobBatchRecord.jobKeys().iterator();

        while (jobs.hasNext() && jobKeys.hasNext()) {
            final JobRecord batchEntry = jobs.next();
            final long jobKey = jobKeys.next().getValue();

            // Copy the entry into a separate buffer and hand out a record wrapping that copy.
            final ExpandableArrayBuffer jobCopy = new ExpandableArrayBuffer();
            batchEntry.write(jobCopy, 0);
            final JobRecord activatedJob = new JobRecord();
            activatedJob.wrap(jobCopy, 0, batchEntry.getLength());

            typedStreamWriter.appendFollowUpEvent(jobKey, JobIntent.ACTIVATED, activatedJob);
            this.jobState.activate(jobKey, activatedJob);
        }
    }

    /**
     * Returns the variables document for the given scope: the unfiltered lookup when no names were
     * requested, otherwise the lookup restricted to the requested names.
     */
    private DirectBuffer collectVariables(Collection<DirectBuffer> collection, long j) {
        if (collection.isEmpty()) {
            return this.variablesState.getVariablesAsDocument(j);
        }
        return this.variablesState.getVariablesAsDocument(j, collection);
    }

    /**
     * Writes an {@link RejectionType#INVALID_ARGUMENT} rejection naming the first invalid property
     * (checked in the order: max jobs, timeout, type, worker) to the stream and the response.
     *
     * @throws IllegalStateException if none of the properties is invalid
     */
    private void rejectCommand(TypedRecord<JobBatchRecord> typedRecord, TypedResponseWriter typedResponseWriter, TypedStreamWriter typedStreamWriter) {
        final JobBatchRecord batch = typedRecord.getValue();
        final RejectionType rejectionType = RejectionType.INVALID_ARGUMENT;
        final String reason;

        if (batch.getMaxJobsToActivate() < 1) {
            reason = String.format(REJECTION_FORMAT, "max jobs to activate", "greater than zero", String.format("'%d'", batch.getMaxJobsToActivate()));
        } else if (batch.getTimeout() < 1) {
            reason = String.format(REJECTION_FORMAT, "timeout", "greater than zero", String.format("'%d'", batch.getTimeout()));
        } else if (batch.getType().capacity() < 1) {
            reason = String.format(REJECTION_FORMAT, "type", "present", "blank");
        } else if (batch.getWorker().capacity() < 1) {
            reason = String.format(REJECTION_FORMAT, "worker", "present", "blank");
        } else {
            throw new IllegalStateException("Expected to reject an invalid activate job batch command, but it appears to be valid");
        }

        typedStreamWriter.appendRejection(typedRecord, rejectionType, reason);
        typedResponseWriter.writeRejectionOnCommand(typedRecord, rejectionType, reason);
    }
}
