package io.zeebe.broker.transport.controlmessage;

import io.zeebe.broker.Loggers;
import io.zeebe.broker.services.DispatcherSubscriptionNames;
import io.zeebe.broker.transport.clientapi.ErrorResponseWriter;
import io.zeebe.dispatcher.Dispatcher;
import io.zeebe.dispatcher.FragmentHandler;
import io.zeebe.dispatcher.Subscription;
import io.zeebe.protocol.clientapi.ControlMessageRequestDecoder;
import io.zeebe.protocol.clientapi.ControlMessageType;
import io.zeebe.protocol.clientapi.ErrorCode;
import io.zeebe.protocol.clientapi.MessageHeaderDecoder;
import io.zeebe.protocol.impl.BrokerEventMetadata;
import io.zeebe.transport.ServerOutput;
import io.zeebe.transport.ServerResponse;
import io.zeebe.util.actor.Actor;
import io.zeebe.util.actor.ActorReference;
import io.zeebe.util.actor.ActorScheduler;
import io.zeebe.util.state.SimpleStateMachineContext;
import io.zeebe.util.state.State;
import io.zeebe.util.state.StateMachine;
import io.zeebe.util.state.StateMachineAgent;
import io.zeebe.util.state.TransitionState;
import io.zeebe.util.state.WaitState;
import io.zeebe.util.time.ClockUtil;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.agrona.DirectBuffer;
import org.agrona.collections.Int2ObjectHashMap;
import org.agrona.concurrent.UnsafeBuffer;
import org.slf4j.Logger;

/* loaded from: input_file:io/zeebe/broker/transport/controlmessage/ControlMessageHandlerManager.class */
public class ControlMessageHandlerManager implements Actor {
    public static final Logger LOG = Loggers.TRANSPORT_LOGGER;
    protected static final String NAME = "control.message.handler";
    protected static final int TRANSITION_DEFAULT = 0;
    protected static final int TRANSITION_OPEN = 1;
    protected static final int TRANSITION_CLOSE = 2;
    protected static final int TRANSITION_PROCESS = 3;
    protected static final int TRANSITION_FAILED = 4;
    protected final ActorScheduler actorScheduler;
    protected ActorReference actorRef;
    protected final Dispatcher controlMessageDispatcher;
    protected Subscription subscription;
    protected final ErrorResponseWriter errorResponseWriter;
    protected final long requestTimeoutInMillis;
    protected final State<Context> openingState = new OpeningState();
    protected final State<Context> openedState = new OpenedState();
    protected final State<Context> processingState = new ProcessingState();
    protected final State<Context> processingFailedState = new ProcessingFailedState();
    protected final State<Context> closedState = new ClosedState();
    protected final StateMachineAgent<Context> stateMachineAgent = new StateMachineAgent<>(StateMachine.builder(stateMachine -> {
        return new Context(stateMachine);
    }).initialState(this.closedState).from(this.openingState).take(0).to(this.openedState).from(this.openedState).take(3).to(this.processingState).from(this.openedState).take(4).to(this.processingFailedState).from(this.openedState).take(2).to(this.closedState).from(this.processingState).take(0).to(this.openedState).from(this.processingFailedState).take(0).to(this.openedState).from(this.closedState).take(1).to(this.openingState).build());
    protected final AtomicBoolean isRunning = new AtomicBoolean(false);
    protected final ControlMessageRequestHeaderDescriptor requestHeaderDescriptor = new ControlMessageRequestHeaderDescriptor();
    protected final MessageHeaderDecoder messageHeaderDecoder = new MessageHeaderDecoder();
    protected final ControlMessageRequestDecoder requestDecoder = new ControlMessageRequestDecoder();
    protected final UnsafeBuffer requestBuffer = new UnsafeBuffer(new byte[32768]);
    protected final Int2ObjectHashMap<ControlMessageHandler> handlersByTypeId = new Int2ObjectHashMap<>();
    protected final BrokerEventMetadata eventMetada = new BrokerEventMetadata();
    protected final ServerResponse response = new ServerResponse();

    /* loaded from: input_file:io/zeebe/broker/transport/controlmessage/ControlMessageHandlerManager$ClosedState.class */
    class ClosedState implements WaitState<Context> {
        ClosedState() {
        }

        public void work(Context context) throws Exception {
            if (ControlMessageHandlerManager.this.isRunning.compareAndSet(true, false)) {
                context.completeOpenCloseFuture();
                ControlMessageHandlerManager.this.actorRef.close();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/zeebe/broker/transport/controlmessage/ControlMessageHandlerManager$Context.class */
    public static class Context extends SimpleStateMachineContext {
        private CompletableFuture<Void> openClosefuture;
        private CompletableFuture<Void> processingFuture;
        private long processingStartTime;
        private ControlMessageType lastRequestMessageType;

        Context(StateMachine<Context> stateMachine) {
            super(stateMachine);
            this.processingStartTime = -1L;
        }

        public void lastRequestMessageType(ControlMessageType controlMessageType) {
            this.lastRequestMessageType = controlMessageType;
        }

        public void scheduledProcessing(CompletableFuture<Void> completableFuture, long j) {
            this.processingFuture = completableFuture;
            this.processingStartTime = j;
        }

        public CompletableFuture<Void> getProcessingFuture() {
            return this.processingFuture;
        }

        public long getProcessingStartTime() {
            return this.processingStartTime;
        }

        public void setOpenCloseFuture(CompletableFuture<Void> completableFuture) {
            this.openClosefuture = completableFuture;
        }

        public void completeOpenCloseFuture() {
            if (this.openClosefuture != null) {
                this.openClosefuture.complete(null);
                this.openClosefuture = null;
            }
        }

        public ControlMessageType getLastRequestMessageType() {
            return this.lastRequestMessageType;
        }
    }

    /* loaded from: input_file:io/zeebe/broker/transport/controlmessage/ControlMessageHandlerManager$OpenedState.class */
    class OpenedState implements State<Context>, FragmentHandler {
        private Context context;

        OpenedState() {
        }

        public int doWork(Context context) throws Exception {
            this.context = context;
            int i = 0;
            if (ControlMessageHandlerManager.this.subscription.poll(this, 1) > 0) {
                i = 0 + 1;
            }
            return i;
        }

        public int onFragment(DirectBuffer directBuffer, int i, int i2, int i3, boolean z) {
            ControlMessageHandlerManager.this.requestHeaderDescriptor.wrap(directBuffer, i);
            ControlMessageHandlerManager.this.eventMetada.reset();
            ControlMessageHandlerManager.this.eventMetada.requestId(ControlMessageHandlerManager.this.requestHeaderDescriptor.requestId()).requestStreamId(ControlMessageHandlerManager.this.requestHeaderDescriptor.streamId());
            int headerLength = i + ControlMessageRequestHeaderDescriptor.headerLength();
            ControlMessageHandlerManager.this.messageHeaderDecoder.wrap(ControlMessageHandlerManager.this.requestBuffer, 0);
            ControlMessageHandlerManager.this.requestDecoder.wrap(directBuffer, headerLength + ControlMessageHandlerManager.this.messageHeaderDecoder.encodedLength(), ControlMessageHandlerManager.this.requestDecoder.sbeBlockLength(), ControlMessageHandlerManager.this.requestDecoder.sbeSchemaVersion());
            ControlMessageType messageType = ControlMessageHandlerManager.this.requestDecoder.messageType();
            this.context.lastRequestMessageType(messageType);
            int partitionId = ControlMessageHandlerManager.this.requestDecoder.partitionId();
            ensureBufferCapacity(ControlMessageHandlerManager.this.requestDecoder.dataLength());
            ControlMessageHandlerManager.this.requestDecoder.getData(ControlMessageHandlerManager.this.requestBuffer, 0, ControlMessageHandlerManager.this.requestDecoder.dataLength());
            ControlMessageHandler controlMessageHandler = (ControlMessageHandler) ControlMessageHandlerManager.this.handlersByTypeId.get(messageType.value());
            if (controlMessageHandler == null) {
                this.context.take(4);
                return 0;
            }
            this.context.scheduledProcessing(controlMessageHandler.handle(partitionId, ControlMessageHandlerManager.this.requestBuffer, ControlMessageHandlerManager.this.eventMetada), ClockUtil.getCurrentTimeInMillis());
            this.context.take(3);
            return 0;
        }

        protected void ensureBufferCapacity(int i) {
            byte[] byteArray = ControlMessageHandlerManager.this.requestBuffer.byteArray();
            if (i <= byteArray.length) {
                Arrays.fill(byteArray, (byte) 0);
            } else {
                byteArray = new byte[i];
            }
            ControlMessageHandlerManager.this.requestBuffer.wrap(byteArray, 0, i);
        }
    }

    /* loaded from: input_file:io/zeebe/broker/transport/controlmessage/ControlMessageHandlerManager$OpeningState.class */
    class OpeningState implements TransitionState<Context> {
        OpeningState() {
        }

        public void work(Context context) throws Exception {
            ControlMessageHandlerManager.this.subscription = ControlMessageHandlerManager.this.controlMessageDispatcher.getSubscriptionByName(DispatcherSubscriptionNames.TRANSPORT_CONTROL_MESSAGE_HANDLER_SUBSCRIPTION);
            context.take(0);
            context.completeOpenCloseFuture();
        }
    }

    /* loaded from: input_file:io/zeebe/broker/transport/controlmessage/ControlMessageHandlerManager$ProcessingFailedState.class */
    class ProcessingFailedState implements TransitionState<Context> {
        ProcessingFailedState() {
        }

        public void work(Context context) throws Exception {
            ControlMessageHandlerManager.this.errorResponseWriter.errorCode(ErrorCode.MESSAGE_NOT_SUPPORTED).errorMessage("Cannot handle control message with type '%s'.", context.getLastRequestMessageType().name()).failedRequest(ControlMessageHandlerManager.this.requestBuffer, 0, ControlMessageHandlerManager.this.requestBuffer.capacity()).tryWriteResponseOrLogFailure(ControlMessageHandlerManager.this.eventMetada.getRequestStreamId(), ControlMessageHandlerManager.this.eventMetada.getRequestId());
            context.take(0);
        }
    }

    /* loaded from: input_file:io/zeebe/broker/transport/controlmessage/ControlMessageHandlerManager$ProcessingState.class */
    class ProcessingState implements WaitState<Context> {
        ProcessingState() {
        }

        public void work(Context context) throws Exception {
            CompletableFuture<Void> processingFuture = context.getProcessingFuture();
            long processingStartTime = context.getProcessingStartTime();
            if (processingFuture.isDone()) {
                logExceptionIfAny(processingFuture);
                context.take(0);
            } else if (hasTimeout(processingStartTime)) {
                ControlMessageHandlerManager.this.errorResponseWriter.errorCode(ErrorCode.REQUEST_TIMEOUT).errorMessage("Timeout while handle control message.").failedRequest(ControlMessageHandlerManager.this.requestBuffer, 0, ControlMessageHandlerManager.this.requestBuffer.capacity()).tryWriteResponseOrLogFailure(ControlMessageHandlerManager.this.eventMetada.getRequestStreamId(), ControlMessageHandlerManager.this.eventMetada.getRequestId());
                context.take(0);
            }
        }

        private void logExceptionIfAny(CompletableFuture<Void> completableFuture) throws InterruptedException {
            if (completableFuture.isCompletedExceptionally()) {
                try {
                    completableFuture.get();
                } catch (ExecutionException e) {
                    ControlMessageHandlerManager.LOG.error("Could not process control message request successfully. A response may not be sent.", e.getCause());
                }
            }
        }

        protected boolean hasTimeout(long j) {
            return ClockUtil.getCurrentTimeInMillis() >= j + ControlMessageHandlerManager.this.requestTimeoutInMillis;
        }
    }

    public ControlMessageHandlerManager(ServerOutput serverOutput, Dispatcher dispatcher, long j, ActorScheduler actorScheduler, List<ControlMessageHandler> list) {
        this.actorScheduler = actorScheduler;
        this.controlMessageDispatcher = dispatcher;
        this.requestTimeoutInMillis = j;
        this.errorResponseWriter = new ErrorResponseWriter(serverOutput);
        for (ControlMessageHandler controlMessageHandler : list) {
            this.handlersByTypeId.put(controlMessageHandler.getMessageType().value(), controlMessageHandler);
        }
    }

    public String name() {
        return NAME;
    }

    public int doWork() {
        return this.stateMachineAgent.doWork();
    }

    public CompletableFuture<Void> openAsync() {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        this.stateMachineAgent.addCommand(context -> {
            if (context.tryTake(1)) {
                context.setOpenCloseFuture(completableFuture);
            } else {
                completableFuture.completeExceptionally(new IllegalStateException("Cannot open control message handler."));
            }
        });
        if (this.isRunning.compareAndSet(false, true)) {
            try {
                this.actorRef = this.actorScheduler.schedule(this);
            } catch (Exception e) {
                this.isRunning.set(false);
                completableFuture.completeExceptionally(e);
            }
        }
        return completableFuture;
    }

    public CompletableFuture<Void> closeAsync() {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        this.stateMachineAgent.addCommand(context -> {
            if (context.tryTake(2)) {
                context.setOpenCloseFuture(completableFuture);
            } else {
                completableFuture.complete(null);
            }
        });
        return completableFuture;
    }

    public boolean isOpen() {
        return this.stateMachineAgent.getCurrentState() == this.openedState || this.stateMachineAgent.getCurrentState() == this.processingState || this.stateMachineAgent.getCurrentState() == this.processingFailedState;
    }

    public boolean isClosed() {
        return this.stateMachineAgent.getCurrentState() == this.closedState;
    }
}
