package io.zeebe.broker.job.processor;

import io.zeebe.broker.job.CreditsRequest;
import io.zeebe.broker.job.JobSubscriptionManager;
import io.zeebe.broker.job.data.JobRecord;
import io.zeebe.broker.job.map.JobInstanceMap;
import io.zeebe.broker.logstreams.processor.CommandProcessor;
import io.zeebe.broker.logstreams.processor.TypedRecord;
import io.zeebe.broker.logstreams.processor.TypedRecordProcessor;
import io.zeebe.broker.logstreams.processor.TypedResponseWriter;
import io.zeebe.broker.logstreams.processor.TypedStreamEnvironment;
import io.zeebe.broker.logstreams.processor.TypedStreamProcessor;
import io.zeebe.broker.logstreams.processor.TypedStreamWriter;
import io.zeebe.broker.transport.clientapi.SubscribedRecordWriter;
import io.zeebe.map.ZbMap;
import io.zeebe.protocol.clientapi.RecordType;
import io.zeebe.protocol.clientapi.RejectionType;
import io.zeebe.protocol.clientapi.SubscriptionType;
import io.zeebe.protocol.clientapi.ValueType;
import io.zeebe.protocol.impl.RecordMetadata;
import io.zeebe.protocol.intent.Intent;
import io.zeebe.protocol.intent.JobIntent;

/* loaded from: input_file:io/zeebe/broker/job/processor/JobInstanceStreamProcessor.class */
/**
 * Assembles the stream processor that handles {@link ValueType#JOB} commands and keeps a
 * per-job lifecycle state in a {@link JobInstanceMap}.
 *
 * <p>State transitions performed by the processors in this class:
 *
 * <ul>
 *   <li>CREATE accepted: job is stored as {@link #STATE_CREATED}
 *   <li>ACTIVATE (from CREATED, FAILED or TIMED_OUT): job becomes {@link #STATE_ACTIVATED}
 *   <li>FAIL (from ACTIVATED): job becomes {@link #STATE_FAILED}
 *   <li>TIME_OUT (from ACTIVATED): job becomes {@link #STATE_TIMED_OUT}
 *   <li>COMPLETE (from ACTIVATED or TIMED_OUT) and CANCEL (any known job): job is removed
 *   <li>UPDATE_RETRIES (from FAILED, retries &gt; 0): accepted, stored state is left untouched
 * </ul>
 */
public class JobInstanceStreamProcessor {
    /** State stored once a CREATE command has been accepted. */
    protected static final short STATE_CREATED = 1;
    /** State stored once an ACTIVATE command has been applied. */
    protected static final short STATE_ACTIVATED = 2;
    /** State stored once a FAIL command has been accepted. */
    protected static final short STATE_FAILED = 3;
    /** State stored once a TIME_OUT command has been accepted. */
    protected static final short STATE_TIMED_OUT = 4;

    /** Writer used to push activated jobs to subscribers; set in {@link #createStreamProcessor}. */
    protected SubscribedRecordWriter subscribedEventWriter;
    protected final JobSubscriptionManager jobSubscriptionManager;
    // Not referenced inside this class; ActivateJobProcessor works on its own instance.
    protected final CreditsRequest creditsRequest = new CreditsRequest();
    /** Maps job key to one of the STATE_* values above. */
    protected final JobInstanceMap jobIndex = new JobInstanceMap();
    /** Partition id of the processed log stream; set in {@link #createStreamProcessor}. */
    protected int logStreamPartitionId;

    /**
     * Handles ACTIVATE commands. The decision taken in {@link #processRecord} is kept in
     * {@code canActivate} and consulted by the remaining callbacks for the same record.
     */
    private class ActivateJobProcessor implements TypedRecordProcessor<JobRecord> {
        protected boolean canActivate;
        // Reused request object for returning a credit when activation is not possible.
        protected final CreditsRequest creditsRequest = new CreditsRequest();

        /** Decides whether the job may be activated: only from CREATED, FAILED or TIMED_OUT. */
        @Override
        public void processRecord(TypedRecord<JobRecord> command) {
            // Reset first so a failing state lookup never leaves a stale decision behind.
            canActivate = false;
            switch (jobIndex.getJobState(command.getKey())) {
                case STATE_CREATED:
                case STATE_FAILED:
                case STATE_TIMED_OUT:
                    canActivate = true;
                    break;
                default:
                    break;
            }
        }

        /**
         * Pushes the ACTIVATED event to the subscriber when activation is possible; otherwise hands
         * one credit back to the subscription.
         *
         * @return the result of the attempted write or of the credit request, respectively
         */
        @Override
        public boolean executeSideEffects(TypedRecord<JobRecord> command, TypedResponseWriter typedResponseWriter) {
            if (!canActivate) {
                creditsRequest.setSubscriberKey(command.getMetadata().getSubscriberKey());
                creditsRequest.setCredits(1);
                return jobSubscriptionManager.increaseSubscriptionCreditsAsync(creditsRequest);
            }

            final RecordMetadata metadata = command.getMetadata();
            return subscribedEventWriter
                    .recordType(RecordType.EVENT)
                    .intent(JobIntent.ACTIVATED)
                    .partitionId(logStreamPartitionId)
                    .position(command.getPosition())
                    .sourceRecordPosition(command.getPosition())
                    .key(command.getKey())
                    .timestamp(command.getTimestamp())
                    .subscriberKey(metadata.getSubscriberKey())
                    .subscriptionType(SubscriptionType.JOB_SUBSCRIPTION)
                    .valueType(ValueType.JOB)
                    .valueWriter(command.getValue())
                    .tryWriteMessage(metadata.getRequestStreamId());
        }

        /** Writes the ACTIVATED follow-up event, or a rejection when the job cannot be activated. */
        @Override
        public long writeRecord(TypedRecord<JobRecord> command, TypedStreamWriter typedStreamWriter) {
            if (canActivate) {
                return typedStreamWriter.writeFollowUpEvent(command.getKey(), JobIntent.ACTIVATED, command.getValue());
            }
            return typedStreamWriter.writeRejection(
                    command,
                    RejectionType.NOT_APPLICABLE,
                    "Job is not in one of these states: CREATED, FAILED, TIMED_OUT");
        }

        /** Marks the job as ACTIVATED in the index when the activation went through. */
        @Override
        public void updateState(TypedRecord<JobRecord> command) {
            if (!canActivate) {
                return;
            }
            jobIndex.putJobInstance(command.getKey(), STATE_ACTIVATED);
        }
    }

    /** Handles CANCEL commands: accepted for every job with a positive stored state. */
    private class CancelJobProcessor implements CommandProcessor<JobRecord> {
        @Override
        public CommandProcessor.CommandResult onCommand(TypedRecord<JobRecord> command, CommandProcessor.CommandControl commandControl) {
            final short state = jobIndex.getJobState(command.getKey());
            if (state > 0) {
                return commandControl.accept(JobIntent.CANCELED);
            }
            return commandControl.reject(RejectionType.NOT_APPLICABLE, "Job does not exist");
        }

        /** Drops the canceled job from the index. */
        @Override
        public void updateStateOnAccept(TypedRecord<JobRecord> command) {
            jobIndex.remove(command.getKey());
        }
    }

    /** Handles COMPLETE commands: accepted for jobs that are ACTIVATED or TIMED_OUT. */
    private class CompleteJobProcessor implements CommandProcessor<JobRecord> {
        @Override
        public CommandProcessor.CommandResult onCommand(TypedRecord<JobRecord> command, CommandProcessor.CommandControl commandControl) {
            final short state = jobIndex.getJobState(command.getKey());
            final boolean completable = state == STATE_ACTIVATED || state == STATE_TIMED_OUT;
            if (completable) {
                return commandControl.accept(JobIntent.COMPLETED);
            }
            return commandControl.reject(RejectionType.NOT_APPLICABLE, "Job is not in state: ACTIVATED, TIMED_OUT");
        }

        /** Drops the completed job from the index. */
        @Override
        public void updateStateOnAccept(TypedRecord<JobRecord> command) {
            jobIndex.remove(command.getKey());
        }
    }

    /** Handles CREATE commands: always accepted. */
    private class CreateJobProcessor implements CommandProcessor<JobRecord> {
        @Override
        public CommandProcessor.CommandResult onCommand(TypedRecord<JobRecord> command, CommandProcessor.CommandControl commandControl) {
            return commandControl.accept(JobIntent.CREATED);
        }

        /** Registers the new job in the index as CREATED. */
        @Override
        public void updateStateOnAccept(TypedRecord<JobRecord> command) {
            jobIndex.putJobInstance(command.getKey(), STATE_CREATED);
        }
    }

    /** Handles FAIL commands: accepted only for ACTIVATED jobs. */
    private class FailJobProcessor implements CommandProcessor<JobRecord> {
        @Override
        public CommandProcessor.CommandResult onCommand(TypedRecord<JobRecord> command, CommandProcessor.CommandControl commandControl) {
            if (jobIndex.getJobState(command.getKey()) == STATE_ACTIVATED) {
                return commandControl.accept(JobIntent.FAILED);
            }
            return commandControl.reject(RejectionType.NOT_APPLICABLE, "Job is not in state ACTIVATED");
        }

        /** Marks the job as FAILED in the index. */
        @Override
        public void updateStateOnAccept(TypedRecord<JobRecord> command) {
            jobIndex.putJobInstance(command.getKey(), STATE_FAILED);
        }
    }

    /** Handles TIME_OUT commands: accepted only for ACTIVATED jobs. */
    private class TimeOutJobProcessor implements CommandProcessor<JobRecord> {
        private static final String REJECTION_REASON = "Job is not in state ACTIVATED";

        @Override
        public CommandProcessor.CommandResult onCommand(TypedRecord<JobRecord> command, CommandProcessor.CommandControl commandControl) {
            if (jobIndex.getJobState(command.getKey()) == STATE_ACTIVATED) {
                return commandControl.accept(JobIntent.TIMED_OUT);
            }
            return commandControl.reject(RejectionType.NOT_APPLICABLE, REJECTION_REASON);
        }

        /** Marks the job as TIMED_OUT in the index. */
        @Override
        public void updateStateOnAccept(TypedRecord<JobRecord> command) {
            jobIndex.putJobInstance(command.getKey(), STATE_TIMED_OUT);
        }
    }

    /**
     * Handles UPDATE_RETRIES commands: the job must be FAILED and the requested retries must be
     * positive. The state check comes first, so a non-FAILED job is rejected as NOT_APPLICABLE
     * regardless of the retries value.
     */
    private class UpdateRetriesJobProcessor implements CommandProcessor<JobRecord> {
        @Override
        public CommandProcessor.CommandResult onCommand(TypedRecord<JobRecord> command, CommandProcessor.CommandControl commandControl) {
            if (jobIndex.getJobState(command.getKey()) != STATE_FAILED) {
                return commandControl.reject(RejectionType.NOT_APPLICABLE, "Job is not in state FAILED");
            }
            if (command.getValue().getRetries() > 0) {
                return commandControl.accept(JobIntent.RETRIES_UPDATED);
            }
            return commandControl.reject(RejectionType.BAD_VALUE, "Retries must be greater than 0");
        }
    }

    /**
     * @param jobSubscriptionManager manager that receives credit requests when a job cannot be
     *     activated
     */
    public JobInstanceStreamProcessor(JobSubscriptionManager jobSubscriptionManager) {
        this.jobSubscriptionManager = jobSubscriptionManager;
    }

    /**
     * Captures the partition id and output of the given environment and builds a stream processor
     * with one handler per job command intent, backed by the job index as state resource.
     *
     * @param typedStreamEnvironment environment providing the stream, the output and the builder
     * @return the fully configured stream processor
     */
    public TypedStreamProcessor createStreamProcessor(TypedStreamEnvironment typedStreamEnvironment) {
        this.logStreamPartitionId = typedStreamEnvironment.getStream().getPartitionId();
        this.subscribedEventWriter = new SubscribedRecordWriter(typedStreamEnvironment.getOutput());

        // NOTE(review): the casts are kept exactly as found; they may only be decompiler
        // residue, but the onCommand overloads are not visible here - confirm before removing.
        return typedStreamEnvironment
                .newStreamProcessor()
                .onCommand(ValueType.JOB, (Intent) JobIntent.CREATE, (CommandProcessor) new CreateJobProcessor())
                .onCommand(ValueType.JOB, (Intent) JobIntent.ACTIVATE, (TypedRecordProcessor<?>) new ActivateJobProcessor())
                .onCommand(ValueType.JOB, (Intent) JobIntent.COMPLETE, (CommandProcessor) new CompleteJobProcessor())
                .onCommand(ValueType.JOB, (Intent) JobIntent.FAIL, (CommandProcessor) new FailJobProcessor())
                .onCommand(ValueType.JOB, (Intent) JobIntent.TIME_OUT, (CommandProcessor) new TimeOutJobProcessor())
                .onCommand(ValueType.JOB, (Intent) JobIntent.UPDATE_RETRIES, (CommandProcessor) new UpdateRetriesJobProcessor())
                .onCommand(ValueType.JOB, (Intent) JobIntent.CANCEL, (CommandProcessor) new CancelJobProcessor())
                .withStateResource((ZbMap<?, ?>) this.jobIndex.getMap())
                .build();
    }
}
