package io.zeebe.broker.job.processor;

import io.zeebe.broker.job.JobQueueManagerService;
import io.zeebe.broker.job.data.JobRecord;
import io.zeebe.broker.logstreams.processor.StreamProcessorLifecycleAware;
import io.zeebe.broker.logstreams.processor.TypedRecord;
import io.zeebe.broker.logstreams.processor.TypedRecordProcessor;
import io.zeebe.broker.logstreams.processor.TypedStreamEnvironment;
import io.zeebe.broker.logstreams.processor.TypedStreamProcessor;
import io.zeebe.broker.logstreams.processor.TypedStreamReader;
import io.zeebe.broker.logstreams.processor.TypedStreamWriter;
import io.zeebe.map.Long2BytesZbMap;
import io.zeebe.map.ZbMap;
import io.zeebe.map.iterator.Long2BytesZbMapEntry;
import io.zeebe.protocol.clientapi.ValueType;
import io.zeebe.protocol.intent.Intent;
import io.zeebe.protocol.intent.JobIntent;
import io.zeebe.util.sched.ScheduledTimer;
import io.zeebe.util.sched.clock.ActorClock;
import java.util.Iterator;
import org.agrona.DirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;

/* loaded from: input_file:io/zeebe/broker/job/processor/JobTimeOutStreamProcessor.class */
/**
 * Tracks the deadlines of activated jobs and periodically writes {@code TIME_OUT} commands for
 * the ones whose deadline has passed.
 *
 * <p>State is kept in {@link #expirationMap}, keyed by job key. Each value is
 * {@link #MAP_VALUE_MAX_LENGTH} bytes: the position of the {@code ACTIVATED} record followed by
 * the job deadline. An entry is added on {@code ACTIVATED} and removed on {@code TIMED_OUT},
 * {@code COMPLETED} and {@code FAILED}.
 */
public class JobTimeOutStreamProcessor implements StreamProcessorLifecycleAware {
    /** Size in bytes of one map value: record position (long) followed by deadline (long). */
    protected static final int MAP_VALUE_MAX_LENGTH = Long.BYTES + Long.BYTES;

    /** Offset of the record position inside a map value. */
    private static final int POSITION_OFFSET = 0;
    /** Offset of the job deadline inside a map value. */
    private static final int DEADLINE_OFFSET = Long.BYTES;

    protected Long2BytesZbMap expirationMap = new Long2BytesZbMap(MAP_VALUE_MAX_LENGTH);
    /** Scratch buffer reused for every map write; its content is only valid until the next put. */
    private final UnsafeBuffer mapAccessBuffer = new UnsafeBuffer(new byte[MAP_VALUE_MAX_LENGTH]);
    private ScheduledTimer timer;
    private TypedStreamWriter writer;
    private TypedStreamReader reader;

    /**
     * Schedules the periodic time-out check on the processor's actor and builds the stream
     * writer and reader used by that check.
     *
     * @param typedStreamProcessor the processor this listener was registered with
     */
    @Override
    public void onOpen(TypedStreamProcessor typedStreamProcessor) {
        this.timer = typedStreamProcessor.getActor()
            .runAtFixedRate(JobQueueManagerService.TIME_OUT_INTERVAL, this::timeOutJobs);
        this.writer = typedStreamProcessor.getEnvironment().buildStreamWriter();
        this.reader = typedStreamProcessor.getEnvironment().buildStreamReader();
    }

    /**
     * Cancels the periodic check and closes the stream reader.
     *
     * <p>Both resources are null-checked and cleared, so calling this without a preceding
     * successful {@link #onOpen(TypedStreamProcessor)}, or calling it twice, does not throw.
     */
    @Override
    public void onClose() {
        if (this.timer != null) {
            this.timer.cancel();
            this.timer = null;
        }
        if (this.reader != null) {
            this.reader.close();
            this.reader = null;
        }
    }

    /**
     * Walks all tracked jobs and writes a {@code TIME_OUT} follow-up command for each one whose
     * deadline has passed.
     *
     * <p>The scan stops at the first write that reports a negative result. Entries are not
     * removed here (only the event processors remove them), so a job whose command could not be
     * written is still in the map for a later timer run.
     */
    private void timeOutJobs() {
        final Iterator<Long2BytesZbMapEntry> entries = this.expirationMap.iterator();
        while (entries.hasNext()) {
            final DirectBuffer value = entries.next().getValue();
            final long recordPosition = value.getLong(POSITION_OFFSET);
            final long deadline = value.getLong(DEADLINE_OFFSET);
            if (!isExpired(deadline)) {
                continue;
            }

            // Re-read the activated record so the command carries the job's key and payload.
            final TypedRecord<JobRecord> record = this.reader.readValue(recordPosition, JobRecord.class);
            final long writeResult =
                this.writer.writeFollowUpCommand(record.getKey(), JobIntent.TIME_OUT, record.getValue());
            if (writeResult < 0) {
                return;
            }
        }
    }

    /**
     * @param deadline deadline to test, on the same time scale as
     *     {@link ActorClock#currentTimeMillis()}
     * @return {@code true} if the deadline is at or before the current actor clock time
     */
    private boolean isExpired(long deadline) {
        return deadline <= ActorClock.currentTimeMillis();
    }

    /**
     * Builds the stream processor that maintains {@link #expirationMap} from job events and
     * registers this instance as its lifecycle listener and the map as its state resource.
     *
     * @param typedStreamEnvironment environment used to create the processor builder
     * @return the configured stream processor
     */
    public TypedStreamProcessor createStreamProcessor(TypedStreamEnvironment typedStreamEnvironment) {
        // ACTIVATED: remember where the record lives and when the job expires.
        final TypedRecordProcessor<JobRecord> registerJob = new TypedRecordProcessor<JobRecord>() {
            @Override
            public void updateState(TypedRecord<JobRecord> typedRecord) {
                final long deadline = typedRecord.getValue().getDeadline();
                mapAccessBuffer.putLong(POSITION_OFFSET, typedRecord.getPosition());
                mapAccessBuffer.putLong(DEADLINE_OFFSET, deadline);
                expirationMap.put(typedRecord.getKey(), mapAccessBuffer);
            }
        };
        // TIMED_OUT / COMPLETED / FAILED: the job no longer needs to be watched.
        final TypedRecordProcessor<JobRecord> unregisterJob = new TypedRecordProcessor<JobRecord>() {
            @Override
            public void updateState(TypedRecord<JobRecord> typedRecord) {
                expirationMap.remove(typedRecord.getKey());
            }
        };

        return typedStreamEnvironment.newStreamProcessor()
            .onEvent(ValueType.JOB, JobIntent.ACTIVATED, registerJob)
            .onEvent(ValueType.JOB, JobIntent.TIMED_OUT, unregisterJob)
            .onEvent(ValueType.JOB, JobIntent.COMPLETED, unregisterJob)
            .onEvent(ValueType.JOB, JobIntent.FAILED, unregisterJob)
            .withListener(this)
            .withStateResource(this.expirationMap)
            .build();
    }
}
