/*
 * Decompiled with CFR 0.152.
 */
package io.zeebe.broker.transport.controlmessage;

import io.zeebe.broker.transport.clientapi.ErrorResponseWriter;
import io.zeebe.broker.transport.controlmessage.ControlMessageHandler;
import io.zeebe.broker.transport.controlmessage.ControlMessageRequestHeaderDescriptor;
import io.zeebe.dispatcher.Dispatcher;
import io.zeebe.dispatcher.FragmentHandler;
import io.zeebe.dispatcher.Subscription;
import io.zeebe.protocol.clientapi.ControlMessageRequestDecoder;
import io.zeebe.protocol.clientapi.ControlMessageType;
import io.zeebe.protocol.clientapi.ErrorCode;
import io.zeebe.protocol.clientapi.MessageHeaderDecoder;
import io.zeebe.protocol.impl.BrokerEventMetadata;
import io.zeebe.transport.ServerOutput;
import io.zeebe.transport.ServerResponse;
import io.zeebe.util.actor.Actor;
import io.zeebe.util.actor.ActorReference;
import io.zeebe.util.actor.ActorScheduler;
import io.zeebe.util.state.SimpleStateMachineContext;
import io.zeebe.util.state.State;
import io.zeebe.util.state.StateMachine;
import io.zeebe.util.state.StateMachineAgent;
import io.zeebe.util.state.TransitionState;
import io.zeebe.util.state.WaitState;
import io.zeebe.util.time.ClockUtil;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import org.agrona.DirectBuffer;
import org.agrona.MutableDirectBuffer;
import org.agrona.collections.Int2ObjectHashMap;
import org.agrona.concurrent.UnsafeBuffer;

/**
 * Actor that polls control message requests from a dispatcher subscription and
 * hands each request to the {@link ControlMessageHandler} registered for its
 * {@link ControlMessageType}.
 *
 * <p>The life cycle is driven by a state machine:
 * <pre>
 * closed --OPEN--> opening --DEFAULT--> opened --PROCESS--> processing --DEFAULT--> opened
 *                                       opened --FAILED--> processingFailed --DEFAULT--> opened
 *                                       opened --CLOSE--> closed
 * </pre>
 *
 * <p>At most one request is in flight at a time: a fragment is polled in the opened
 * state and the next one is polled only after the state machine is back in the
 * opened state. If no handler is registered for the message type, or the handler's
 * future is not done within {@code requestTimeoutInMillis}, an error response is
 * written for the request.
 */
public class ControlMessageHandlerManager implements Actor {
    protected static final String NAME = "control.message.handler";
    protected static final int TRANSITION_DEFAULT = 0;
    protected static final int TRANSITION_OPEN = 1;
    protected static final int TRANSITION_CLOSE = 2;
    protected static final int TRANSITION_PROCESS = 3;
    protected static final int TRANSITION_FAILED = 4;

    // The states must be declared before the state machine agent because its
    // initializer references them.
    protected final State<Context> openingState = new OpeningState();
    protected final State<Context> openedState = new OpenedState();
    protected final State<Context> processingState = new ProcessingState();
    protected final State<Context> processingFailedState = new ProcessingFailedState();
    protected final State<Context> closedState = new ClosedState();

    // NOTE(review): there is no CLOSE transition out of processingState, so a close
    // request that arrives while a request is being processed completes its future
    // without actually closing - confirm whether that is intended.
    protected final StateMachineAgent<Context> stateMachineAgent = new StateMachineAgent<>(
            StateMachine.<Context>builder(s -> new Context(s))
                .initialState(this.closedState)
                .from(this.openingState).take(TRANSITION_DEFAULT).to(this.openedState)
                .from(this.openedState).take(TRANSITION_PROCESS).to(this.processingState)
                .from(this.openedState).take(TRANSITION_FAILED).to(this.processingFailedState)
                .from(this.openedState).take(TRANSITION_CLOSE).to(this.closedState)
                .from(this.processingState).take(TRANSITION_DEFAULT).to(this.openedState)
                .from(this.processingFailedState).take(TRANSITION_DEFAULT).to(this.openedState)
                .from(this.closedState).take(TRANSITION_OPEN).to(this.openingState)
                .build());

    protected final ActorScheduler actorScheduler;
    /** {@code true} while this actor is scheduled; guards against scheduling it twice. */
    protected final AtomicBoolean isRunning = new AtomicBoolean(false);
    protected ActorReference actorRef;
    protected final ControlMessageRequestHeaderDescriptor requestHeaderDescriptor = new ControlMessageRequestHeaderDescriptor();
    protected final MessageHeaderDecoder messageHeaderDecoder = new MessageHeaderDecoder();
    protected final ControlMessageRequestDecoder requestDecoder = new ControlMessageRequestDecoder();
    /** Holds a copy of the data of the request that is currently being handled. */
    protected final UnsafeBuffer requestBuffer = new UnsafeBuffer(new byte[32768]);
    protected final Dispatcher controlMessageDispatcher;
    protected Subscription subscription;
    protected final Int2ObjectHashMap<ControlMessageHandler> handlersByTypeId = new Int2ObjectHashMap<>();
    protected final ErrorResponseWriter errorResponseWriter;
    protected final BrokerEventMetadata eventMetada = new BrokerEventMetadata();
    protected final ServerResponse response = new ServerResponse();
    protected final long requestTimeoutInMillis;

    /**
     * Creates a manager in the closed state; call {@link #openAsync()} to start it.
     *
     * @param output used to write error responses
     * @param controlMessageDispatcher dispatcher whose subscription is polled for requests
     * @param requestTimeoutInMillis time after which a pending request is answered with a
     *        timeout error
     * @param actorScheduler scheduler this actor is registered with when opened
     * @param handlers the handlers, registered by the value of their message type; a later
     *        handler replaces an earlier one of the same type
     */
    public ControlMessageHandlerManager(ServerOutput output, Dispatcher controlMessageDispatcher, long requestTimeoutInMillis, ActorScheduler actorScheduler, List<ControlMessageHandler> handlers) {
        this.actorScheduler = actorScheduler;
        this.controlMessageDispatcher = controlMessageDispatcher;
        this.requestTimeoutInMillis = requestTimeoutInMillis;
        this.errorResponseWriter = new ErrorResponseWriter(output);
        for (ControlMessageHandler handler : handlers) {
            this.handlersByTypeId.put(handler.getMessageType().value(), handler);
        }
    }

    @Override
    public String name() {
        return NAME;
    }

    @Override
    public int doWork() {
        return this.stateMachineAgent.doWork();
    }

    /**
     * Requests the transition from closed to opened and schedules this actor if it is
     * not already scheduled.
     *
     * @return a future that completes once the manager is opened; it completes
     *         exceptionally with {@link IllegalStateException} if the manager is not in
     *         the closed state, or with the scheduler's exception if scheduling fails
     */
    public CompletableFuture<Void> openAsync() {
        final CompletableFuture<Void> future = new CompletableFuture<>();
        this.stateMachineAgent.addCommand(context -> {
            if (context.tryTake(TRANSITION_OPEN)) {
                context.setOpenCloseFuture(future);
            } else {
                future.completeExceptionally(new IllegalStateException("Cannot open control message handler."));
            }
        });
        if (this.isRunning.compareAndSet(false, true)) {
            try {
                this.actorRef = this.actorScheduler.schedule(this);
            }
            catch (Exception e) {
                this.isRunning.set(false);
                future.completeExceptionally(e);
            }
        }
        return future;
    }

    /**
     * Requests the transition from opened to closed.
     *
     * @return a future that completes once the manager is closed; it completes
     *         immediately if the manager is not running or the close transition cannot
     *         be taken from the current state
     */
    public CompletableFuture<Void> closeAsync() {
        final CompletableFuture<Void> future = new CompletableFuture<>();
        if (!this.isRunning.get()) {
            // The actor is not scheduled, so a queued command would presumably never be
            // processed and the future would never complete. Not running is reported the
            // same way as "nothing to close".
            future.complete(null);
            return future;
        }
        this.stateMachineAgent.addCommand(context -> {
            if (context.tryTake(TRANSITION_CLOSE)) {
                context.setOpenCloseFuture(future);
            } else {
                future.complete(null);
            }
        });
        return future;
    }

    /**
     * @return {@code true} if the state machine is in the opened, processing or
     *         processing-failed state
     */
    public boolean isOpen() {
        // Read the state once: with repeated reads a transition between them could make
        // every comparison fail although the manager was open the whole time.
        final Object currentState = this.stateMachineAgent.getCurrentState();
        return currentState == this.openedState
            || currentState == this.processingState
            || currentState == this.processingFailedState;
    }

    /**
     * @return {@code true} if the state machine is in the closed state
     */
    public boolean isClosed() {
        return this.stateMachineAgent.getCurrentState() == this.closedState;
    }

    /**
     * State machine context carrying the pending open/close future and the bookkeeping
     * of the request that is currently being processed.
     */
    static class Context extends SimpleStateMachineContext {
        private CompletableFuture<Void> openCloseFuture;
        private CompletableFuture<Void> processingFuture;
        private long processingStartTime = -1L;
        private ControlMessageType lastRequestMessageType;

        Context(StateMachine<Context> stateMachine) {
            super(stateMachine);
        }

        /** Remembers the message type of the most recently received request. */
        public void lastRequestMessageType(ControlMessageType messageType) {
            this.lastRequestMessageType = messageType;
        }

        /** Records the handler's future and the time the processing was started. */
        public void scheduledProcessing(CompletableFuture<Void> future, long startTime) {
            this.processingFuture = future;
            this.processingStartTime = startTime;
        }

        public CompletableFuture<Void> getProcessingFuture() {
            return this.processingFuture;
        }

        public long getProcessingStartTime() {
            return this.processingStartTime;
        }

        public void setOpenCloseFuture(CompletableFuture<Void> future) {
            this.openCloseFuture = future;
        }

        /** Completes and clears the pending open/close future, if there is one. */
        public void completeOpenCloseFuture() {
            if (this.openCloseFuture != null) {
                this.openCloseFuture.complete(null);
                this.openCloseFuture = null;
            }
        }

        public ControlMessageType getLastRequestMessageType() {
            return this.lastRequestMessageType;
        }
    }

    /**
     * Closed state: on the first invocation after the actor was running it completes the
     * pending close future and closes the actor reference.
     */
    class ClosedState implements WaitState<Context> {
        ClosedState() {
        }

        @Override
        public void work(Context context) throws Exception {
            // The compare-and-set makes sure this runs only once per open/close cycle.
            if (ControlMessageHandlerManager.this.isRunning.compareAndSet(true, false)) {
                context.completeOpenCloseFuture();
                ControlMessageHandlerManager.this.actorRef.close();
            }
        }
    }

    /**
     * Entered when no handler is registered for the request's message type: writes a
     * {@code MESSAGE_NOT_SUPPORTED} error response and returns to the opened state.
     */
    class ProcessingFailedState implements TransitionState<Context> {
        ProcessingFailedState() {
        }

        @Override
        public void work(Context context) throws Exception {
            final UnsafeBuffer failedRequest = ControlMessageHandlerManager.this.requestBuffer;
            final BrokerEventMetadata metadata = ControlMessageHandlerManager.this.eventMetada;
            // A failed write is logged by the writer; the request is dropped either way.
            ControlMessageHandlerManager.this.errorResponseWriter
                .errorCode(ErrorCode.MESSAGE_NOT_SUPPORTED)
                .errorMessage("Cannot handle control message with type '%s'.", context.getLastRequestMessageType().name())
                .failedRequest(failedRequest, 0, failedRequest.capacity())
                .tryWriteResponseOrLogFailure(metadata.getRequestStreamId(), metadata.getRequestId());
            context.take(TRANSITION_DEFAULT);
        }
    }

    /**
     * Waits for the handler's future. Returns to the opened state when the future is done
     * or, after writing a {@code REQUEST_TIMEOUT} error response, when the request timeout
     * has elapsed.
     */
    class ProcessingState implements WaitState<Context> {
        ProcessingState() {
        }

        @Override
        public void work(Context context) throws Exception {
            final CompletableFuture<Void> future = context.getProcessingFuture();
            if (future.isDone()) {
                context.take(TRANSITION_DEFAULT);
            } else if (this.hasTimeout(context.getProcessingStartTime())) {
                final UnsafeBuffer failedRequest = ControlMessageHandlerManager.this.requestBuffer;
                final BrokerEventMetadata metadata = ControlMessageHandlerManager.this.eventMetada;
                // A failed write is logged by the writer; processing is given up either way.
                ControlMessageHandlerManager.this.errorResponseWriter
                    .errorCode(ErrorCode.REQUEST_TIMEOUT)
                    .errorMessage("Timeout while handle control message.")
                    .failedRequest(failedRequest, 0, failedRequest.capacity())
                    .tryWriteResponseOrLogFailure(metadata.getRequestStreamId(), metadata.getRequestId());
                context.take(TRANSITION_DEFAULT);
            }
        }

        /**
         * @param startTime time in milliseconds at which the processing was started
         * @return {@code true} if at least {@code requestTimeoutInMillis} have elapsed
         *         since {@code startTime}
         */
        protected boolean hasTimeout(long startTime) {
            // Compare the elapsed time instead of computing the deadline: the sum
            // startTime + timeout overflows for very large timeouts.
            return ClockUtil.getCurrentTimeInMillis() - startTime >= ControlMessageHandlerManager.this.requestTimeoutInMillis;
        }
    }

    /**
     * Opened state: polls at most one fragment per invocation from the subscription and
     * dispatches it to the handler registered for its message type.
     */
    class OpenedState implements State<Context>, FragmentHandler {
        /** Context of the current {@link #doWork(Context)} call, used by {@link #onFragment}. */
        private Context context;

        OpenedState() {
        }

        @Override
        public int doWork(Context context) throws Exception {
            this.context = context;
            final int fragmentsRead = ControlMessageHandlerManager.this.subscription.poll(this, 1);
            return fragmentsRead > 0 ? 1 : 0;
        }

        /**
         * Decodes the request, copies its data into the request buffer and either starts
         * the processing by the matching handler or marks the request as failed if no
         * handler is registered for its message type.
         *
         * @return always {@code 0}
         */
        @Override
        public int onFragment(DirectBuffer buffer, int offset, int length, int streamId, boolean isMarkedFailed) {
            final ControlMessageRequestHeaderDescriptor requestHeader = ControlMessageHandlerManager.this.requestHeaderDescriptor;
            final MessageHeaderDecoder messageHeader = ControlMessageHandlerManager.this.messageHeaderDecoder;
            final ControlMessageRequestDecoder request = ControlMessageHandlerManager.this.requestDecoder;
            final BrokerEventMetadata metadata = ControlMessageHandlerManager.this.eventMetada;

            requestHeader.wrap(buffer, offset);
            metadata.reset();
            metadata.requestId(requestHeader.requestId()).requestStreamId(requestHeader.streamId());

            // The SBE message header follows the request header in the incoming fragment.
            // Decode it from there and let it determine the block length and version of
            // the body, so that the variable-length data is located correctly.
            final int messageHeaderOffset = offset + ControlMessageRequestHeaderDescriptor.headerLength();
            messageHeader.wrap(buffer, messageHeaderOffset);
            final int bodyOffset = messageHeaderOffset + messageHeader.encodedLength();
            request.wrap(buffer, bodyOffset, messageHeader.blockLength(), messageHeader.version());

            final ControlMessageType messageType = request.messageType();
            this.context.lastRequestMessageType(messageType);

            final int dataLength = request.dataLength();
            this.ensureBufferCapacity(dataLength);
            request.getData(ControlMessageHandlerManager.this.requestBuffer, 0, dataLength);

            final ControlMessageHandler handler = ControlMessageHandlerManager.this.handlersByTypeId.get(messageType.value());
            if (handler != null) {
                final CompletableFuture<Void> future = handler.handle(ControlMessageHandlerManager.this.requestBuffer, metadata);
                this.context.scheduledProcessing(future, ClockUtil.getCurrentTimeInMillis());
                this.context.take(TRANSITION_PROCESS);
            } else {
                this.context.take(TRANSITION_FAILED);
            }
            return 0;
        }

        /**
         * Makes the request buffer a view of exactly {@code length} bytes: the current
         * backing array is zeroed and reused if it is large enough, otherwise a new array
         * of {@code length} bytes is allocated.
         */
        protected void ensureBufferCapacity(int length) {
            byte[] raw = ControlMessageHandlerManager.this.requestBuffer.byteArray();
            if (length <= raw.length) {
                Arrays.fill(raw, (byte) 0);
            } else {
                raw = new byte[length];
            }
            ControlMessageHandlerManager.this.requestBuffer.wrap(raw, 0, length);
        }
    }

    /**
     * Opening state: looks up the dispatcher subscription, moves on to the opened state
     * and completes the pending open future.
     */
    class OpeningState implements TransitionState<Context> {
        OpeningState() {
        }

        @Override
        public void work(Context context) throws Exception {
            ControlMessageHandlerManager.this.subscription = ControlMessageHandlerManager.this.controlMessageDispatcher.getSubscriptionByName("control-message-handler");
            context.take(TRANSITION_DEFAULT);
            context.completeOpenCloseFuture();
        }
    }
}

