/*
 * Decompiled with CFR 0.152.
 */
package io.zeebe.broker.event.processor;

import io.zeebe.broker.event.processor.TopicSubscriberEvent;
import io.zeebe.broker.event.processor.TopicSubscriberState;
import io.zeebe.broker.event.processor.TopicSubscriptionManagementProcessor;
import io.zeebe.broker.event.processor.TopicSubscriptionPushProcessor;
import io.zeebe.logstreams.log.LogStreamWriter;
import io.zeebe.logstreams.log.LoggedEvent;
import io.zeebe.logstreams.processor.EventProcessor;
import io.zeebe.protocol.impl.BrokerEventMetadata;
import io.zeebe.util.buffer.BufferWriter;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.agrona.DirectBuffer;

/**
 * Processes a topic-subscriber "subscribe" request as a small state machine.
 *
 * <p>{@link #processEvent()} validates the subscription name and selects the initial state;
 * {@link #writeEvent(LogStreamWriter)} and {@link #executeSideEffects()} simply delegate to
 * whichever state is current. The states are:
 *
 * <ol>
 *   <li>{@link CreateSubscriptionServiceProcessor} - asks the manager to open a push processor
 *       asynchronously, then switches to the await state;</li>
 *   <li>{@link AwaitSubscriptionServiceProcessor} - waits for that future and switches to either
 *       the success or the failure state;</li>
 *   <li>{@link SubscriptionServiceSuccessProcessor} - registers the push processor and writes the
 *       {@code SUBSCRIBED} event;</li>
 *   <li>{@link RequestFailureProcessor} - reports an error for the request via the manager.</li>
 * </ol>
 *
 * <p>The intermediate states return {@code false} from {@code executeSideEffects()} after switching
 * state. NOTE(review): presumably the caller re-invokes {@code executeSideEffects()} until it
 * returns {@code true}; confirm against the stream processor controller.
 */
public class SubscribeProcessor implements EventProcessor {
    /** Upper bound (compared against the name buffer's capacity) for subscription names. */
    protected final int maximumNameLength;
    protected final TopicSubscriptionManagementProcessor manager;

    // Inputs of the request currently being processed; set via wrap(...).
    protected LoggedEvent event;
    protected BrokerEventMetadata metadata;
    protected TopicSubscriberEvent subscriberEvent;

    /** The currently active state; selected in {@link #processEvent()} and changed by the states. */
    protected EventProcessor state;

    protected final RequestFailureProcessor failedRequestState = new RequestFailureProcessor();
    protected final CreateSubscriptionServiceProcessor createProcessorState = new CreateSubscriptionServiceProcessor();
    protected final AwaitSubscriptionServiceProcessor awaitProcessorState = new AwaitSubscriptionServiceProcessor();
    protected final SubscriptionServiceSuccessProcessor successState = new SubscriptionServiceSuccessProcessor();

    /**
     * @param maximumNameLength maximum accepted capacity of the subscription name buffer
     * @param manager           management processor used to open/register push processors and to
     *                          send error responses
     */
    public SubscribeProcessor(int maximumNameLength, TopicSubscriptionManagementProcessor manager) {
        this.maximumNameLength = maximumNameLength;
        this.manager = manager;
    }

    /**
     * Sets the request that the following {@code processEvent}/{@code executeSideEffects}/
     * {@code writeEvent} calls operate on.
     *
     * @param event           the logged subscribe event
     * @param metadata        metadata belonging to {@code event}
     * @param subscriberEvent the decoded subscriber event value
     */
    public void wrap(LoggedEvent event, BrokerEventMetadata metadata, TopicSubscriberEvent subscriberEvent) {
        this.event = event;
        this.metadata = metadata;
        this.subscriberEvent = subscriberEvent;
    }

    /**
     * Validates the subscription name length and selects the initial state: the failure state
     * (with a prepared error message) when the name is too long, the create state otherwise.
     */
    public void processEvent() {
        DirectBuffer subscriptionName = subscriberEvent.getName();
        if (subscriptionName.capacity() <= maximumNameLength) {
            state = createProcessorState;
            return;
        }

        failedRequestState.wrapError("Cannot open topic subscription " + subscriberEvent.getNameAsString() + ". Subscription name must be " + maximumNameLength + " characters or shorter.");
        state = failedRequestState;
    }

    /**
     * Delegates to the current state.
     *
     * @param writer the log stream writer handed to the state
     * @return whatever the current state's {@code writeEvent} returns
     */
    public long writeEvent(LogStreamWriter writer) {
        return state.writeEvent(writer);
    }

    /**
     * Delegates to the current state.
     *
     * @return whatever the current state's {@code executeSideEffects} returns
     */
    public boolean executeSideEffects() {
        return state.executeSideEffects();
    }

    /**
     * Produces a non-null error text for a failed or cancelled push-processor future.
     *
     * <p>The exception's own message is used when present (this keeps the text that was reported
     * before). A {@link CancellationException} created by {@code CompletableFuture.cancel} carries
     * no message, so fall back to the cause's message and finally to {@code toString()}.
     */
    private static String describeFailure(Exception failure) {
        String message = failure.getMessage();
        if (message != null) {
            return message;
        }
        Throwable cause = failure.getCause();
        if (cause != null && cause.getMessage() != null) {
            return cause.getMessage();
        }
        return failure.toString();
    }

    /** Terminal success state: registers the opened push processor and writes the SUBSCRIBED event. */
    protected class SubscriptionServiceSuccessProcessor
    implements EventProcessor {
        protected TopicSubscriptionPushProcessor processor;

        protected SubscriptionServiceSuccessProcessor() {
        }

        /**
         * @param processor the push processor that was opened for this subscription
         */
        public void wrap(TopicSubscriptionPushProcessor processor) {
            this.processor = processor;
        }

        /** Nothing to do; all work happens in the side-effect and write phases. */
        public void processEvent() {
        }

        /**
         * Registers the push processor with the manager.
         *
         * @return always {@code true}
         */
        public boolean executeSideEffects() {
            manager.registerPushProcessor(processor);
            return true;
        }

        /**
         * Writes the subscriber event in state {@code SUBSCRIBED}, carrying the push processor's
         * start position and the original event key; metadata is stamped with protocol version 1
         * and the target stream's current term.
         *
         * @param writer the log stream writer to write with
         * @return the result of {@code tryWrite()}
         */
        public long writeEvent(LogStreamWriter writer) {
            metadata.protocolVersion(1).raftTermId(manager.getTargetStream().getTerm());
            subscriberEvent.setStartPosition(processor.getStartPosition()).setState(TopicSubscriberState.SUBSCRIBED);
            return writer
                .metadataWriter((BufferWriter)metadata)
                .valueWriter((BufferWriter)subscriberEvent)
                .key(event.getKey())
                .tryWrite();
        }
    }

    /** Intermediate state: waits for the asynchronously opened push processor. */
    protected class AwaitSubscriptionServiceProcessor
    implements EventProcessor {
        protected CompletableFuture<TopicSubscriptionPushProcessor> processorFuture;

        protected AwaitSubscriptionServiceProcessor() {
        }

        /**
         * @param processorFuture future completing with the opened push processor
         */
        public void wrap(CompletableFuture<TopicSubscriptionPushProcessor> processorFuture) {
            this.processorFuture = processorFuture;
        }

        /** Nothing to do; all work happens in the side-effect phase. */
        public void processEvent() {
        }

        /**
         * Checks the future without blocking. While it is not done, nothing changes. Once done,
         * switches to the success state (on a result) or to the failure state (on cancellation or
         * exceptional completion).
         *
         * @return always {@code false}; this state never finishes the request itself
         * @throws RuntimeException if the thread is interrupted while reading the future; the
         *                          interrupt flag is restored before throwing
         */
        public boolean executeSideEffects() {
            if (!processorFuture.isDone()) {
                return false;
            }
            try {
                successState.wrap(processorFuture.get());
                state = successState;
            }
            catch (CancellationException | ExecutionException e) {
                // Guarantee a non-null error text; cancellation exceptions have no message.
                failedRequestState.wrapError(describeFailure(e));
                state = failedRequestState;
            }
            catch (InterruptedException e) {
                // Restore the interrupt flag so code further up the stack can still observe it.
                Thread.currentThread().interrupt();
                throw new RuntimeException("Unexpected exception", e);
            }
            return false;
        }
    }

    /** Initial state for a valid request: triggers asynchronous creation of the push processor. */
    protected class CreateSubscriptionServiceProcessor
    implements EventProcessor {
        protected CreateSubscriptionServiceProcessor() {
        }

        /** Nothing to do; all work happens in the side-effect phase. */
        public void processEvent() {
        }

        /**
         * Asks the manager for the resume position of the subscription, opens a push processor
         * asynchronously from that position, and switches to the await state.
         *
         * @return always {@code false}; the await state takes over from here
         */
        public boolean executeSideEffects() {
            DirectBuffer subscriptionName = subscriberEvent.getName();
            long resumePosition = manager.determineResumePosition(
                subscriptionName,
                subscriberEvent.getStartPosition(),
                subscriberEvent.getForceStart());
            CompletableFuture<TopicSubscriptionPushProcessor> pendingProcessor = manager.openPushProcessorAsync(
                metadata.getRequestStreamId(),
                event.getKey(),
                resumePosition,
                subscriptionName,
                subscriberEvent.getPrefetchCapacity());

            awaitProcessorState.wrap(pendingProcessor);
            state = awaitProcessorState;
            return false;
        }
    }

    /** Terminal failure state: sends an error response for the request and writes no event. */
    protected class RequestFailureProcessor
    implements EventProcessor {
        protected String error;

        protected RequestFailureProcessor() {
        }

        /**
         * @param error the error text to report for the request
         */
        public void wrapError(String error) {
            this.error = error;
        }

        /** Nothing to do; the error is reported in the side-effect phase. */
        public void processEvent() {
        }

        /**
         * Reports the stored error for the current request through the manager.
         *
         * @return the result of {@code manager.writeRequestResponseError(...)}
         */
        public boolean executeSideEffects() {
            return manager.writeRequestResponseError(metadata, event, error);
        }

        /**
         * Does not use the writer.
         *
         * @param writer unused
         * @return always {@code 0}
         */
        public long writeEvent(LogStreamWriter writer) {
            return 0L;
        }
    }
}

