package io.zeebe.broker.task.processor;

import io.zeebe.broker.logstreams.processor.StreamProcessorLifecycleAware;
import io.zeebe.broker.logstreams.processor.TypedEvent;
import io.zeebe.broker.logstreams.processor.TypedEventProcessor;
import io.zeebe.broker.logstreams.processor.TypedStreamEnvironment;
import io.zeebe.broker.logstreams.processor.TypedStreamProcessor;
import io.zeebe.broker.logstreams.processor.TypedStreamReader;
import io.zeebe.broker.logstreams.processor.TypedStreamWriter;
import io.zeebe.broker.task.TaskQueueManagerService;
import io.zeebe.broker.task.data.TaskEvent;
import io.zeebe.broker.task.data.TaskState;
import io.zeebe.map.Long2BytesZbMap;
import io.zeebe.map.ZbMap;
import io.zeebe.map.iterator.Long2BytesZbMapEntry;
import io.zeebe.protocol.clientapi.EventType;
import io.zeebe.util.sched.ScheduledTimer;
import io.zeebe.util.sched.clock.ActorClock;
import java.util.Iterator;
import org.agrona.DirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;

/* loaded from: input_file:io/zeebe/broker/task/processor/TaskExpireLockStreamProcessor.class */
/**
 * Tracks locked tasks and periodically requests lock expiration for those whose lock time has
 * passed.
 *
 * <p>State is kept in {@link #expirationMap}: the key is the task event key and the value is a
 * fixed-size record of two longs, the position of the {@code LOCKED} event followed by the lock
 * time. An entry is added when a {@code LOCKED} task event is processed and removed when a
 * {@code LOCK_EXPIRED}, {@code COMPLETED} or {@code FAILED} task event is processed.
 *
 * <p>While the stream processor is open, a fixed-rate timer scheduled on the processor's actor
 * runs {@link #timeOutTasks()}, which writes an {@code EXPIRE_LOCK} event for every entry whose
 * lock time is not later than the current actor clock time.
 */
public class TaskExpireLockStreamProcessor implements StreamProcessorLifecycleAware {
    /** Size in bytes of one map value: event position (long) followed by lock time (long). */
    protected static final int MAP_VALUE_MAX_LENGTH = Long.BYTES + Long.BYTES;

    /** Offset of the event position inside a map value. */
    private static final int POSITION_OFFSET = 0;

    /** Offset of the lock time inside a map value. */
    private static final int LOCK_TIME_OFFSET = POSITION_OFFSET + Long.BYTES;

    /** Task key -> (event position, lock time); registered as the processor's state resource. */
    protected Long2BytesZbMap expirationMap = new Long2BytesZbMap(MAP_VALUE_MAX_LENGTH);

    /** Scratch buffer reused to assemble a map value before it is put into the map. */
    private final UnsafeBuffer mapAccessBuffer = new UnsafeBuffer(new byte[MAP_VALUE_MAX_LENGTH]);

    private final TaskEventWriter streamWriter;

    /** Handle of the periodic expiration check; {@code null} while the processor is not open. */
    private ScheduledTimer timer;

    /**
     * Creates the processor.
     *
     * @param typedStreamReader reader handed to the internal {@link TaskEventWriter}
     * @param typedStreamWriter writer handed to the internal {@link TaskEventWriter}
     */
    public TaskExpireLockStreamProcessor(TypedStreamReader typedStreamReader, TypedStreamWriter typedStreamWriter) {
        this.streamWriter = new TaskEventWriter(typedStreamWriter, typedStreamReader);
    }

    /**
     * Schedules the periodic expiration check on the stream processor's actor, using
     * {@link TaskQueueManagerService#LOCK_EXPIRATION_INTERVAL} as the rate.
     *
     * @param typedStreamProcessor the stream processor that was opened
     */
    @Override
    public void onOpen(TypedStreamProcessor typedStreamProcessor) {
        this.timer = typedStreamProcessor.getActor().runAtFixedRate(TaskQueueManagerService.LOCK_EXPIRATION_INTERVAL, this::timeOutTasks);
    }

    /** Cancels the periodic expiration check, if one is scheduled, and closes the event writer. */
    @Override
    public void onClose() {
        if (this.timer != null) {
            this.timer.cancel();
            this.timer = null;
        }
        this.streamWriter.close();
    }

    /**
     * Walks all tracked locks and writes an {@code EXPIRE_LOCK} event for each expired one.
     *
     * <p>Entries are not removed here; removal happens when the resulting task event is
     * processed. The walk stops at the first write that is rejected, so the remaining entries are
     * only looked at again on the next timer run.
     */
    private void timeOutTasks() {
        final Iterator<Long2BytesZbMapEntry> entries = this.expirationMap.iterator();
        while (entries.hasNext()) {
            final DirectBuffer value = entries.next().getValue();
            final long eventPosition = value.getLong(POSITION_OFFSET);
            final long lockTime = value.getLong(LOCK_TIME_OFFSET);

            if (lockExpired(lockTime) && !this.streamWriter.tryWriteTaskEvent(eventPosition, TaskState.EXPIRE_LOCK)) {
                return;
            }
        }
    }

    /**
     * @param lockTime the lock time stored for a task, compared against
     *     {@link ActorClock#currentTimeMillis()}
     * @return {@code true} if {@code lockTime} is not later than the current actor clock time
     */
    private boolean lockExpired(long lockTime) {
        return lockTime <= ActorClock.currentTimeMillis();
    }

    /**
     * Builds the stream processor that maintains {@link #expirationMap} from task events and
     * registers this object as its lifecycle listener.
     *
     * @param typedStreamEnvironment environment used to obtain the stream processor builder
     * @return the configured stream processor
     */
    public TypedStreamProcessor createStreamProcessor(TypedStreamEnvironment typedStreamEnvironment) {
        // LOCKED: remember where the event is and when its lock runs out.
        final TypedEventProcessor<TaskEvent> registerLock = new TypedEventProcessor<TaskEvent>() {
            @Override
            public void updateState(TypedEvent<TaskEvent> typedEvent) {
                final long lockTime = typedEvent.getValue().getLockTime();
                mapAccessBuffer.putLong(POSITION_OFFSET, typedEvent.getPosition());
                mapAccessBuffer.putLong(LOCK_TIME_OFFSET, lockTime);
                expirationMap.put(typedEvent.getKey(), mapAccessBuffer);
            }
        };

        // LOCK_EXPIRED / COMPLETED / FAILED: the lock no longer needs to be tracked.
        final TypedEventProcessor<TaskEvent> forgetLock = new TypedEventProcessor<TaskEvent>() {
            @Override
            public void updateState(TypedEvent<TaskEvent> typedEvent) {
                expirationMap.remove(typedEvent.getKey());
            }
        };

        return typedStreamEnvironment.newStreamProcessor()
            .onEvent(EventType.TASK_EVENT, TaskState.LOCKED, registerLock)
            .onEvent(EventType.TASK_EVENT, TaskState.LOCK_EXPIRED, forgetLock)
            .onEvent(EventType.TASK_EVENT, TaskState.COMPLETED, forgetLock)
            .onEvent(EventType.TASK_EVENT, TaskState.FAILED, forgetLock)
            .withListener(this)
            .withStateResource((ZbMap<?, ?>) this.expirationMap)
            .build();
    }
}
