package io.zeebe.broker.job;

import io.zeebe.broker.job.JobStateController;
import io.zeebe.broker.logstreams.processor.CommandProcessor;
import io.zeebe.broker.logstreams.processor.KeyGenerator;
import io.zeebe.broker.logstreams.processor.StreamProcessorLifecycleAware;
import io.zeebe.broker.logstreams.processor.TypedBatchWriter;
import io.zeebe.broker.logstreams.processor.TypedCommandWriter;
import io.zeebe.broker.logstreams.processor.TypedRecord;
import io.zeebe.broker.logstreams.processor.TypedRecordProcessor;
import io.zeebe.broker.logstreams.processor.TypedResponseWriter;
import io.zeebe.broker.logstreams.processor.TypedStreamEnvironment;
import io.zeebe.broker.logstreams.processor.TypedStreamProcessor;
import io.zeebe.broker.logstreams.processor.TypedStreamWriter;
import io.zeebe.logstreams.state.StateSnapshotController;
import io.zeebe.logstreams.state.StateStorage;
import io.zeebe.msgpack.value.LongValue;
import io.zeebe.protocol.clientapi.RejectionType;
import io.zeebe.protocol.clientapi.ValueType;
import io.zeebe.protocol.impl.record.value.job.JobBatchRecord;
import io.zeebe.protocol.impl.record.value.job.JobRecord;
import io.zeebe.protocol.intent.Intent;
import io.zeebe.protocol.intent.JobBatchIntent;
import io.zeebe.protocol.intent.JobIntent;
import io.zeebe.util.sched.ScheduledTimer;
import io.zeebe.util.sched.clock.ActorClock;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import org.agrona.ExpandableArrayBuffer;

/* loaded from: input_file:io/zeebe/broker/job/JobStreamProcessor.class */
public class JobStreamProcessor implements StreamProcessorLifecycleAware {
    public static final Duration TIME_OUT_POLLING_INTERVAL = Duration.ofSeconds(30);
    private final JobStateController state = new JobStateController();
    private int partitionId;
    private ScheduledTimer timer;
    private TypedCommandWriter writer;
    private KeyGenerator jobKeyGenerator;

    /* loaded from: input_file:io/zeebe/broker/job/JobStreamProcessor$CancelProcessor.class */
    private class CancelProcessor implements CommandProcessor<JobRecord> {
        private CancelProcessor() {
        }

        @Override // io.zeebe.broker.logstreams.processor.CommandProcessor
        public void onCommand(TypedRecord<JobRecord> typedRecord, CommandProcessor.CommandControl<JobRecord> commandControl) {
            long key = typedRecord.getKey();
            JobRecord job = JobStreamProcessor.this.state.getJob(key);
            if (job == null) {
                commandControl.reject(RejectionType.NOT_APPLICABLE, "Job does not exist");
            } else {
                JobStreamProcessor.this.state.delete(key, job);
                commandControl.accept(JobIntent.CANCELED, job);
            }
        }
    }

    /* loaded from: input_file:io/zeebe/broker/job/JobStreamProcessor$CompleteProcessor.class */
    private class CompleteProcessor implements CommandProcessor<JobRecord> {
        private CompleteProcessor() {
        }

        @Override // io.zeebe.broker.logstreams.processor.CommandProcessor
        public void onCommand(TypedRecord<JobRecord> typedRecord, CommandProcessor.CommandControl<JobRecord> commandControl) {
            long key = typedRecord.getKey();
            if (!JobStreamProcessor.this.state.exists(key)) {
                commandControl.reject(RejectionType.NOT_APPLICABLE, "Job does not exist");
                return;
            }
            if (JobStreamProcessor.this.state.isInState(key, JobStateController.State.FAILED)) {
                commandControl.reject(RejectionType.NOT_APPLICABLE, "Job is failed and must be resolved first");
                return;
            }
            JobRecord job = JobStreamProcessor.this.state.getJob(key);
            job.setPayload(typedRecord.getValue().getPayload());
            JobStreamProcessor.this.state.delete(key, job);
            commandControl.accept(JobIntent.COMPLETED, job);
        }
    }

    /* loaded from: input_file:io/zeebe/broker/job/JobStreamProcessor$CreateProcessor.class */
    private class CreateProcessor implements CommandProcessor<JobRecord> {
        private CreateProcessor() {
        }

        @Override // io.zeebe.broker.logstreams.processor.CommandProcessor
        public void onCommand(TypedRecord<JobRecord> typedRecord, CommandProcessor.CommandControl<JobRecord> commandControl) {
            JobStreamProcessor.this.state.create(commandControl.accept(JobIntent.CREATED, typedRecord.getValue()), typedRecord.getValue());
        }
    }

    /* loaded from: input_file:io/zeebe/broker/job/JobStreamProcessor$FailProcessor.class */
    private class FailProcessor implements CommandProcessor<JobRecord> {
        private FailProcessor() {
        }

        @Override // io.zeebe.broker.logstreams.processor.CommandProcessor
        public void onCommand(TypedRecord<JobRecord> typedRecord, CommandProcessor.CommandControl<JobRecord> commandControl) {
            long key = typedRecord.getKey();
            if (!JobStreamProcessor.this.state.isInState(key, JobStateController.State.ACTIVATED)) {
                commandControl.reject(RejectionType.NOT_APPLICABLE, "Job is not currently activated");
                return;
            }
            JobRecord job = JobStreamProcessor.this.state.getJob(key);
            job.setRetries(typedRecord.getValue().getRetries());
            JobStreamProcessor.this.state.fail(key, job);
            commandControl.accept(JobIntent.FAILED, job);
        }
    }

    /* loaded from: input_file:io/zeebe/broker/job/JobStreamProcessor$JobBatchActivateProcessor.class */
    private class JobBatchActivateProcessor implements TypedRecordProcessor<JobBatchRecord> {
        private JobBatchActivateProcessor() {
        }

        @Override // io.zeebe.broker.logstreams.processor.TypedRecordProcessor
        public void processRecord(TypedRecord<JobBatchRecord> typedRecord, TypedResponseWriter typedResponseWriter, TypedStreamWriter typedStreamWriter) {
            if (JobStreamProcessor.this.isValid(typedRecord.getValue())) {
                JobStreamProcessor.this.activateJobs(typedRecord, typedResponseWriter, typedStreamWriter);
            } else {
                JobStreamProcessor.this.rejectCommand(typedRecord, typedResponseWriter, typedStreamWriter);
            }
        }
    }

    /* loaded from: input_file:io/zeebe/broker/job/JobStreamProcessor$TimeOutProcessor.class */
    private class TimeOutProcessor implements CommandProcessor<JobRecord> {
        private TimeOutProcessor() {
        }

        @Override // io.zeebe.broker.logstreams.processor.CommandProcessor
        public void onCommand(TypedRecord<JobRecord> typedRecord, CommandProcessor.CommandControl<JobRecord> commandControl) {
            if (!JobStreamProcessor.this.state.isInState(typedRecord.getKey(), JobStateController.State.ACTIVATED)) {
                commandControl.reject(RejectionType.NOT_APPLICABLE, "Job not activated");
            } else {
                JobStreamProcessor.this.state.timeout(typedRecord.getKey(), typedRecord.getValue());
                commandControl.accept(JobIntent.TIMED_OUT, typedRecord.getValue());
            }
        }
    }

    /* loaded from: input_file:io/zeebe/broker/job/JobStreamProcessor$UpdateRetriesProcessor.class */
    private class UpdateRetriesProcessor implements CommandProcessor<JobRecord> {
        private UpdateRetriesProcessor() {
        }

        @Override // io.zeebe.broker.logstreams.processor.CommandProcessor
        public void onCommand(TypedRecord<JobRecord> typedRecord, CommandProcessor.CommandControl<JobRecord> commandControl) {
            long key = typedRecord.getKey();
            int retries = typedRecord.getValue().getRetries();
            if (!JobStreamProcessor.this.state.isInState(typedRecord.getKey(), JobStateController.State.FAILED)) {
                commandControl.reject(RejectionType.NOT_APPLICABLE, "Job is not failed");
                return;
            }
            if (retries <= 0) {
                commandControl.reject(RejectionType.BAD_VALUE, "Job retries must be positive");
                return;
            }
            JobRecord job = JobStreamProcessor.this.state.getJob(typedRecord.getKey());
            job.setRetries(retries);
            JobStreamProcessor.this.state.resolve(key, job);
            commandControl.accept(JobIntent.RETRIES_UPDATED, job);
        }
    }

    @Override // io.zeebe.broker.logstreams.processor.StreamProcessorLifecycleAware
    public void onRecovered(TypedStreamProcessor typedStreamProcessor) {
        this.timer = typedStreamProcessor.getActor().runAtFixedRate(TIME_OUT_POLLING_INTERVAL, this::deactivateTimedOutJobs);
        this.writer = typedStreamProcessor.getEnvironment().buildCommandWriter();
    }

    @Override // io.zeebe.broker.logstreams.processor.StreamProcessorLifecycleAware
    public void onClose() {
        if (this.timer != null) {
            this.timer.cancel();
            this.timer = null;
        }
    }

    public TypedStreamProcessor createStreamProcessor(TypedStreamEnvironment typedStreamEnvironment) {
        this.partitionId = typedStreamEnvironment.getStream().getPartitionId();
        this.jobKeyGenerator = KeyGenerator.createJobKeyGenerator(this.partitionId, this.state);
        return typedStreamEnvironment.newStreamProcessor().keyGenerator(this.jobKeyGenerator).onCommand(ValueType.JOB, (Intent) JobIntent.CREATE, (CommandProcessor) new CreateProcessor()).onCommand(ValueType.JOB, (Intent) JobIntent.COMPLETE, (CommandProcessor) new CompleteProcessor()).onCommand(ValueType.JOB, (Intent) JobIntent.FAIL, (CommandProcessor) new FailProcessor()).onCommand(ValueType.JOB, (Intent) JobIntent.TIME_OUT, (CommandProcessor) new TimeOutProcessor()).onCommand(ValueType.JOB, (Intent) JobIntent.UPDATE_RETRIES, (CommandProcessor) new UpdateRetriesProcessor()).onCommand(ValueType.JOB, (Intent) JobIntent.CANCEL, (CommandProcessor) new CancelProcessor()).onCommand(ValueType.JOB_BATCH, (Intent) JobBatchIntent.ACTIVATE, (TypedRecordProcessor<?>) new JobBatchActivateProcessor()).withStateController(this.state).withListener(this).build();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public StateSnapshotController createSnapshotController(StateStorage stateStorage) {
        return new StateSnapshotController(this.state, stateStorage);
    }

    private void deactivateTimedOutJobs() {
        this.state.forEachTimedOutEntry(ActorClock.currentTimeMillis(), (j, jobRecord, iteratorControl) -> {
            this.writer.writeFollowUpCommand(j, JobIntent.TIME_OUT, jobRecord, recordMetadata -> {
                recordMetadata.valueType(ValueType.JOB);
            });
            this.writer.flush();
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean isValid(JobBatchRecord jobBatchRecord) {
        return jobBatchRecord.getAmount() > 0 && jobBatchRecord.getTimeout() > 0 && jobBatchRecord.getType().capacity() > 0 && jobBatchRecord.getWorker().capacity() > 0;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void activateJobs(TypedRecord<JobBatchRecord> typedRecord, TypedResponseWriter typedResponseWriter, TypedStreamWriter typedStreamWriter) {
        JobBatchRecord value = typedRecord.getValue();
        long nextKey = this.jobKeyGenerator.nextKey();
        TypedBatchWriter newBatch = typedStreamWriter.newBatch();
        AtomicInteger atomicInteger = new AtomicInteger(value.getAmount());
        this.state.forEachActivatableJobs(value.getType(), (j, jobRecord, iteratorControl) -> {
            int decrementAndGet = atomicInteger.decrementAndGet();
            if (decrementAndGet >= 0) {
                long currentTimeMillis = ActorClock.currentTimeMillis() + value.getTimeout();
                ((LongValue) value.jobKeys().add()).setValue(j);
                JobRecord jobRecord = (JobRecord) value.jobs().add();
                ExpandableArrayBuffer expandableArrayBuffer = new ExpandableArrayBuffer(jobRecord.getLength());
                jobRecord.write(expandableArrayBuffer, 0);
                jobRecord.wrap(expandableArrayBuffer);
                jobRecord.setDeadline(currentTimeMillis).setWorker(value.getWorker());
                this.state.activate(j, jobRecord);
                newBatch.addFollowUpEvent(j, JobIntent.ACTIVATED, jobRecord);
            }
            if (decrementAndGet < 1) {
                iteratorControl.stop();
            }
        });
        newBatch.addFollowUpEvent(nextKey, JobBatchIntent.ACTIVATED, value);
        typedResponseWriter.writeEventOnCommand(nextKey, JobBatchIntent.ACTIVATED, value, typedRecord);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void rejectCommand(TypedRecord<JobBatchRecord> typedRecord, TypedResponseWriter typedResponseWriter, TypedStreamWriter typedStreamWriter) {
        RejectionType rejectionType;
        String str;
        JobBatchRecord value = typedRecord.getValue();
        if (value.getAmount() < 1) {
            rejectionType = RejectionType.BAD_VALUE;
            str = "Job batch amount must be greater than zero, got " + value.getAmount();
        } else if (value.getTimeout() < 1) {
            rejectionType = RejectionType.BAD_VALUE;
            str = "Job batch timeout must be greater than zero, got " + value.getTimeout();
        } else if (value.getType().capacity() < 1) {
            rejectionType = RejectionType.BAD_VALUE;
            str = "Job batch type must not be empty";
        } else {
            if (value.getWorker().capacity() >= 1) {
                throw new IllegalStateException("Job batch command is valid and should not be rejected");
            }
            rejectionType = RejectionType.BAD_VALUE;
            str = "Job batch worker must not be empty";
        }
        typedStreamWriter.writeRejection(typedRecord, rejectionType, str);
        typedResponseWriter.writeRejectionOnCommand((TypedRecord<?>) typedRecord, rejectionType, str);
    }
}
