package io.camunda.zeebe.engine.processing.distribution;

import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.StateWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedRejectionWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.engine.state.immutable.DistributionState;
import io.camunda.zeebe.protocol.impl.record.value.distribution.CommandDistributionRecord;
import io.camunda.zeebe.protocol.record.RecordValue;
import io.camunda.zeebe.protocol.record.RejectionType;
import io.camunda.zeebe.protocol.record.intent.CommandDistributionIntent;
import io.camunda.zeebe.stream.api.records.TypedRecord;

/* loaded from: input_file:io/camunda/zeebe/engine/processing/distribution/CommandDistributionAcknowledgeProcessor.class */
/**
 * Processes acknowledgements of a distributed command.
 *
 * <p>For an incoming {@link CommandDistributionRecord} the processor checks whether a pending
 * distribution is registered for the record key and the partition named in the record value. If
 * not, the command is rejected with {@link RejectionType#NOT_FOUND}. Otherwise an
 * {@link CommandDistributionIntent#ACKNOWLEDGED} event is written, followed by a
 * {@link CommandDistributionIntent#FINISHED} event once the state reports no remaining pending
 * distribution for the key.
 */
public class CommandDistributionAcknowledgeProcessor implements TypedRecordProcessor<CommandDistributionRecord> {
    /** Rejection reason template; arguments are the distribution key and the partition id. */
    private static final String ERROR_PENDING_DISTRIBUTION_NOT_FOUND = "Expected to find pending distribution with key %d for partition %d, but no pending distribution was found.";
    // NOTE(review): not referenced anywhere in this class; kept to avoid changing class initialisation.
    private static final CommandDistributionRecord EMPTY_DISTRIBUTION_RECORD = new CommandDistributionRecord();
    private final DistributionState distributionState;
    private final StateWriter stateWriter;
    private final TypedRejectionWriter rejectionWriter;

    /**
     * Creates the processor.
     *
     * @param distributionState state queried for pending distributions
     * @param writers source of the state writer and rejection writer used by this processor
     */
    public CommandDistributionAcknowledgeProcessor(DistributionState distributionState, Writers writers) {
        this.distributionState = distributionState;
        this.stateWriter = writers.state();
        this.rejectionWriter = writers.rejection();
    }

    /**
     * Handles a single acknowledgement command.
     *
     * @param typedRecord the acknowledgement; its key identifies the distribution and its value
     *     names the acknowledging partition
     */
    @Override
    public void processRecord(TypedRecord<CommandDistributionRecord> typedRecord) {
        final long distributionKey = typedRecord.getKey();
        // Keep the concrete record type: the partition id, value type and intent accessors used
        // below are declared on CommandDistributionRecord, not on the generic RecordValue.
        final CommandDistributionRecord distributionRecord = typedRecord.getValue();
        final int partitionId = distributionRecord.getPartitionId();

        if (!this.distributionState.hasPendingDistribution(distributionKey, partitionId)) {
            final String reason = String.format(ERROR_PENDING_DISTRIBUTION_NOT_FOUND, distributionKey, partitionId);
            this.rejectionWriter.appendRejection(typedRecord, RejectionType.NOT_FOUND, reason);
            return;
        }

        this.stateWriter.appendFollowUpEvent(distributionKey, CommandDistributionIntent.ACKNOWLEDGED, distributionRecord);

        // Checked after ACKNOWLEDGED has been appended; presumably applying that event clears the
        // pending entry for this partition - TODO confirm against the event applier.
        if (!this.distributionState.hasPendingDistribution(distributionKey)) {
            // The FINISHED event carries the partition id of the typed record itself (not the one
            // from the record value), plus the value type and intent of the acknowledged command.
            final CommandDistributionRecord finishedRecord = new CommandDistributionRecord()
                    .setPartitionId(typedRecord.getPartitionId())
                    .setValueType(distributionRecord.getValueType())
                    .setIntent(distributionRecord.getIntent());
            this.stateWriter.appendFollowUpEvent(distributionKey, CommandDistributionIntent.FINISHED, finishedRecord);
        }
    }
}
