/*
 * Decompiled with CFR 0.152.
 */
package io.zeebe.broker.task.processor;

import io.zeebe.broker.logstreams.processor.MetadataFilter;
import io.zeebe.broker.task.data.TaskEvent;
import io.zeebe.broker.task.data.TaskState;
import io.zeebe.broker.task.processor.ExpirationTimeBucket;
import io.zeebe.logstreams.log.LogStream;
import io.zeebe.logstreams.log.LogStreamReader;
import io.zeebe.logstreams.log.LogStreamWriter;
import io.zeebe.logstreams.log.LoggedEvent;
import io.zeebe.logstreams.processor.EventProcessor;
import io.zeebe.logstreams.processor.StreamProcessor;
import io.zeebe.logstreams.processor.StreamProcessorContext;
import io.zeebe.logstreams.snapshot.SerializableWrapper;
import io.zeebe.logstreams.spi.SnapshotSupport;
import io.zeebe.protocol.clientapi.EventType;
import io.zeebe.protocol.impl.BrokerEventMetadata;
import io.zeebe.util.DeferredCommandContext;
import io.zeebe.util.buffer.BufferReader;
import io.zeebe.util.buffer.BufferWriter;
import io.zeebe.util.time.ClockUtil;
import java.util.HashMap;
import java.util.Iterator;
import org.agrona.DirectBuffer;

/**
 * Stream processor that keeps track of locked tasks and writes an {@code EXPIRE_LOCK} task event
 * for every task whose lock expiration time has passed.
 *
 * <p>The processor maintains an index from task event key to an {@link ExpirationTimeBucket}
 * (position of the {@code LOCKED} event plus the lock expiration time):
 * <ul>
 *   <li>{@code LOCKED} events add (or replace) the entry for the task,</li>
 *   <li>{@code LOCK_EXPIRED}, {@code COMPLETED} and {@code FAILED} events remove it,</li>
 *   <li>{@link #checkLockExpirationAsync()} schedules a scan of the index that writes
 *       {@code EXPIRE_LOCK} events for expired entries.</li>
 * </ul>
 *
 * <p>The index is exposed as this processor's snapshot state via {@link #getStateResource()}.
 *
 * <p>NOTE(review): the event processors and the expiration check share the mutable
 * {@link #taskEvent} instance; this presumably relies on both running on the same thread via the
 * stream processor command queue - confirm against the controller implementation.
 */
public class TaskExpireLockStreamProcessor implements StreamProcessor {
    protected static final int INDEX_VALUE_LENGTH = 8;

    protected final EventProcessor lockedEventProcessor = new LockedEventProcessor();
    protected final EventProcessor unlockEventProcessor = new UnlockEventProcessor();
    protected final EventProcessor expireLockEventProcessor = new ExpireLockEventProcessor();
    protected final Runnable checkLockExpirationCmd = new CheckLockExpirationCmd();

    /** Task event key -> position of the LOCKED event and the time at which the lock expires. */
    protected HashMap<Long, ExpirationTimeBucket> index = new HashMap<>();
    /** Snapshot wrapper around {@link #index}; must be initialized after the index itself. */
    protected SerializableWrapper<HashMap<Long, ExpirationTimeBucket>> indexSnapshot = new SerializableWrapper<>(this.index);

    protected DeferredCommandContext cmdQueue;
    protected LogStreamReader targetLogStreamReader;
    protected LogStreamWriter targetLogStreamWriter;
    protected LogStream targetStream;
    protected DirectBuffer targetLogStreamTopicName;
    protected int targetLogStreamPartitionId;
    protected int streamProcessorId;

    protected final BrokerEventMetadata targetEventMetadata = new BrokerEventMetadata();
    /** Reusable event instance; overwritten by {@link #onEvent(LoggedEvent)} and by the expiration check. */
    protected final TaskEvent taskEvent = new TaskEvent();

    /** Key of the event most recently passed to {@link #onEvent(LoggedEvent)}. */
    protected long eventKey = 0L;
    /** Position of the event most recently passed to {@link #onEvent(LoggedEvent)}. */
    protected long eventPosition = 0L;
    /** Position of the last {@code EXPIRE_LOCK} event that was written successfully. */
    protected long lastWrittenEventPosition = 0L;

    /**
     * @return the snapshot wrapper around the expiration index
     */
    public SnapshotSupport getStateResource() {
        return indexSnapshot;
    }

    /**
     * Captures the collaborators provided by the context and re-reads the index from the
     * snapshot wrapper, so that the working index and the snapshot state refer to the object
     * currently held by the wrapper.
     *
     * @param context the stream processor context supplying id, command queue, reader, writer and
     *                target stream
     */
    @SuppressWarnings("unchecked")
    public void onOpen(StreamProcessorContext context) {
        streamProcessorId = context.getId();
        cmdQueue = context.getStreamProcessorCmdQueue();
        targetLogStreamReader = context.getTargetLogStreamReader();
        targetLogStreamWriter = context.getLogStreamWriter();

        targetStream = context.getTargetStream();
        targetLogStreamTopicName = targetStream.getTopicName();
        targetLogStreamPartitionId = targetStream.getPartitionId();

        index = (HashMap<Long, ExpirationTimeBucket>) indexSnapshot.getObject();
    }

    /**
     * @return a filter that accepts only events whose metadata event type is {@code TASK_EVENT}
     */
    public static MetadataFilter eventFilter() {
        return m -> m.getEventType() == EventType.TASK_EVENT;
    }

    /**
     * Reads the task event and selects the processor matching its state.
     *
     * @param event the logged event to dispatch
     * @return the processor for {@code LOCKED}, {@code EXPIRE_LOCK}, {@code LOCK_EXPIRED},
     *         {@code COMPLETED} or {@code FAILED} events; {@code null} for any other state
     */
    public EventProcessor onEvent(LoggedEvent event) {
        eventKey = event.getKey();
        eventPosition = event.getPosition();

        taskEvent.reset();
        event.readValue((BufferReader) taskEvent);

        switch (taskEvent.getState()) {
            case LOCKED:
                return lockedEventProcessor;
            case EXPIRE_LOCK:
                return expireLockEventProcessor;
            case LOCK_EXPIRED:
            case COMPLETED:
            case FAILED:
                return unlockEventProcessor;
            default:
                // states that do not affect the lock index are not processed
                return null;
        }
    }

    /**
     * Schedules a scan of the index for expired locks on the stream processor command queue.
     */
    public void checkLockExpirationAsync() {
        cmdQueue.runAsync(checkLockExpirationCmd);
    }

    /**
     * Command that scans the index and writes an {@code EXPIRE_LOCK} event for every entry whose
     * expiration time has been reached.
     */
    class CheckLockExpirationCmd implements Runnable {

        /**
         * Iterates over all indexed tasks. For each expired lock the original {@code LOCKED} event
         * is looked up in the log and an {@code EXPIRE_LOCK} event is written.
         *
         * <p>The index entry is removed only if the event was actually written. When the write
         * fails the entry is kept so that the next check retries it; removing it unconditionally
         * would drop the task from the index without any expire event in the log.
         *
         * @throws IllegalStateException if an indexed event cannot be found in the log stream
         */
        @Override
        public void run() {
            final Iterator<Long> keyIterator = index.keySet().iterator();
            while (keyIterator.hasNext()) {
                final Long taskKey = keyIterator.next();
                final ExpirationTimeBucket bucket = index.get(taskKey);

                if (!lockExpired(bucket.getExpirationTime())) {
                    continue;
                }

                final LoggedEvent taskLockedEvent = findEvent(bucket.getEventPosition());
                final long writtenPosition = tryWriteLockExpireEvent(taskKey, taskLockedEvent);
                if (writtenPosition >= 0L) {
                    keyIterator.remove();
                }
            }
        }

        /**
         * @param lockExpirationTime the expiration time, compared against
         *                           {@link ClockUtil#getCurrentTimeInMillis()}
         * @return {@code true} if the expiration time is now or in the past
         */
        protected boolean lockExpired(long lockExpirationTime) {
            return lockExpirationTime <= ClockUtil.getCurrentTimeInMillis();
        }

        /**
         * Looks up the event at the given position in the target log stream.
         *
         * @param position the log position of the indexed event
         * @return the event found at that position
         * @throws IllegalStateException if the reader cannot seek to the position or has no event
         *                               there
         */
        protected LoggedEvent findEvent(long position) {
            final boolean found = targetLogStreamReader.seek(position);
            if (found && targetLogStreamReader.hasNext()) {
                return (LoggedEvent) targetLogStreamReader.next();
            }
            throw new IllegalStateException("Failed to check the task lock expiration time. Indexed task event not found in log stream.");
        }

        /**
         * Writes an {@code EXPIRE_LOCK} event derived from the given {@code LOCKED} event. A failed
         * write is ignored here; {@link #run()} uses the position-returning variant to decide
         * whether the index entry may be removed.
         *
         * @param eventKey    the key of the task
         * @param lockedEvent the {@code LOCKED} event the new event is based on
         */
        protected void writeLockExpireEvent(long eventKey, LoggedEvent lockedEvent) {
            tryWriteLockExpireEvent(eventKey, lockedEvent);
        }

        /**
         * Copies the value of the {@code LOCKED} event, switches its state to {@code EXPIRE_LOCK}
         * and tries to write it with the locked event as source event.
         *
         * @param eventKey    the key of the task
         * @param lockedEvent the {@code LOCKED} event the new event is based on
         * @return the position of the written event, or a negative value if the write failed
         */
        private long tryWriteLockExpireEvent(long eventKey, LoggedEvent lockedEvent) {
            taskEvent.reset();
            lockedEvent.readValue((BufferReader) taskEvent);
            taskEvent.setState(TaskState.EXPIRE_LOCK);

            targetEventMetadata.reset()
                .protocolVersion(1)
                .eventType(EventType.TASK_EVENT)
                .raftTermId(targetStream.getTerm());

            final long position = targetLogStreamWriter
                .producerId(streamProcessorId)
                .sourceEvent(targetLogStreamTopicName, targetLogStreamPartitionId, lockedEvent.getPosition())
                .key(eventKey)
                .metadataWriter((BufferWriter) targetEventMetadata)
                .valueWriter((BufferWriter) taskEvent)
                .tryWrite();

            if (position >= 0L) {
                lastWrittenEventPosition = position;
            }
            return position;
        }
    }

    /**
     * Processor for {@code EXPIRE_LOCK} events. It writes nothing itself and only reports the
     * position of the last expire event written by the expiration check.
     *
     * <p>NOTE(review): presumably this lets the caller track the last position written by this
     * processor - confirm against the {@code EventProcessor} contract.
     */
    class ExpireLockEventProcessor implements EventProcessor {

        public void processEvent() {
            // nothing to process
        }

        /**
         * @param writer unused
         * @return the position of the last successfully written {@code EXPIRE_LOCK} event
         */
        public long writeEvent(LogStreamWriter writer) {
            return lastWrittenEventPosition;
        }
    }

    /**
     * Processor for {@code LOCK_EXPIRED}, {@code COMPLETED} and {@code FAILED} events: the task is
     * no longer locked, so its entry is dropped from the index.
     */
    class UnlockEventProcessor implements EventProcessor {

        public void processEvent() {
            // nothing to process
        }

        /** Removes the entry of the current event's key from the index, if present. */
        public void updateState() {
            index.remove(eventKey);
        }
    }

    /**
     * Processor for {@code LOCKED} events: records where the event is stored and when its lock
     * expires.
     */
    class LockedEventProcessor implements EventProcessor {

        public void processEvent() {
            // nothing to process
        }

        /**
         * Stores the current event's position and the task's lock time under the event key,
         * replacing any previous entry for that key.
         */
        public void updateState() {
            final ExpirationTimeBucket bucket = new ExpirationTimeBucket(eventPosition, taskEvent.getLockTime());
            index.put(eventKey, bucket);
        }
    }
}

