/*
 * Decompiled with CFR 0.152.
 */
package io.zeebe.broker.event.processor;

import io.zeebe.broker.event.TopicSubscriptionServiceNames;
import io.zeebe.broker.event.processor.SubscribeProcessor;
import io.zeebe.broker.event.processor.SubscriptionRegistry;
import io.zeebe.broker.event.processor.TopicSubscriberEvent;
import io.zeebe.broker.event.processor.TopicSubscriberState;
import io.zeebe.broker.event.processor.TopicSubscriptionEvent;
import io.zeebe.broker.event.processor.TopicSubscriptionPushProcessor;
import io.zeebe.broker.event.processor.TopicSubscriptionState;
import io.zeebe.broker.logstreams.LogStreamServiceNames;
import io.zeebe.broker.logstreams.processor.MetadataFilter;
import io.zeebe.broker.logstreams.processor.StreamProcessorService;
import io.zeebe.broker.system.SystemServiceNames;
import io.zeebe.broker.transport.clientapi.CommandResponseWriter;
import io.zeebe.broker.transport.clientapi.ErrorResponseWriter;
import io.zeebe.broker.transport.clientapi.SubscribedEventWriter;
import io.zeebe.logstreams.log.LogStream;
import io.zeebe.logstreams.log.LogStreamWriter;
import io.zeebe.logstreams.log.LoggedEvent;
import io.zeebe.logstreams.processor.EventProcessor;
import io.zeebe.logstreams.processor.StreamProcessor;
import io.zeebe.logstreams.processor.StreamProcessorContext;
import io.zeebe.logstreams.processor.StreamProcessorController;
import io.zeebe.logstreams.snapshot.ZbMapSnapshotSupport;
import io.zeebe.logstreams.spi.SnapshotSupport;
import io.zeebe.map.Bytes2LongZbMap;
import io.zeebe.map.ZbMap;
import io.zeebe.protocol.clientapi.ErrorCode;
import io.zeebe.protocol.clientapi.EventType;
import io.zeebe.protocol.impl.BrokerEventMetadata;
import io.zeebe.servicecontainer.Service;
import io.zeebe.servicecontainer.ServiceName;
import io.zeebe.servicecontainer.ServiceStartContext;
import io.zeebe.util.DeferredCommandContext;
import io.zeebe.util.buffer.BufferReader;
import io.zeebe.util.buffer.BufferWriter;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import org.agrona.DirectBuffer;

/**
 * Stream processor that manages topic subscriptions on one log stream.
 *
 * <p>Only {@code SUBSCRIPTION_EVENT} and {@code SUBSCRIBER_EVENT} records are handled
 * (see {@link #filter()} and {@link #onEvent(LoggedEvent)}):
 * <ul>
 *   <li>subscriber events in state {@code SUBSCRIBE} are delegated to the {@link SubscribeProcessor};</li>
 *   <li>subscriber events in state {@code SUBSCRIBED} are handled by {@link SubscribedProcessor};</li>
 *   <li>subscription events in state {@code ACKNOWLEDGE} are handled by {@link AckProcessor}.</li>
 * </ul>
 *
 * <p>The {@code metadata}, {@code subscriptionEvent}, {@code subscriberEvent} and
 * {@code currentEvent} fields are reused for every event and are shared with the inner
 * event processors, so they are only valid while the event they were read from is being processed.
 *
 * <p>NOTE(review): nothing in this class synchronizes access to that shared state; presumably all
 * callbacks and the commands queued on {@code cmdContext} run on the same thread - confirm.
 */
public class TopicSubscriptionManagementProcessor implements StreamProcessor {
    /** Maximum subscription name length; passed to the ack map and to the {@link SubscribeProcessor}. */
    protected static final int MAXIMUM_SUBSCRIPTION_NAME_LENGTH = 32;

    /** Snapshot support wrapping {@link #ackMap}; exposed through {@link #getStateResource()}. */
    protected final SnapshotSupport snapshotResource;

    /** Target stream of this processor; assigned in {@link #onOpen(StreamProcessorContext)}. */
    protected LogStream targetStream;
    /** Topic name of the source stream; assigned in {@link #onOpen(StreamProcessorContext)}. */
    protected DirectBuffer logStreamTopicName;
    /** Partition id of the source stream; assigned in {@link #onOpen(StreamProcessorContext)}. */
    protected int logStreamPartitionId;

    /** Service name of the log stream; used to derive push-processor service names and dependencies. */
    protected final ServiceName<LogStream> streamServiceName;
    /** Registry of the currently known push processors. */
    protected final SubscriptionRegistry subscriptionRegistry = new SubscriptionRegistry();
    protected final ErrorResponseWriter errorWriter;
    protected final CommandResponseWriter responseWriter;
    /** Supplies one event writer per newly created push processor. */
    protected final Supplier<SubscribedEventWriter> eventWriterFactory;
    /** Service context used to install and remove push-processor services. */
    protected final ServiceStartContext serviceContext;
    /** Maps a subscription name to the last acknowledged position of that subscription. */
    protected final Bytes2LongZbMap ackMap;
    /** Command queue of the stream processor; assigned in {@link #onOpen(StreamProcessorContext)}. */
    protected DeferredCommandContext cmdContext;

    protected final AckProcessor ackProcessor = new AckProcessor();
    protected final SubscribeProcessor subscribeProcessor = new SubscribeProcessor(MAXIMUM_SUBSCRIPTION_NAME_LENGTH, this);
    protected final SubscribedProcessor subscribedProcessor = new SubscribedProcessor();

    /** Reusable metadata of the event currently being processed. */
    protected final BrokerEventMetadata metadata = new BrokerEventMetadata();
    /** Reusable view on the value of the current subscription event. */
    protected final TopicSubscriptionEvent subscriptionEvent = new TopicSubscriptionEvent();
    /** Reusable view on the value of the current subscriber event. */
    protected final TopicSubscriberEvent subscriberEvent = new TopicSubscriberEvent();
    /** The event most recently passed to {@link #onEvent(LoggedEvent)}. */
    protected LoggedEvent currentEvent;

    /**
     * Creates the processor and its ack map together with the snapshot support wrapping that map.
     *
     * @param streamServiceName service name of the log stream this processor belongs to
     * @param responseWriter writer for command responses
     * @param errorWriter writer for error responses
     * @param eventWriterFactory supplies the event writer handed to each new push processor
     * @param serviceContext context used to install and remove push-processor services
     */
    public TopicSubscriptionManagementProcessor(ServiceName<LogStream> streamServiceName, CommandResponseWriter responseWriter, ErrorResponseWriter errorWriter, Supplier<SubscribedEventWriter> eventWriterFactory, ServiceStartContext serviceContext) {
        this.streamServiceName = streamServiceName;
        this.responseWriter = responseWriter;
        this.errorWriter = errorWriter;
        this.eventWriterFactory = eventWriterFactory;
        this.serviceContext = serviceContext;
        this.ackMap = new Bytes2LongZbMap(MAXIMUM_SUBSCRIPTION_NAME_LENGTH);
        this.snapshotResource = new ZbMapSnapshotSupport((ZbMap)this.ackMap);
    }

    /**
     * Captures the command queue, the target stream, and the topic name and partition id of the
     * source stream from the given context.
     */
    public void onOpen(StreamProcessorContext context) {
        this.cmdContext = context.getStreamProcessorCmdQueue();

        LogStream sourceStream = context.getSourceStream();
        this.logStreamTopicName = sourceStream.getTopicName();
        this.logStreamPartitionId = sourceStream.getPartitionId();

        this.targetStream = context.getTargetStream();
    }

    /** Closes the ack map. */
    public void onClose() {
        this.ackMap.close();
    }

    /** @return the snapshot support backed by the ack map */
    public SnapshotSupport getStateResource() {
        return this.snapshotResource;
    }

    /** @return the target stream captured in {@link #onOpen(StreamProcessorContext)} */
    public LogStream getTargetStream() {
        return this.targetStream;
    }

    /**
     * Reads the event's metadata, remembers the event as {@link #currentEvent} and dispatches on
     * the event type.
     *
     * @param event the event to process
     * @return the processor responsible for the event, or {@code null} if the event is neither a
     *         subscription nor a subscriber event (or is in a state this class does not handle)
     */
    public EventProcessor onEvent(LoggedEvent event) {
        this.metadata.reset();
        event.readMetadata((BufferReader)this.metadata);
        this.currentEvent = event;

        EventType eventType = this.metadata.getEventType();
        if (eventType == EventType.SUBSCRIPTION_EVENT) {
            return this.onSubscriptionEvent(event);
        }
        if (eventType == EventType.SUBSCRIBER_EVENT) {
            return this.onSubscriberEvent(event);
        }
        return null;
    }

    /**
     * Wraps the event value as a subscriber event and selects the processor by its state.
     *
     * @return the subscribe processor for {@code SUBSCRIBE}, the subscribed processor for
     *         {@code SUBSCRIBED}, otherwise {@code null}
     */
    protected EventProcessor onSubscriberEvent(LoggedEvent event) {
        this.subscriberEvent.reset();
        this.subscriberEvent.wrap(event.getValueBuffer(), event.getValueOffset(), event.getValueLength());

        TopicSubscriberState state = this.subscriberEvent.getState();
        if (state == TopicSubscriberState.SUBSCRIBE) {
            this.subscribeProcessor.wrap(this.currentEvent, this.metadata, this.subscriberEvent);
            return this.subscribeProcessor;
        }
        if (state == TopicSubscriberState.SUBSCRIBED) {
            return this.subscribedProcessor;
        }
        return null;
    }

    /**
     * Wraps the event value as a subscription event and selects the processor by its state.
     *
     * @return the ack processor for {@code ACKNOWLEDGE}, otherwise {@code null}
     */
    protected EventProcessor onSubscriptionEvent(LoggedEvent event) {
        this.subscriptionEvent.reset();
        this.subscriptionEvent.wrap(event.getValueBuffer(), event.getValueOffset(), event.getValueLength());

        if (this.subscriptionEvent.getState() == TopicSubscriptionState.ACKNOWLEDGE) {
            return this.ackProcessor;
        }
        return null;
    }

    /**
     * Stores the acknowledged position for a subscription in the ack map.
     *
     * @param subscriptionName buffer holding the subscription name; its full capacity is used as key
     * @param ackPosition the position to record
     */
    protected void putAck(DirectBuffer subscriptionName, long ackPosition) {
        this.ackMap.put(subscriptionName, 0, subscriptionName.capacity(), ackPosition);
    }

    /**
     * Queues a command that removes the push processor with the given subscriber key from the
     * registry and removes its service.
     *
     * @param subscriberKey key of the subscriber whose push processor should be closed
     * @return a future completed once the service removal finished (exceptionally if the removal
     *         failed), or completed immediately if no processor is registered under the key
     */
    public CompletableFuture<Void> closePushProcessorAsync(long subscriberKey) {
        return this.cmdContext.runAsync(future -> {
            TopicSubscriptionPushProcessor processor = this.subscriptionRegistry.removeProcessorByKey(subscriberKey);
            if (processor == null) {
                future.complete(null);
                return;
            }
            // Propagate the outcome of the service removal to the caller's future.
            this.closePushProcessor(processor).whenComplete((ignored, failure) -> {
                if (failure == null) {
                    future.complete(null);
                } else {
                    future.completeExceptionally(failure);
                }
            });
        });
    }

    /**
     * Removes the service that runs the given push processor.
     *
     * @return the future returned by the service context for the removal
     */
    protected CompletableFuture<Void> closePushProcessor(TopicSubscriptionPushProcessor processor) {
        ServiceName<StreamProcessorController> subscriptionProcessorName = TopicSubscriptionServiceNames.subscriptionPushServiceName(this.streamServiceName.getName(), processor.getNameAsString());
        return this.serviceContext.removeService(subscriptionProcessorName);
    }

    /**
     * Determines the position from which a subscription should (re)start.
     *
     * @param subscriptionName buffer holding the subscription name; its full capacity is used as key
     * @param startPosition the position requested by the subscriber
     * @param forceStart if {@code true}, any recorded acknowledgement is ignored
     * @return {@code startPosition} if {@code forceStart} is set or no non-negative ack position is
     *         recorded for the name; otherwise the recorded ack position plus one
     */
    public long determineResumePosition(DirectBuffer subscriptionName, long startPosition, boolean forceStart) {
        if (forceStart) {
            // The recorded ack is irrelevant in this case, so skip the map lookup entirely.
            return startPosition;
        }

        long lastAckedPosition = this.ackMap.get(subscriptionName, 0, subscriptionName.capacity(), -1L);
        return lastAckedPosition >= 0L ? lastAckedPosition + 1L : startPosition;
    }

    /**
     * Creates a push processor for a subscriber and installs it as a read-only stream processor
     * service that depends on this processor's log stream.
     *
     * @return a future that yields the created push processor once the service is installed
     */
    public CompletableFuture<TopicSubscriptionPushProcessor> openPushProcessorAsync(int clientChannelId, long subscriberKey, long resumePosition, DirectBuffer subscriptionName, int prefetchCapacity) {
        TopicSubscriptionPushProcessor processor = new TopicSubscriptionPushProcessor(clientChannelId, subscriberKey, resumePosition, subscriptionName, prefetchCapacity, this.eventWriterFactory.get());
        ServiceName<StreamProcessorController> serviceName = TopicSubscriptionServiceNames.subscriptionPushServiceName(this.streamServiceName.getName(), processor.getNameAsString());

        // NOTE(review): 40 is presumably a stream-processor id constant inlined by the decompiler - confirm.
        StreamProcessorService streamProcessorService = new StreamProcessorService(serviceName.getName(), 40, processor)
            .eventFilter(TopicSubscriptionPushProcessor.eventFilter())
            .readOnly(true);

        return this.serviceContext.createService(serviceName, (Service)streamProcessorService)
            .dependency(this.streamServiceName, streamProcessorService.getSourceStreamInjector())
            .dependency(this.streamServiceName, streamProcessorService.getTargetStreamInjector())
            .dependency(LogStreamServiceNames.SNAPSHOT_STORAGE_SERVICE, streamProcessorService.getSnapshotStorageInjector())
            .dependency(SystemServiceNames.ACTOR_SCHEDULER_SERVICE, streamProcessorService.getActorSchedulerInjector())
            .install()
            .thenApply(v -> processor);
    }

    /**
     * Tries to write a {@code REQUEST_PROCESSING_FAILURE} error response for the given event.
     *
     * @param metadata metadata providing the request stream id and request id to respond to
     * @param event the event whose value is reported as the failed request
     * @param error the error message
     * @return the result of the error writer's {@code tryWriteResponse} call
     */
    public boolean writeRequestResponseError(BrokerEventMetadata metadata, LoggedEvent event, String error) {
        return this.errorWriter
            .errorCode(ErrorCode.REQUEST_PROCESSING_FAILURE)
            .errorMessage(error)
            .failedRequest(event.getValueBuffer(), event.getValueOffset(), event.getValueLength())
            .tryWriteResponse(metadata.getRequestStreamId(), metadata.getRequestId());
    }

    /** Adds the given push processor to the subscription registry. */
    public void registerPushProcessor(TopicSubscriptionPushProcessor processor) {
        this.subscriptionRegistry.addSubscription(processor);
    }

    /**
     * Queues a command that unregisters and closes every push processor belonging to the given
     * client channel.
     *
     * @param channelId id of the closed client channel
     */
    public void onClientChannelCloseAsync(int channelId) {
        this.cmdContext.runAsync(() -> {
            Iterator<TopicSubscriptionPushProcessor> subscriptionsIt = this.subscriptionRegistry.iterateSubscriptions();
            while (subscriptionsIt.hasNext()) {
                TopicSubscriptionPushProcessor processor = subscriptionsIt.next();
                if (processor.getChannelId() == channelId) {
                    subscriptionsIt.remove();
                    // The future of the service removal is not awaited here.
                    this.closePushProcessor(processor);
                }
            }
        });
    }

    /** @return a filter accepting only subscription and subscriber events */
    public static MetadataFilter filter() {
        return m -> EventType.SUBSCRIPTION_EVENT == m.getEventType() || EventType.SUBSCRIBER_EVENT == m.getEventType();
    }

    /**
     * Handles subscriber events in state {@code SUBSCRIBED}: responds to the request, enables the
     * matching push processor and writes an initial {@code ACKNOWLEDGE} subscription event.
     */
    protected class SubscribedProcessor implements EventProcessor {
        protected SubscribedProcessor() {
        }

        /** Nothing to compute; all work happens in the side effects and the written event. */
        public void processEvent() {
        }

        /**
         * Tries to write the response for the subscriber event and, if that succeeded, enables the
         * push processor registered under the subscription name.
         *
         * @return whether the response was written
         */
        public boolean executeSideEffects() {
            boolean responseWritten = responseWriter
                .topicName(logStreamTopicName)
                .partitionId(logStreamPartitionId)
                .eventWriter((BufferWriter)subscriberEvent)
                .position(currentEvent.getPosition())
                .key(currentEvent.getKey())
                .tryWriteResponse(metadata.getRequestStreamId(), metadata.getRequestId());

            if (responseWritten) {
                TopicSubscriptionPushProcessor pushProcessor = subscriptionRegistry.getProcessorByName(subscriberEvent.getName());
                // The lookup can come back empty (processors are removed from the registry when a
                // subscriber or its channel is closed), so guard it like AckProcessor does.
                if (pushProcessor != null) {
                    pushProcessor.enable();
                }
            }
            return responseWritten;
        }

        /**
         * Writes an {@code ACKNOWLEDGE} subscription event for the opened subscription with an ack
         * position of {@code startPosition - 1}.
         *
         * @return the result of the writer's {@code tryWrite} call
         */
        public long writeEvent(LogStreamWriter writer) {
            DirectBuffer openedSubscriptionName = subscriberEvent.getName();

            subscriptionEvent.reset();
            subscriptionEvent
                .setState(TopicSubscriptionState.ACKNOWLEDGE)
                .setName(openedSubscriptionName, 0, openedSubscriptionName.capacity())
                .setAckPosition(subscriberEvent.getStartPosition() - 1L);

            // A negative request id makes AckProcessor skip the client response for this event.
            metadata
                .eventType(EventType.SUBSCRIPTION_EVENT)
                .requestStreamId(-1)
                .requestId(-1L)
                .raftTermId(targetStream.getTerm());

            return writer
                .key(currentEvent.getKey())
                .metadataWriter((BufferWriter)metadata)
                .valueWriter((BufferWriter)subscriptionEvent)
                .tryWrite();
        }
    }

    /**
     * Handles subscription events in state {@code ACKNOWLEDGE}: marks the event as
     * {@code ACKNOWLEDGED}, notifies the push processor, optionally responds to the request and
     * records the ack position.
     */
    protected class AckProcessor implements EventProcessor {
        protected AckProcessor() {
        }

        /** Switches the shared subscription event to state {@code ACKNOWLEDGED}. */
        public void processEvent() {
            subscriptionEvent.setState(TopicSubscriptionState.ACKNOWLEDGED);
        }

        /**
         * Writes the (now acknowledged) subscription event under the key of the current event.
         *
         * @return the result of the writer's {@code tryWrite} call
         */
        public long writeEvent(LogStreamWriter writer) {
            // NOTE(review): 1 is presumably the protocol version constant inlined by the decompiler - confirm.
            metadata.protocolVersion(1).raftTermId(targetStream.getTerm());

            return writer
                .key(currentEvent.getKey())
                .metadataWriter((BufferWriter)metadata)
                .valueWriter((BufferWriter)subscriptionEvent)
                .tryWrite();
        }

        /**
         * Forwards the ack position to the push processor registered under the subscription name
         * (if any) and, when the event carries a non-negative request id, tries to write a response.
         *
         * @return {@code true} if no response is required, otherwise whether the response was written
         */
        public boolean executeSideEffects() {
            TopicSubscriptionPushProcessor subscriptionProcessor = subscriptionRegistry.getProcessorByName(subscriptionEvent.getName());
            if (subscriptionProcessor != null) {
                subscriptionProcessor.onAck(subscriptionEvent.getAckPosition());
            }

            if (metadata.getRequestId() < 0L) {
                // No request to answer for this event.
                return true;
            }
            return responseWriter
                .topicName(logStreamTopicName)
                .partitionId(logStreamPartitionId)
                .eventWriter((BufferWriter)subscriptionEvent)
                .key(currentEvent.getKey())
                .tryWriteResponse(metadata.getRequestStreamId(), metadata.getRequestId());
        }

        /** Records the event's ack position in the ack map under the subscription name. */
        public void updateState() {
            putAck(subscriptionEvent.getName(), subscriptionEvent.getAckPosition());
        }
    }
}

