package io.zeebe.broker.job;

import io.zeebe.broker.logstreams.processor.TypedRecord;
import io.zeebe.broker.logstreams.processor.TypedRecordProcessor;
import io.zeebe.broker.logstreams.processor.TypedResponseWriter;
import io.zeebe.broker.logstreams.processor.TypedStreamWriter;
import io.zeebe.msgpack.value.LongValue;
import io.zeebe.protocol.clientapi.RejectionType;
import io.zeebe.protocol.impl.record.value.job.JobBatchRecord;
import io.zeebe.protocol.impl.record.value.job.JobRecord;
import io.zeebe.protocol.intent.JobBatchIntent;
import io.zeebe.protocol.intent.JobIntent;
import io.zeebe.util.sched.clock.ActorClock;
import java.util.concurrent.atomic.AtomicInteger;
import org.agrona.ExpandableArrayBuffer;

/* loaded from: input_file:io/zeebe/broker/job/JobBatchActivateProcessor.class */
public class JobBatchActivateProcessor implements TypedRecordProcessor<JobBatchRecord> {
    private final JobState state;

    public JobBatchActivateProcessor(JobState jobState) {
        this.state = jobState;
    }

    @Override // io.zeebe.broker.logstreams.processor.TypedRecordProcessor
    public void processRecord(TypedRecord<JobBatchRecord> typedRecord, TypedResponseWriter typedResponseWriter, TypedStreamWriter typedStreamWriter) {
        if (isValid(typedRecord.getValue())) {
            activateJobs(typedRecord, typedResponseWriter, typedStreamWriter);
        } else {
            rejectCommand(typedRecord, typedResponseWriter, typedStreamWriter);
        }
    }

    private boolean isValid(JobBatchRecord jobBatchRecord) {
        return jobBatchRecord.getAmount() > 0 && jobBatchRecord.getTimeout() > 0 && jobBatchRecord.getType().capacity() > 0 && jobBatchRecord.getWorker().capacity() > 0;
    }

    private void activateJobs(TypedRecord<JobBatchRecord> typedRecord, TypedResponseWriter typedResponseWriter, TypedStreamWriter typedStreamWriter) {
        JobBatchRecord value = typedRecord.getValue();
        long nextKey = typedStreamWriter.getKeyGenerator().nextKey();
        AtomicInteger atomicInteger = new AtomicInteger(value.getAmount());
        this.state.forEachActivatableJobs(value.getType(), (j, jobRecord, iteratorControl) -> {
            int decrementAndGet = atomicInteger.decrementAndGet();
            if (decrementAndGet >= 0) {
                long currentTimeMillis = ActorClock.currentTimeMillis() + value.getTimeout();
                ((LongValue) value.jobKeys().add()).setValue(j);
                JobRecord jobRecord = (JobRecord) value.jobs().add();
                ExpandableArrayBuffer expandableArrayBuffer = new ExpandableArrayBuffer(jobRecord.getLength());
                jobRecord.write(expandableArrayBuffer, 0);
                jobRecord.wrap(expandableArrayBuffer);
                jobRecord.setDeadline(currentTimeMillis).setWorker(value.getWorker());
                this.state.activate(j, jobRecord);
                typedStreamWriter.appendFollowUpEvent(j, JobIntent.ACTIVATED, jobRecord);
            }
            if (decrementAndGet < 1) {
                iteratorControl.stop();
            }
        });
        typedStreamWriter.appendFollowUpEvent(nextKey, JobBatchIntent.ACTIVATED, value);
        typedResponseWriter.writeEventOnCommand(nextKey, JobBatchIntent.ACTIVATED, value, typedRecord);
    }

    private void rejectCommand(TypedRecord<JobBatchRecord> typedRecord, TypedResponseWriter typedResponseWriter, TypedStreamWriter typedStreamWriter) {
        RejectionType rejectionType;
        String str;
        JobBatchRecord value = typedRecord.getValue();
        if (value.getAmount() < 1) {
            rejectionType = RejectionType.BAD_VALUE;
            str = "Job batch amount must be greater than zero, got " + value.getAmount();
        } else if (value.getTimeout() < 1) {
            rejectionType = RejectionType.BAD_VALUE;
            str = "Job batch timeout must be greater than zero, got " + value.getTimeout();
        } else if (value.getType().capacity() < 1) {
            rejectionType = RejectionType.BAD_VALUE;
            str = "Job batch type must not be empty";
        } else {
            if (value.getWorker().capacity() >= 1) {
                throw new IllegalStateException("Job batch command is valid and should not be rejected");
            }
            rejectionType = RejectionType.BAD_VALUE;
            str = "Job batch worker must not be empty";
        }
        typedStreamWriter.appendRejection(typedRecord, rejectionType, str);
        typedResponseWriter.writeRejectionOnCommand((TypedRecord<?>) typedRecord, rejectionType, str);
    }
}
