package io.zeebe.broker.job;

import io.zeebe.broker.logstreams.processor.StreamProcessorLifecycleAware;
import io.zeebe.broker.logstreams.processor.TypedCommandWriter;
import io.zeebe.broker.logstreams.processor.TypedStreamProcessor;
import io.zeebe.protocol.clientapi.ValueType;
import io.zeebe.protocol.intent.JobIntent;
import io.zeebe.util.sched.ScheduledTimer;
import io.zeebe.util.sched.clock.ActorClock;
import java.time.Duration;

/* loaded from: input_file:io/zeebe/broker/job/JobTimeoutTrigger.class */
public class JobTimeoutTrigger implements StreamProcessorLifecycleAware {
    public static final Duration TIME_OUT_POLLING_INTERVAL = Duration.ofSeconds(30);
    private final JobState state;
    private ScheduledTimer timer;
    private TypedCommandWriter writer;

    public JobTimeoutTrigger(JobState jobState) {
        this.state = jobState;
    }

    @Override // io.zeebe.broker.logstreams.processor.StreamProcessorLifecycleAware
    public void onRecovered(TypedStreamProcessor typedStreamProcessor) {
        this.timer = typedStreamProcessor.getActor().runAtFixedRate(TIME_OUT_POLLING_INTERVAL, this::deactivateTimedOutJobs);
        this.writer = typedStreamProcessor.getEnvironment().buildCommandWriter();
    }

    @Override // io.zeebe.broker.logstreams.processor.StreamProcessorLifecycleAware
    public void onClose() {
        if (this.timer != null) {
            this.timer.cancel();
            this.timer = null;
        }
    }

    private void deactivateTimedOutJobs() {
        this.state.forEachTimedOutEntry(ActorClock.currentTimeMillis(), (l, jobRecord) -> {
            this.writer.appendFollowUpCommand(l.longValue(), JobIntent.TIME_OUT, jobRecord, recordMetadata -> {
                recordMetadata.valueType(ValueType.JOB);
            });
            this.writer.flush();
        });
    }
}
