package io.zeebe.broker.event.processor;

import io.zeebe.broker.event.TopicSubscriptionServiceNames;
import io.zeebe.broker.logstreams.LogStreamServiceNames;
import io.zeebe.broker.logstreams.processor.MetadataFilter;
import io.zeebe.broker.logstreams.processor.StreamProcessorService;
import io.zeebe.broker.system.SystemServiceNames;
import io.zeebe.broker.transport.clientapi.CommandResponseWriter;
import io.zeebe.broker.transport.clientapi.ErrorResponseWriter;
import io.zeebe.broker.transport.clientapi.SubscribedEventWriter;
import io.zeebe.logstreams.log.LogStream;
import io.zeebe.logstreams.log.LogStreamWriter;
import io.zeebe.logstreams.log.LoggedEvent;
import io.zeebe.logstreams.processor.EventProcessor;
import io.zeebe.logstreams.processor.StreamProcessor;
import io.zeebe.logstreams.processor.StreamProcessorContext;
import io.zeebe.logstreams.processor.StreamProcessorController;
import io.zeebe.logstreams.snapshot.ZbMapSnapshotSupport;
import io.zeebe.logstreams.spi.SnapshotSupport;
import io.zeebe.map.Bytes2LongZbMap;
import io.zeebe.protocol.clientapi.ErrorCode;
import io.zeebe.protocol.clientapi.EventType;
import io.zeebe.protocol.impl.BrokerEventMetadata;
import io.zeebe.servicecontainer.ServiceName;
import io.zeebe.servicecontainer.ServiceStartContext;
import io.zeebe.util.DeferredCommandContext;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import org.agrona.DirectBuffer;

/* loaded from: input_file:io/zeebe/broker/event/processor/TopicSubscriptionManagementProcessor.class */
/**
 * Stream processor that reacts to {@code SUBSCRIPTION_EVENT} and {@code SUBSCRIBER_EVENT} records
 * and manages the {@link TopicSubscriptionPushProcessor}s registered for a log stream.
 *
 * <p>The last acknowledged position of each subscription is kept in {@link #ackMap}, keyed by the
 * subscription name. That map is the state exposed through {@link #getStateResource()}.
 */
public class TopicSubscriptionManagementProcessor implements StreamProcessor {
    /**
     * Maximum subscription name length. Passed to the {@code SubscribeProcessor} and to the
     * constructor of {@link #ackMap} (presumably as the key size in bytes - TODO confirm).
     */
    protected static final int MAXIMUM_SUBSCRIPTION_NAME_LENGTH = 32;
    /** Log stream of the current stream processor context; assigned in {@code onOpen}. */
    protected LogStream logStream;
    /** Partition id of {@link #logStream}; assigned in {@code onOpen} and set on command responses. */
    protected int logStreamPartitionId;
    /** Service name of the log stream; used to derive push processor service names and as their dependency. */
    protected final ServiceName<LogStream> streamServiceName;
    /** Writer used by {@code writeRequestResponseError} to report failed requests. */
    protected final ErrorResponseWriter errorWriter;
    /** Writer used by the inner event processors to answer requests. */
    protected final CommandResponseWriter responseWriter;
    /** Supplies the {@link SubscribedEventWriter} handed to every newly created push processor. */
    protected final Supplier<SubscribedEventWriter> eventWriterFactory;
    /** Service context used to install and remove push processor services. */
    protected final ServiceStartContext serviceContext;
    /** Command queue obtained from the stream processor context in {@code onOpen}; the {@code *Async} methods submit work to it. */
    protected DeferredCommandContext cmdContext;
    /** Event most recently passed to {@code onEvent}. */
    protected LoggedEvent currentEvent;
    /** Registry of the push processors known to this processor. */
    protected final SubscriptionRegistry subscriptionRegistry = new SubscriptionRegistry();
    /** Reused processor returned for subscription events in state {@code ACKNOWLEDGE}. */
    protected final AckProcessor ackProcessor = new AckProcessor();
    /** Reused processor returned for subscriber events in state {@code SUBSCRIBE}. */
    protected final SubscribeProcessor subscribeProcessor = new SubscribeProcessor(MAXIMUM_SUBSCRIPTION_NAME_LENGTH, this);
    /** Reused processor returned for subscriber events in state {@code SUBSCRIBED}. */
    protected final SubscribedProcessor subscribedProcessor = new SubscribedProcessor();
    /** Reused metadata instance; reset and re-read for every event in {@code onEvent}. */
    protected final BrokerEventMetadata metadata = new BrokerEventMetadata();
    /** Reused subscription event; wrapped around the value of each subscription event. */
    protected final TopicSubscriptionEvent subscriptionEvent = new TopicSubscriptionEvent();
    /** Reused subscriber event; wrapped around the value of each subscriber event. */
    protected final TopicSubscriberEvent subscriberEvent = new TopicSubscriberEvent();
    /** Maps a subscription name to its acknowledged position (written by {@code putAck}). */
    protected final Bytes2LongZbMap ackMap = new Bytes2LongZbMap(MAXIMUM_SUBSCRIPTION_NAME_LENGTH);
    /** Snapshot support wrapping {@link #ackMap}; returned by {@code getStateResource}. */
    protected final SnapshotSupport snapshotResource = new ZbMapSnapshotSupport(this.ackMap);

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:io/zeebe/broker/event/processor/TopicSubscriptionManagementProcessor$AckProcessor.class */
    /**
     * Processes a subscription event in state {@code ACKNOWLEDGE}: marks it as
     * {@code ACKNOWLEDGED}, forwards the acknowledged position to the matching push processor,
     * answers the request (if there is one) and records the position in the ack map.
     *
     * <p>Operates on the enclosing processor's shared {@code subscriptionEvent}, {@code metadata}
     * and {@code currentEvent} instances.
     */
    public class AckProcessor implements EventProcessor {
        protected AckProcessor() {
        }

        /** Switches the wrapped subscription event to state {@code ACKNOWLEDGED}. */
        public void processEvent() {
            subscriptionEvent.setState(TopicSubscriptionState.ACKNOWLEDGED);
        }

        /**
         * Writes the updated subscription event under the key of the current event.
         *
         * @param logStreamWriter writer to append the follow-up event with
         * @return the result of {@code tryWrite()}
         */
        public long writeEvent(LogStreamWriter logStreamWriter) {
            metadata.protocolVersion(1);
            return logStreamWriter
                .key(currentEvent.getKey())
                .metadataWriter(metadata)
                .valueWriter(subscriptionEvent)
                .tryWrite();
        }

        /**
         * Notifies the push processor registered under the subscription name (if any) about the
         * acknowledged position and, when the metadata carries a non-negative request id, writes
         * the response for that request.
         *
         * @return {@code true} when no response is required, otherwise the result of writing it
         */
        public boolean executeSideEffects() {
            final TopicSubscriptionPushProcessor pushProcessor =
                subscriptionRegistry.getProcessorByName(subscriptionEvent.getName());
            if (pushProcessor != null) {
                pushProcessor.onAck(subscriptionEvent.getAckPosition());
            }

            // A negative request id means there is no request to answer.
            if (metadata.getRequestId() < 0) {
                return true;
            }

            return responseWriter
                .partitionId(logStreamPartitionId)
                .eventWriter(subscriptionEvent)
                .key(currentEvent.getKey())
                .tryWriteResponse(metadata.getRequestStreamId(), metadata.getRequestId());
        }

        /** Stores the acknowledged position under the subscription name in the ack map. */
        public void updateState() {
            putAck(subscriptionEvent.getName(), subscriptionEvent.getAckPosition());
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:io/zeebe/broker/event/processor/TopicSubscriptionManagementProcessor$SubscribedProcessor.class */
    /**
     * Processes a subscriber event in state {@code SUBSCRIBED}: answers the subscribe request,
     * enables the matching push processor and writes an initial {@code ACKNOWLEDGE} subscription
     * event positioned just before the subscriber's start position.
     *
     * <p>Operates on the enclosing processor's shared {@code subscriberEvent},
     * {@code subscriptionEvent}, {@code metadata} and {@code currentEvent} instances.
     */
    public class SubscribedProcessor implements EventProcessor {
        protected SubscribedProcessor() {
        }

        /** Nothing to compute for this event; all work happens in the other phases. */
        public void processEvent() {
        }

        /**
         * Writes the response for the subscriber event and, once it has been written, enables the
         * push processor registered under the subscriber's name.
         *
         * <p>The push processor may no longer be registered at this point, because
         * {@code closePushProcessorAsync} and {@code onClientChannelCloseAsync} remove entries
         * from the registry. In that case there is nothing to enable and the lookup result is
         * ignored instead of being dereferenced, matching the null handling in
         * {@code AckProcessor}.
         *
         * @return whether the response was written
         */
        public boolean executeSideEffects() {
            final boolean responseWritten = responseWriter
                .partitionId(logStreamPartitionId)
                .eventWriter(subscriberEvent)
                .position(currentEvent.getPosition())
                .key(currentEvent.getKey())
                .tryWriteResponse(metadata.getRequestStreamId(), metadata.getRequestId());

            if (responseWritten) {
                final TopicSubscriptionPushProcessor pushProcessor =
                    subscriptionRegistry.getProcessorByName(subscriberEvent.getName());
                // The subscription may have been removed in the meantime; avoid a NullPointerException.
                if (pushProcessor != null) {
                    pushProcessor.enable();
                }
            }
            return responseWritten;
        }

        /**
         * Writes an {@code ACKNOWLEDGE} subscription event for the subscriber, using
         * {@code startPosition - 1} as the acknowledged position.
         *
         * <p>NOTE(review): this overwrites the request stream id and request id in the shared
         * metadata with {@code -1}, while {@code executeSideEffects} reads them. This presumably
         * relies on the side effects running before the event is written - confirm against the
         * stream processor controller.
         *
         * @param logStreamWriter writer to append the follow-up event with
         * @return the result of {@code tryWrite()}
         */
        public long writeEvent(LogStreamWriter logStreamWriter) {
            final DirectBuffer name = subscriberEvent.getName();

            subscriptionEvent.reset();
            subscriptionEvent
                .setState(TopicSubscriptionState.ACKNOWLEDGE)
                .setName(name, 0, name.capacity())
                .setAckPosition(subscriberEvent.getStartPosition() - 1);

            metadata
                .eventType(EventType.SUBSCRIPTION_EVENT)
                .requestStreamId(-1)
                .requestId(-1L);

            return logStreamWriter
                .key(currentEvent.getKey())
                .metadataWriter(metadata)
                .valueWriter(subscriptionEvent)
                .tryWrite();
        }
    }

    /**
     * Creates the processor with its collaborators. No work is done here besides storing them.
     *
     * @param serviceName service name of the log stream this processor works on
     * @param commandResponseWriter writer for responses to processed events
     * @param errorResponseWriter writer for error responses
     * @param supplier factory producing a {@link SubscribedEventWriter} per push processor
     * @param serviceStartContext context used to install and remove push processor services
     */
    public TopicSubscriptionManagementProcessor(
            ServiceName<LogStream> serviceName,
            CommandResponseWriter commandResponseWriter,
            ErrorResponseWriter errorResponseWriter,
            Supplier<SubscribedEventWriter> supplier,
            ServiceStartContext serviceStartContext) {
        this.serviceContext = serviceStartContext;
        this.streamServiceName = serviceName;
        this.eventWriterFactory = supplier;
        this.errorWriter = errorResponseWriter;
        this.responseWriter = commandResponseWriter;
    }

    /**
     * Captures the log stream, its partition id and the command queue from the given context.
     *
     * @param streamProcessorContext context of the stream processor being opened
     */
    public void onOpen(StreamProcessorContext streamProcessorContext) {
        final LogStream stream = streamProcessorContext.getLogStream();
        logStream = stream;
        logStreamPartitionId = stream.getPartitionId();
        cmdContext = streamProcessorContext.getStreamProcessorCmdQueue();
    }

    /** Closes the ack map when the processor is closed. */
    public void onClose() {
        ackMap.close();
    }

    /**
     * @return the snapshot support wrapping the ack map
     */
    public SnapshotSupport getStateResource() {
        return snapshotResource;
    }

    /**
     * @return the log stream captured in {@code onOpen}, or {@code null} before it was opened
     */
    public LogStream getLogStream() {
        return logStream;
    }

    /**
     * Reads the event's metadata, remembers the event as the current one and dispatches on its
     * event type.
     *
     * @param loggedEvent the event to process
     * @return the processor responsible for the event, or {@code null} if the event type is
     *     neither {@code SUBSCRIPTION_EVENT} nor {@code SUBSCRIBER_EVENT} or its state is not
     *     handled
     */
    public EventProcessor onEvent(LoggedEvent loggedEvent) {
        metadata.reset();
        loggedEvent.readMetadata(metadata);
        currentEvent = loggedEvent;

        final EventType eventType = metadata.getEventType();
        if (eventType == EventType.SUBSCRIPTION_EVENT) {
            return onSubscriptionEvent(loggedEvent);
        }
        return eventType == EventType.SUBSCRIBER_EVENT ? onSubscriberEvent(loggedEvent) : null;
    }

    /**
     * Wraps the shared subscriber event around the event value and selects a processor by state.
     *
     * @param loggedEvent the subscriber event
     * @return the subscribe processor for state {@code SUBSCRIBE}, the subscribed processor for
     *     state {@code SUBSCRIBED}, otherwise {@code null}
     */
    protected EventProcessor onSubscriberEvent(LoggedEvent loggedEvent) {
        subscriberEvent.reset();
        subscriberEvent.wrap(
            loggedEvent.getValueBuffer(),
            loggedEvent.getValueOffset(),
            loggedEvent.getValueLength());

        final TopicSubscriberState state = subscriberEvent.getState();
        if (state == TopicSubscriberState.SUBSCRIBE) {
            // The subscribe processor needs the event context before it is handed out.
            subscribeProcessor.wrap(currentEvent, metadata, subscriberEvent);
            return subscribeProcessor;
        }
        if (state == TopicSubscriberState.SUBSCRIBED) {
            return subscribedProcessor;
        }
        return null;
    }

    /**
     * Wraps the shared subscription event around the event value and selects a processor by
     * state.
     *
     * @param loggedEvent the subscription event
     * @return the ack processor for state {@code ACKNOWLEDGE}, otherwise {@code null}
     */
    protected EventProcessor onSubscriptionEvent(LoggedEvent loggedEvent) {
        subscriptionEvent.reset();
        subscriptionEvent.wrap(
            loggedEvent.getValueBuffer(),
            loggedEvent.getValueOffset(),
            loggedEvent.getValueLength());

        final boolean isAcknowledge = subscriptionEvent.getState() == TopicSubscriptionState.ACKNOWLEDGE;
        return isAcknowledge ? ackProcessor : null;
    }

    /**
     * Stores a position in the ack map under the given name.
     *
     * @param directBuffer buffer holding the subscription name; its full capacity is used as key
     * @param j the position to store
     */
    protected void putAck(DirectBuffer directBuffer, long j) {
        final int nameLength = directBuffer.capacity();
        ackMap.put(directBuffer, 0, nameLength, j);
    }

    /**
     * Schedules removal of the push processor registered under the given key and the removal of
     * its service.
     *
     * @param j key of the push processor to remove
     * @return a future that completes once the service was removed, completes immediately (within
     *     the scheduled command) when no processor is registered under the key, and completes
     *     exceptionally when the service removal fails
     */
    public CompletableFuture<Void> closePushProcessorAsync(long j) {
        return cmdContext.runAsync(result -> {
            final TopicSubscriptionPushProcessor removed = subscriptionRegistry.removeProcessorByKey(j);
            if (removed == null) {
                result.complete(null);
                return;
            }

            // Mirror the outcome of the service removal onto the future handed to the caller.
            closePushProcessor(removed).whenComplete((ignored, failure) -> {
                if (failure == null) {
                    result.complete(null);
                } else {
                    result.completeExceptionally(failure);
                }
            });
        });
    }

    /**
     * Removes the service of the given push processor. The service name is derived from the log
     * stream service name and the processor's name.
     *
     * @param topicSubscriptionPushProcessor the push processor whose service is removed
     * @return the future returned by the service context for the removal
     */
    protected CompletableFuture<Void> closePushProcessor(TopicSubscriptionPushProcessor topicSubscriptionPushProcessor) {
        final ServiceName<StreamProcessorController> pushServiceName =
            TopicSubscriptionServiceNames.subscriptionPushServiceName(
                streamServiceName.getName(),
                topicSubscriptionPushProcessor.getNameAsString());
        return serviceContext.removeService(pushServiceName);
    }

    /**
     * Determines the position from which a subscription should continue.
     *
     * @param directBuffer buffer holding the subscription name; its full capacity is used as key
     * @param j the position to use when no stored acknowledgement applies
     * @param z when {@code true}, the stored acknowledgement is ignored and {@code j} is returned
     * @return the stored acknowledged position plus one if {@code z} is {@code false} and a
     *     non-negative position is stored for the name; otherwise {@code j}
     */
    public long determineResumePosition(DirectBuffer directBuffer, long j, boolean z) {
        final long lastAcknowledged = ackMap.get(directBuffer, 0, directBuffer.capacity(), -1L);
        final boolean resumeAfterAck = !z && lastAcknowledged >= 0;
        return resumeAfterAck ? lastAcknowledged + 1 : j;
    }

    /**
     * Creates a push processor and installs it as a read-only stream processor service that
     * depends on the log stream, the snapshot storage and the actor scheduler.
     *
     * <p>The arguments are handed to the {@link TopicSubscriptionPushProcessor} constructor in the
     * given order, followed by a fresh writer from the event writer factory.
     *
     * @param i first constructor argument of the push processor
     * @param j second constructor argument of the push processor
     * @param j2 third constructor argument of the push processor
     * @param directBuffer fourth constructor argument of the push processor
     * @param i2 fifth constructor argument of the push processor
     * @return a future yielding the created push processor once its service is installed
     */
    public CompletableFuture<TopicSubscriptionPushProcessor> openPushProcessorAsync(int i, long j, long j2, DirectBuffer directBuffer, int i2) {
        final TopicSubscriptionPushProcessor pushProcessor =
            new TopicSubscriptionPushProcessor(i, j, j2, directBuffer, i2, eventWriterFactory.get());

        final ServiceName<StreamProcessorController> pushServiceName =
            TopicSubscriptionServiceNames.subscriptionPushServiceName(
                streamServiceName.getName(),
                pushProcessor.getNameAsString());

        // 40 is the second constructor argument of StreamProcessorService (presumably the
        // stream processor id - TODO confirm).
        final StreamProcessorService pushService =
            new StreamProcessorService(pushServiceName.getName(), 40, pushProcessor)
                .eventFilter(TopicSubscriptionPushProcessor.eventFilter())
                .readOnly(true);

        return serviceContext
            .createService(pushServiceName, pushService)
            .dependency(streamServiceName, pushService.getLogStreamInjector())
            .dependency(LogStreamServiceNames.SNAPSHOT_STORAGE_SERVICE, pushService.getSnapshotStorageInjector())
            .dependency(SystemServiceNames.ACTOR_SCHEDULER_SERVICE, pushService.getActorSchedulerInjector())
            .install()
            .thenApply(installed -> pushProcessor);
    }

    /**
     * Writes a {@code REQUEST_PROCESSING_FAILURE} error response for the request that produced the
     * given event.
     *
     * @param brokerEventMetadata metadata providing the request stream id and request id
     * @param loggedEvent event whose value is reported as the failed request
     * @param str the error message
     * @return the result of {@code tryWriteResponse}
     */
    public boolean writeRequestResponseError(BrokerEventMetadata brokerEventMetadata, LoggedEvent loggedEvent, String str) {
        final DirectBuffer requestBuffer = loggedEvent.getValueBuffer();
        final int requestOffset = loggedEvent.getValueOffset();
        final int requestLength = loggedEvent.getValueLength();

        return errorWriter
            .errorCode(ErrorCode.REQUEST_PROCESSING_FAILURE)
            .errorMessage(str)
            .failedRequest(requestBuffer, requestOffset, requestLength)
            .tryWriteResponse(brokerEventMetadata.getRequestStreamId(), brokerEventMetadata.getRequestId());
    }

    /**
     * Adds the given push processor to the subscription registry.
     *
     * @param topicSubscriptionPushProcessor the push processor to register
     */
    public void registerPushProcessor(TopicSubscriptionPushProcessor topicSubscriptionPushProcessor) {
        subscriptionRegistry.addSubscription(topicSubscriptionPushProcessor);
    }

    /**
     * Schedules the removal of every push processor that belongs to the given channel. Each
     * matching processor is removed from the registry and its service removal is triggered; the
     * outcome of the service removal is not awaited.
     *
     * @param i id of the closed client channel
     */
    public void onClientChannelCloseAsync(int i) {
        cmdContext.runAsync(() -> {
            for (Iterator<TopicSubscriptionPushProcessor> subscriptions = subscriptionRegistry.iterateSubscriptions();
                    subscriptions.hasNext();) {
                final TopicSubscriptionPushProcessor candidate = subscriptions.next();
                if (candidate.getChannelId() != i) {
                    continue;
                }
                // Remove through the iterator so the registry is not modified behind its back.
                subscriptions.remove();
                closePushProcessor(candidate);
            }
        });
    }

    /**
     * @return a metadata filter accepting only events of type {@code SUBSCRIPTION_EVENT} or
     *     {@code SUBSCRIBER_EVENT}
     */
    public static MetadataFilter filter() {
        return candidate -> {
            final EventType eventType = candidate.getEventType();
            return eventType == EventType.SUBSCRIPTION_EVENT || eventType == EventType.SUBSCRIBER_EVENT;
        };
    }
}
