/*
 * Decompiled with CFR 0.152.
 */
package io.zeebe.broker.task.processor;

import io.zeebe.broker.logstreams.processor.MetadataFilter;
import io.zeebe.broker.task.CreditsRequest;
import io.zeebe.broker.task.TaskSubscriptionManager;
import io.zeebe.broker.task.data.TaskEvent;
import io.zeebe.broker.task.data.TaskState;
import io.zeebe.broker.task.map.TaskInstanceMap;
import io.zeebe.broker.transport.clientapi.CommandResponseWriter;
import io.zeebe.broker.transport.clientapi.SubscribedEventWriter;
import io.zeebe.broker.util.PayloadUtil;
import io.zeebe.logstreams.log.LogStream;
import io.zeebe.logstreams.log.LogStreamWriter;
import io.zeebe.logstreams.log.LoggedEvent;
import io.zeebe.logstreams.processor.EventProcessor;
import io.zeebe.logstreams.processor.StreamProcessor;
import io.zeebe.logstreams.processor.StreamProcessorContext;
import io.zeebe.logstreams.spi.SnapshotSupport;
import io.zeebe.protocol.clientapi.EventType;
import io.zeebe.protocol.clientapi.SubscriptionType;
import io.zeebe.protocol.impl.BrokerEventMetadata;
import io.zeebe.util.buffer.BufferReader;
import io.zeebe.util.buffer.BufferUtil;
import io.zeebe.util.buffer.BufferWriter;
import org.agrona.DirectBuffer;

public class TaskInstanceStreamProcessor
implements StreamProcessor {
    protected static final short STATE_CREATED = 1;
    protected static final short STATE_LOCKED = 2;
    protected static final short STATE_FAILED = 3;
    protected static final short STATE_LOCK_EXPIRED = 4;
    protected BrokerEventMetadata sourceEventMetadata = new BrokerEventMetadata();
    protected final BrokerEventMetadata targetEventMetadata = new BrokerEventMetadata();
    protected final CommandResponseWriter responseWriter;
    protected final SubscribedEventWriter subscribedEventWriter;
    protected final TaskSubscriptionManager taskSubscriptionManager;
    protected final CreateTaskProcessor createTaskProcessor = new CreateTaskProcessor();
    protected final LockTaskProcessor lockTaskProcessor = new LockTaskProcessor();
    protected final CompleteTaskProcessor completeTaskProcessor = new CompleteTaskProcessor();
    protected final FailTaskProcessor failTaskProcessor = new FailTaskProcessor();
    protected final ExpireLockTaskProcessor expireLockTaskProcessor = new ExpireLockTaskProcessor();
    protected final UpdateRetriesTaskProcessor updateRetriesTaskProcessor = new UpdateRetriesTaskProcessor();
    protected final CancelTaskProcessor cancelTaskProcessor = new CancelTaskProcessor();
    protected final TaskInstanceMap taskIndex;
    protected final TaskEvent taskEvent = new TaskEvent();
    protected final CreditsRequest creditsRequest = new CreditsRequest();
    protected DirectBuffer logStreamTopicName;
    protected int logStreamPartitionId;
    protected LogStream targetStream;
    protected long eventKey = 0L;
    protected long eventPosition = 0L;

    public TaskInstanceStreamProcessor(CommandResponseWriter responseWriter, SubscribedEventWriter subscribedEventWriter, TaskSubscriptionManager taskSubscriptionManager) {
        this.responseWriter = responseWriter;
        this.subscribedEventWriter = subscribedEventWriter;
        this.taskSubscriptionManager = taskSubscriptionManager;
        this.taskIndex = new TaskInstanceMap();
    }

    public int getPriority(long now) {
        return 100;
    }

    public SnapshotSupport getStateResource() {
        return this.taskIndex.getSnapshotSupport();
    }

    public void onOpen(StreamProcessorContext context) {
        LogStream sourceStream = context.getSourceStream();
        this.logStreamTopicName = sourceStream.getTopicName();
        this.logStreamPartitionId = sourceStream.getPartitionId();
        this.targetStream = context.getTargetStream();
    }

    public void onClose() {
        this.taskIndex.close();
    }

    public static MetadataFilter eventFilter() {
        return m -> m.getEventType() == EventType.TASK_EVENT;
    }

    public EventProcessor onEvent(LoggedEvent event) {
        this.taskIndex.reset();
        this.eventKey = event.getKey();
        this.eventPosition = event.getPosition();
        event.readMetadata((BufferReader)this.sourceEventMetadata);
        this.taskEvent.reset();
        event.readValue((BufferReader)this.taskEvent);
        Object eventProcessor = null;
        switch (this.taskEvent.getState()) {
            case CREATE: {
                eventProcessor = this.createTaskProcessor;
                break;
            }
            case LOCK: {
                eventProcessor = this.lockTaskProcessor;
                break;
            }
            case COMPLETE: {
                eventProcessor = this.completeTaskProcessor;
                break;
            }
            case FAIL: {
                eventProcessor = this.failTaskProcessor;
                break;
            }
            case EXPIRE_LOCK: {
                eventProcessor = this.expireLockTaskProcessor;
                break;
            }
            case UPDATE_RETRIES: {
                eventProcessor = this.updateRetriesTaskProcessor;
                break;
            }
            case CANCEL: {
                eventProcessor = this.cancelTaskProcessor;
                break;
            }
        }
        return eventProcessor;
    }

    public void afterEvent() {
        this.taskEvent.reset();
    }

    protected boolean writeResponse() {
        return this.responseWriter.topicName(this.logStreamTopicName).partitionId(this.logStreamPartitionId).position(this.eventPosition).key(this.eventKey).eventWriter((BufferWriter)this.taskEvent).tryWriteResponse(this.sourceEventMetadata.getRequestStreamId(), this.sourceEventMetadata.getRequestId());
    }

    protected long writeEventToLogStream(LogStreamWriter writer) {
        this.targetEventMetadata.reset();
        this.targetEventMetadata.protocolVersion(1).eventType(EventType.TASK_EVENT).raftTermId(this.targetStream.getTerm());
        return writer.key(this.eventKey).metadataWriter((BufferWriter)this.targetEventMetadata).valueWriter((BufferWriter)this.taskEvent).tryWrite();
    }

    private class CancelTaskProcessor
    implements EventProcessor {
        private boolean isCanceled;

        private CancelTaskProcessor() {
        }

        public void processEvent() {
            this.isCanceled = false;
            short state = TaskInstanceStreamProcessor.this.taskIndex.wrapTaskInstanceKey(TaskInstanceStreamProcessor.this.eventKey).getState();
            if (state > 0) {
                TaskInstanceStreamProcessor.this.taskEvent.setState(TaskState.CANCELED);
                this.isCanceled = true;
            } else {
                TaskInstanceStreamProcessor.this.taskEvent.setState(TaskState.CANCEL_REJECTED);
            }
        }

        public long writeEvent(LogStreamWriter writer) {
            return TaskInstanceStreamProcessor.this.writeEventToLogStream(writer);
        }

        public void updateState() {
            if (this.isCanceled) {
                TaskInstanceStreamProcessor.this.taskIndex.remove(TaskInstanceStreamProcessor.this.eventKey);
            }
        }
    }

    private class UpdateRetriesTaskProcessor
    implements EventProcessor {
        private UpdateRetriesTaskProcessor() {
        }

        public void processEvent() {
            short state = TaskInstanceStreamProcessor.this.taskIndex.wrapTaskInstanceKey(TaskInstanceStreamProcessor.this.eventKey).getState();
            if (state == 3 && TaskInstanceStreamProcessor.this.taskEvent.getRetries() > 0) {
                TaskInstanceStreamProcessor.this.taskEvent.setState(TaskState.RETRIES_UPDATED);
            } else {
                TaskInstanceStreamProcessor.this.taskEvent.setState(TaskState.UPDATE_RETRIES_REJECTED);
            }
        }

        public boolean executeSideEffects() {
            return TaskInstanceStreamProcessor.this.writeResponse();
        }

        public long writeEvent(LogStreamWriter writer) {
            return TaskInstanceStreamProcessor.this.writeEventToLogStream(writer);
        }
    }

    private class ExpireLockTaskProcessor
    implements EventProcessor {
        protected boolean isExpired;

        private ExpireLockTaskProcessor() {
        }

        public void processEvent() {
            this.isExpired = false;
            TaskInstanceStreamProcessor.this.taskIndex.wrapTaskInstanceKey(TaskInstanceStreamProcessor.this.eventKey);
            if (TaskInstanceStreamProcessor.this.taskIndex.getState() == 2) {
                TaskInstanceStreamProcessor.this.taskEvent.setState(TaskState.LOCK_EXPIRED);
                this.isExpired = true;
            }
            if (!this.isExpired) {
                TaskInstanceStreamProcessor.this.taskEvent.setState(TaskState.LOCK_EXPIRATION_REJECTED);
            }
        }

        public long writeEvent(LogStreamWriter writer) {
            return TaskInstanceStreamProcessor.this.writeEventToLogStream(writer);
        }

        public void updateState() {
            if (this.isExpired) {
                TaskInstanceStreamProcessor.this.taskIndex.setState((short)4).write();
            }
        }
    }

    private class FailTaskProcessor
    implements EventProcessor {
        protected boolean isFailed;

        private FailTaskProcessor() {
        }

        public void processEvent() {
            this.isFailed = false;
            TaskInstanceStreamProcessor.this.taskIndex.wrapTaskInstanceKey(TaskInstanceStreamProcessor.this.eventKey);
            if (TaskInstanceStreamProcessor.this.taskIndex.getState() == 2 && BufferUtil.contentsEqual((DirectBuffer)TaskInstanceStreamProcessor.this.taskIndex.getLockOwner(), (DirectBuffer)TaskInstanceStreamProcessor.this.taskEvent.getLockOwner())) {
                TaskInstanceStreamProcessor.this.taskEvent.setState(TaskState.FAILED);
                this.isFailed = true;
            }
            if (!this.isFailed) {
                TaskInstanceStreamProcessor.this.taskEvent.setState(TaskState.FAIL_REJECTED);
            }
        }

        public boolean executeSideEffects() {
            return TaskInstanceStreamProcessor.this.writeResponse();
        }

        public long writeEvent(LogStreamWriter writer) {
            return TaskInstanceStreamProcessor.this.writeEventToLogStream(writer);
        }

        public void updateState() {
            if (this.isFailed) {
                TaskInstanceStreamProcessor.this.taskIndex.setState((short)3).write();
            }
        }
    }

    private class CompleteTaskProcessor
    implements EventProcessor {
        protected boolean isCompleted;

        private CompleteTaskProcessor() {
        }

        public void processEvent() {
            DirectBuffer payload;
            boolean isCompletable;
            this.isCompleted = false;
            TaskInstanceStreamProcessor.this.taskIndex.wrapTaskInstanceKey(TaskInstanceStreamProcessor.this.eventKey);
            short state = TaskInstanceStreamProcessor.this.taskIndex.getState();
            TaskState taskEventType = TaskState.COMPLETE_REJECTED;
            boolean bl = isCompletable = state == 2 || state == 4;
            if (isCompletable && (PayloadUtil.isNilPayload(payload = TaskInstanceStreamProcessor.this.taskEvent.getPayload()) || PayloadUtil.isValidPayload(payload)) && BufferUtil.contentsEqual((DirectBuffer)TaskInstanceStreamProcessor.this.taskIndex.getLockOwner(), (DirectBuffer)TaskInstanceStreamProcessor.this.taskEvent.getLockOwner())) {
                taskEventType = TaskState.COMPLETED;
                this.isCompleted = true;
            }
            TaskInstanceStreamProcessor.this.taskEvent.setState(taskEventType);
        }

        public boolean executeSideEffects() {
            return TaskInstanceStreamProcessor.this.writeResponse();
        }

        public long writeEvent(LogStreamWriter writer) {
            return TaskInstanceStreamProcessor.this.writeEventToLogStream(writer);
        }

        public void updateState() {
            if (this.isCompleted) {
                TaskInstanceStreamProcessor.this.taskIndex.remove(TaskInstanceStreamProcessor.this.eventKey);
            }
        }
    }

    private class LockTaskProcessor
    implements EventProcessor {
        protected boolean isLocked;

        private LockTaskProcessor() {
        }

        public void processEvent() {
            this.isLocked = false;
            short state = TaskInstanceStreamProcessor.this.taskIndex.wrapTaskInstanceKey(TaskInstanceStreamProcessor.this.eventKey).getState();
            if (state == 1 || state == 3 || state == 4) {
                TaskInstanceStreamProcessor.this.taskEvent.setState(TaskState.LOCKED);
                this.isLocked = true;
            }
            if (!this.isLocked) {
                TaskInstanceStreamProcessor.this.taskEvent.setState(TaskState.LOCK_REJECTED);
            }
        }

        public boolean executeSideEffects() {
            boolean success = true;
            if (this.isLocked) {
                success = TaskInstanceStreamProcessor.this.subscribedEventWriter.topicName(TaskInstanceStreamProcessor.this.logStreamTopicName).partitionId(TaskInstanceStreamProcessor.this.logStreamPartitionId).position(TaskInstanceStreamProcessor.this.eventPosition).key(TaskInstanceStreamProcessor.this.eventKey).subscriberKey(TaskInstanceStreamProcessor.this.sourceEventMetadata.getSubscriberKey()).subscriptionType(SubscriptionType.TASK_SUBSCRIPTION).eventType(EventType.TASK_EVENT).eventWriter((BufferWriter)TaskInstanceStreamProcessor.this.taskEvent).tryWriteMessage(TaskInstanceStreamProcessor.this.sourceEventMetadata.getRequestStreamId());
            } else {
                long subscriptionId = TaskInstanceStreamProcessor.this.sourceEventMetadata.getSubscriberKey();
                TaskInstanceStreamProcessor.this.creditsRequest.setSubscriberKey(subscriptionId);
                TaskInstanceStreamProcessor.this.creditsRequest.setCredits(1);
                success = TaskInstanceStreamProcessor.this.taskSubscriptionManager.increaseSubscriptionCreditsAsync(TaskInstanceStreamProcessor.this.creditsRequest);
            }
            return success;
        }

        public long writeEvent(LogStreamWriter writer) {
            return TaskInstanceStreamProcessor.this.writeEventToLogStream(writer);
        }

        public void updateState() {
            if (this.isLocked) {
                TaskInstanceStreamProcessor.this.taskIndex.setState((short)2).setLockOwner(TaskInstanceStreamProcessor.this.taskEvent.getLockOwner()).write();
            }
        }
    }

    private class CreateTaskProcessor
    implements EventProcessor {
        private CreateTaskProcessor() {
        }

        public void processEvent() {
            TaskInstanceStreamProcessor.this.taskEvent.setState(TaskState.CREATED);
        }

        public boolean executeSideEffects() {
            boolean success = true;
            if (TaskInstanceStreamProcessor.this.sourceEventMetadata.hasRequestMetadata()) {
                success = TaskInstanceStreamProcessor.this.writeResponse();
            }
            return success;
        }

        public long writeEvent(LogStreamWriter writer) {
            return TaskInstanceStreamProcessor.this.writeEventToLogStream(writer);
        }

        public void updateState() {
            TaskInstanceStreamProcessor.this.taskIndex.newTaskInstance(TaskInstanceStreamProcessor.this.eventKey).setState((short)1).write();
        }
    }
}

