/*
 * Decompiled with CFR 0.152.
 */
package io.zeebe.broker.transport.clientapi;

import io.zeebe.broker.event.processor.TopicSubscriberEvent;
import io.zeebe.broker.event.processor.TopicSubscriptionEvent;
import io.zeebe.broker.task.data.TaskEvent;
import io.zeebe.broker.transport.clientapi.ErrorResponseWriter;
import io.zeebe.broker.transport.controlmessage.ControlMessageRequestHeaderDescriptor;
import io.zeebe.broker.workflow.data.DeploymentEvent;
import io.zeebe.broker.workflow.data.WorkflowInstanceEvent;
import io.zeebe.dispatcher.ClaimedFragment;
import io.zeebe.dispatcher.Dispatcher;
import io.zeebe.logstreams.log.LogStream;
import io.zeebe.logstreams.log.LogStreamWriter;
import io.zeebe.logstreams.log.LogStreamWriterImpl;
import io.zeebe.msgpack.UnpackedObject;
import io.zeebe.protocol.clientapi.ErrorCode;
import io.zeebe.protocol.clientapi.EventType;
import io.zeebe.protocol.clientapi.ExecuteCommandRequestDecoder;
import io.zeebe.protocol.clientapi.MessageHeaderDecoder;
import io.zeebe.protocol.impl.BrokerEventMetadata;
import io.zeebe.transport.RemoteAddress;
import io.zeebe.transport.ServerMessageHandler;
import io.zeebe.transport.ServerOutput;
import io.zeebe.transport.ServerRequestHandler;
import io.zeebe.util.buffer.BufferUtil;
import io.zeebe.util.buffer.BufferWriter;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import org.agrona.DirectBuffer;
import org.agrona.MutableDirectBuffer;
import org.agrona.collections.Int2ObjectHashMap;
import org.agrona.concurrent.ManyToOneConcurrentArrayQueue;
import org.agrona.concurrent.UnsafeBuffer;

/**
 * Handles requests arriving on the client API transport.
 *
 * <p>Each request starts with a message header whose template id selects the handling path:
 * execute-command requests are deserialized for validation and appended to the log stream
 * addressed by topic name and partition id; control-message requests are copied, prefixed with
 * a request header, into the control message dispatcher. Any other template id, as well as a
 * client protocol version newer than the one supported here, is answered with an error response.
 *
 * <p>Log streams are registered and unregistered through {@link #addStream(LogStream)} and
 * {@link #removeStream(LogStream)}. Both only enqueue the change; it is applied at the start of
 * the next {@link #onRequest} call. Presumably this confines all access to the stream lookup map
 * to the thread that invokes {@code onRequest} -- confirm against the transport's threading model.
 *
 * <p>The decoders, buffers and writers held as fields are reused across requests.
 */
public class ClientApiMessageHandler implements ServerMessageHandler, ServerRequestHandler {
    /** Template id routed to {@link #handleExecuteCommandRequest}. */
    private static final int EXECUTE_COMMAND_REQUEST_TEMPLATE_ID = 20;
    /** Template id routed to {@link #handleControlMessageRequest}. */
    private static final int CONTROL_MESSAGE_REQUEST_TEMPLATE_ID = 10;
    /** Highest client protocol version accepted; newer clients receive an error response. */
    private static final int SUPPORTED_PROTOCOL_VERSION = 1;

    protected final MessageHeaderDecoder messageHeaderDecoder = new MessageHeaderDecoder();
    protected final ExecuteCommandRequestDecoder executeCommandRequestDecoder = new ExecuteCommandRequestDecoder();
    protected final ControlMessageRequestHeaderDescriptor controlMessageRequestHeaderDescriptor = new ControlMessageRequestHeaderDescriptor();
    /** Reusable view onto the topic name bytes of the request currently being handled. */
    protected final DirectBuffer topicName = new UnsafeBuffer(0L, 0);
    /** Pending stream (un)registrations; drained at the beginning of every request. */
    protected final ManyToOneConcurrentArrayQueue<Runnable> cmdQueue = new ManyToOneConcurrentArrayQueue<>(100);
    protected final Consumer<Runnable> cmdConsumer = Runnable::run;
    /** Registered log streams, looked up by topic name and then by partition id. */
    protected final Map<DirectBuffer, Int2ObjectHashMap<LogStream>> logStreamsByTopic = new HashMap<>();
    protected final BrokerEventMetadata eventMetadata = new BrokerEventMetadata();
    protected final LogStreamWriter logStreamWriter = new LogStreamWriterImpl();
    protected final ErrorResponseWriter errorResponseWriter = new ErrorResponseWriter();
    protected final Dispatcher controlMessageDispatcher;
    protected final ClaimedFragment claimedControlMessageFragment = new ClaimedFragment();
    /** One reusable event instance per supported event type, used to validate command payloads. */
    protected final EnumMap<EventType, UnpackedObject> eventsByType = new EnumMap<>(EventType.class);

    /**
     * Creates a handler that forwards control messages to the given dispatcher.
     *
     * @param controlMessageDispatcher dispatcher that receives control message requests
     */
    public ClientApiMessageHandler(Dispatcher controlMessageDispatcher) {
        this.controlMessageDispatcher = controlMessageDispatcher;
        initEventTypeMap();
    }

    /** Registers the event types that can be submitted through an execute-command request. */
    private void initEventTypeMap() {
        eventsByType.put(EventType.DEPLOYMENT_EVENT, new DeploymentEvent());
        eventsByType.put(EventType.TASK_EVENT, new TaskEvent());
        eventsByType.put(EventType.WORKFLOW_INSTANCE_EVENT, new WorkflowInstanceEvent());
        eventsByType.put(EventType.SUBSCRIBER_EVENT, new TopicSubscriberEvent());
        eventsByType.put(EventType.SUBSCRIPTION_EVENT, new TopicSubscriptionEvent());
    }

    /**
     * Decodes an execute-command request and appends its payload to the addressed log stream.
     *
     * <p>An error response is written instead when no log stream is registered for the requested
     * topic and partition, when the event type has no registered event class, or when the payload
     * cannot be deserialized into that event class.
     *
     * @param output transport output used for error responses
     * @param requestAddress address of the requesting client
     * @param requestId id of the request, echoed in error responses
     * @param eventMetadata metadata written alongside the event
     * @param buffer buffer containing the complete request
     * @param messageOffset offset of the request within {@code buffer}
     * @param messageLength length of the request
     * @return on the error paths, the result of writing the error response; otherwise
     *         {@code true} if the log stream writer returned a non-negative position
     */
    private boolean handleExecuteCommandRequest(ServerOutput output, RemoteAddress requestAddress, long requestId, BrokerEventMetadata eventMetadata, DirectBuffer buffer, int messageOffset, int messageLength) {
        executeCommandRequestDecoder.wrap(buffer, messageOffset + messageHeaderDecoder.encodedLength(), messageHeaderDecoder.blockLength(), messageHeaderDecoder.version());

        // Wrap the topic name in place and move the decoder limit past it by hand.
        int topicNameOffset = executeCommandRequestDecoder.limit() + ExecuteCommandRequestDecoder.topicNameHeaderLength();
        int topicNameLength = executeCommandRequestDecoder.topicNameLength();
        topicName.wrap(buffer, topicNameOffset, topicNameLength);
        executeCommandRequestDecoder.limit(topicNameOffset + topicNameLength);

        int partitionId = executeCommandRequestDecoder.partitionId();
        long key = executeCommandRequestDecoder.key();

        LogStream logStream = getLogStream(topicName, partitionId);
        if (logStream == null) {
            return errorResponseWriter
                .errorCode(ErrorCode.TOPIC_NOT_FOUND)
                .errorMessage("Cannot execute command. Topic with name '%s' and partition id '%d' not found", BufferUtil.bufferAsString(topicName), partitionId)
                .failedRequest(buffer, messageOffset, messageLength)
                .tryWriteResponseOrLogFailure(output, requestAddress.getStreamId(), requestId);
        }

        EventType eventType = executeCommandRequestDecoder.eventType();
        UnpackedObject event = eventsByType.get(eventType);
        if (event == null) {
            return errorResponseWriter
                .errorCode(ErrorCode.MESSAGE_NOT_SUPPORTED)
                .errorMessage("Cannot execute command. Invalid event type '%s'.", eventType.name())
                .failedRequest(buffer, messageOffset, messageLength)
                .tryWriteResponseOrLogFailure(output, requestAddress.getStreamId(), requestId);
        }

        int eventOffset = executeCommandRequestDecoder.limit() + ExecuteCommandRequestDecoder.commandHeaderLength();
        int eventLength = executeCommandRequestDecoder.commandLength();

        // The event object is only used to validate the payload; the raw bytes are what gets written.
        event.reset();
        try {
            event.wrap(buffer, eventOffset, eventLength);
        } catch (Throwable t) {
            // Deliberately broad: every deserialization failure is reported back to the client.
            return errorResponseWriter
                .errorCode(ErrorCode.INVALID_MESSAGE)
                .errorMessage("Cannot deserialize command: '%s'.", concatErrorMessages(t))
                .failedRequest(buffer, messageOffset, messageLength)
                .tryWriteResponseOrLogFailure(output, requestAddress.getStreamId(), requestId);
        }

        eventMetadata.eventType(eventType);
        eventMetadata.raftTermId(logStream.getTerm());

        logStreamWriter.wrap(logStream);
        if (key != ExecuteCommandRequestDecoder.keyNullValue()) {
            logStreamWriter.key(key);
        } else {
            // No key supplied by the client: use the event's position as its key.
            logStreamWriter.positionAsKey();
        }

        long eventPosition = logStreamWriter
            .metadataWriter((BufferWriter) eventMetadata)
            .value(buffer, eventOffset, eventLength)
            .tryWrite();
        return eventPosition >= 0L;
    }

    /**
     * Joins the message of a throwable and of every throwable in its cause chain with "; ".
     *
     * @param t the throwable to describe
     * @return the joined description, outermost throwable first
     */
    private String concatErrorMessages(Throwable t) {
        StringBuilder description = new StringBuilder(describe(t));
        for (Throwable cause = t.getCause(); cause != null; cause = cause.getCause()) {
            description.append("; ").append(describe(cause));
        }
        return description.toString();
    }

    /**
     * Returns the throwable's message, or its class name when it carries no message, so the
     * client never receives the literal text "null".
     */
    private static String describe(Throwable t) {
        String message = t.getMessage();
        return message != null ? message : t.getClass().getName();
    }

    /**
     * Looks up the registered log stream for a topic partition.
     *
     * @param topicName name of the topic
     * @param partitionId id of the partition
     * @return the log stream, or {@code null} if none is registered for that topic and partition
     */
    private LogStream getLogStream(DirectBuffer topicName, int partitionId) {
        Int2ObjectHashMap<LogStream> logStreamPartitions = logStreamsByTopic.get(topicName);
        return logStreamPartitions != null ? logStreamPartitions.get(partitionId) : null;
    }

    /**
     * Copies a control message request, prefixed with a header carrying the request's stream id
     * and request id, into the control message dispatcher.
     *
     * @param eventMetadata metadata providing the request stream id and request id
     * @param buffer buffer containing the complete request
     * @param messageOffset offset of the request within {@code buffer}
     * @param messageLength length of the request
     * @return {@code true} if a fragment was claimed and committed, {@code false} otherwise
     */
    private boolean handleControlMessageRequest(BrokerEventMetadata eventMetadata, DirectBuffer buffer, int messageOffset, int messageLength) {
        int framedLength = ControlMessageRequestHeaderDescriptor.framedLength(messageLength);

        // A result of -2 is retried immediately; any other negative result gives up.
        // NOTE(review): -2 presumably marks a transient dispatcher condition -- confirm against Dispatcher.
        long publishPosition;
        do {
            publishPosition = controlMessageDispatcher.claim(claimedControlMessageFragment, framedLength);
        } while (publishPosition == -2L);

        if (publishPosition < 0L) {
            return false;
        }

        MutableDirectBuffer writeBuffer = claimedControlMessageFragment.getBuffer();
        int headerOffset = claimedControlMessageFragment.getOffset();
        controlMessageRequestHeaderDescriptor
            .wrap((DirectBuffer) writeBuffer, headerOffset)
            .streamId(eventMetadata.getRequestStreamId())
            .requestId(eventMetadata.getRequestId());

        // The original request bytes follow directly after the header.
        int bodyOffset = headerOffset + ControlMessageRequestHeaderDescriptor.headerLength();
        writeBuffer.putBytes(bodyOffset, buffer, messageOffset, messageLength);

        claimedControlMessageFragment.commit();
        return true;
    }

    /**
     * Schedules the registration of a log stream under its topic name and partition id. The
     * registration takes effect at the start of the next {@link #onRequest} call.
     *
     * @param logStream the log stream to make addressable by execute-command requests
     */
    public void addStream(LogStream logStream) {
        cmdQueue.add(() ->
            logStreamsByTopic
                .computeIfAbsent(logStream.getTopicName(), topicName -> new Int2ObjectHashMap<>())
                .put(logStream.getPartitionId(), logStream));
    }

    /**
     * Schedules the removal of a log stream. The removal takes effect at the start of the next
     * {@link #onRequest} call; a topic entry is dropped once its last partition is removed.
     *
     * @param logStream the log stream to unregister
     */
    public void removeStream(LogStream logStream) {
        cmdQueue.add(() -> {
            DirectBuffer topicName = logStream.getTopicName();
            Int2ObjectHashMap<LogStream> logStreamPartitions = logStreamsByTopic.get(topicName);
            if (logStreamPartitions == null) {
                return;
            }
            logStreamPartitions.remove(logStream.getPartitionId());
            if (logStreamPartitions.isEmpty()) {
                logStreamsByTopic.remove(topicName);
            }
        });
    }

    /**
     * Applies pending stream (un)registrations, then dispatches the request by the template id
     * found in its message header.
     *
     * @param output transport output used for error responses
     * @param remoteAddress address of the requesting client
     * @param buffer buffer containing the request
     * @param offset offset of the request within {@code buffer}
     * @param length length of the request
     * @param requestId id of the request
     * @return the result of the selected handler, or of writing the error response
     */
    @Override
    public boolean onRequest(ServerOutput output, RemoteAddress remoteAddress, DirectBuffer buffer, int offset, int length, long requestId) {
        drainCommandQueue();

        messageHeaderDecoder.wrap(buffer, offset);
        int templateId = messageHeaderDecoder.templateId();
        int clientVersion = messageHeaderDecoder.version();

        if (clientVersion > SUPPORTED_PROTOCOL_VERSION) {
            return errorResponseWriter
                .errorCode(ErrorCode.INVALID_CLIENT_VERSION)
                .errorMessage("Client has newer version than broker (%d > %d)", clientVersion, SUPPORTED_PROTOCOL_VERSION)
                .failedRequest(buffer, offset, length)
                .tryWriteResponse(output, remoteAddress.getStreamId(), requestId);
        }

        eventMetadata.reset();
        eventMetadata.protocolVersion(clientVersion);
        eventMetadata.requestId(requestId);
        eventMetadata.requestStreamId(remoteAddress.getStreamId());

        switch (templateId) {
            case EXECUTE_COMMAND_REQUEST_TEMPLATE_ID:
                return handleExecuteCommandRequest(output, remoteAddress, requestId, eventMetadata, buffer, offset, length);
            case CONTROL_MESSAGE_REQUEST_TEMPLATE_ID:
                return handleControlMessageRequest(eventMetadata, buffer, offset, length);
            default:
                return errorResponseWriter
                    .errorCode(ErrorCode.MESSAGE_NOT_SUPPORTED)
                    .errorMessage("Cannot handle message. Template id '%d' is not supported.", templateId)
                    .failedRequest(buffer, offset, length)
                    .tryWriteResponse(output, remoteAddress.getStreamId(), requestId);
        }
    }

    /**
     * Accepts and ignores any message that is not a request.
     *
     * @return always {@code true}
     */
    @Override
    public boolean onMessage(ServerOutput output, RemoteAddress remoteAddress, DirectBuffer buffer, int offset, int length) {
        return true;
    }

    /** Runs every queued stream (un)registration. */
    private void drainCommandQueue() {
        cmdQueue.drain(cmdConsumer);
    }
}

