package io.zeebe.broker.event.processor;

import io.zeebe.broker.Loggers;
import io.zeebe.broker.clustering.base.partitions.Partition;
import io.zeebe.broker.logstreams.processor.KeyGenerator;
import io.zeebe.broker.logstreams.processor.MetadataFilter;
import io.zeebe.broker.logstreams.processor.StreamProcessorServiceFactory;
import io.zeebe.broker.transport.clientapi.CommandResponseWriter;
import io.zeebe.broker.transport.clientapi.ErrorResponseWriter;
import io.zeebe.broker.transport.clientapi.SubscribedRecordWriter;
import io.zeebe.logstreams.impl.service.LogStreamServiceNames;
import io.zeebe.logstreams.impl.service.StreamProcessorService;
import io.zeebe.logstreams.log.LogStream;
import io.zeebe.logstreams.log.LogStreamRecordWriter;
import io.zeebe.logstreams.log.LoggedEvent;
import io.zeebe.logstreams.processor.EventProcessor;
import io.zeebe.logstreams.processor.StreamProcessor;
import io.zeebe.logstreams.processor.StreamProcessorContext;
import io.zeebe.logstreams.snapshot.ComposedSnapshot;
import io.zeebe.logstreams.snapshot.UnpackedObjectSnapshotSupport;
import io.zeebe.logstreams.snapshot.ZbMapSnapshotSupport;
import io.zeebe.logstreams.spi.ComposableSnapshotSupport;
import io.zeebe.logstreams.spi.SnapshotSupport;
import io.zeebe.map.Bytes2LongZbMap;
import io.zeebe.protocol.clientapi.ErrorCode;
import io.zeebe.protocol.clientapi.RecordType;
import io.zeebe.protocol.clientapi.ValueType;
import io.zeebe.protocol.impl.record.RecordMetadata;
import io.zeebe.protocol.intent.SubscriberIntent;
import io.zeebe.protocol.intent.SubscriptionIntent;
import io.zeebe.servicecontainer.ServiceContainer;
import io.zeebe.servicecontainer.ServiceName;
import io.zeebe.util.sched.ActorControl;
import io.zeebe.util.sched.future.ActorFuture;
import io.zeebe.util.sched.future.CompletableActorFuture;
import java.util.Iterator;
import java.util.function.Supplier;
import org.agrona.DirectBuffer;

/* loaded from: input_file:io/zeebe/broker/event/processor/TopicSubscriptionManagementProcessor.class */
public class TopicSubscriptionManagementProcessor implements StreamProcessor {
    protected static final int MAXIMUM_SUBSCRIPTION_NAME_LENGTH = 32;
    protected final SnapshotSupport snapshotResource;
    protected LogStream logStream;
    protected int logStreamPartitionId;
    protected final Partition partition;
    protected final ServiceName<Partition> partitionServiceName;
    protected final MetadataFilter pushProcessorEventFilter;
    protected final ErrorResponseWriter errorWriter;
    protected final CommandResponseWriter responseWriter;
    protected final Supplier<SubscribedRecordWriter> eventWriterFactory;
    protected final StreamProcessorServiceFactory streamProcessorServiceFactory;
    protected final ServiceContainer serviceContext;
    private ActorControl actor;
    protected LoggedEvent currentEvent;
    private final KeyGenerator keyGenerator;
    protected final SubscriptionRegistry subscriptionRegistry = new SubscriptionRegistry();
    protected final AckProcessor ackProcessor = new AckProcessor();
    protected final SubscribeProcessor subscribeProcessor = new SubscribeProcessor(MAXIMUM_SUBSCRIPTION_NAME_LENGTH, this);
    protected final SubscribedProcessor subscribedProcessor = new SubscribedProcessor();
    protected final RecordMetadata metadata = new RecordMetadata();
    protected final TopicSubscriptionEvent subscriptionEvent = new TopicSubscriptionEvent();
    protected final TopicSubscriberEvent subscriberEvent = new TopicSubscriberEvent();
    protected final Bytes2LongZbMap ackMap = new Bytes2LongZbMap(MAXIMUM_SUBSCRIPTION_NAME_LENGTH);

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:io/zeebe/broker/event/processor/TopicSubscriptionManagementProcessor$AckProcessor.class */
    public class AckProcessor implements EventProcessor {
        protected AckProcessor() {
        }

        public long writeEvent(LogStreamRecordWriter logStreamRecordWriter) {
            TopicSubscriptionManagementProcessor.this.metadata.recordType(RecordType.EVENT).valueType(ValueType.SUBSCRIPTION).intent(SubscriptionIntent.ACKNOWLEDGED).protocolVersion(1);
            return logStreamRecordWriter.key(TopicSubscriptionManagementProcessor.this.currentEvent.getKey()).metadataWriter(TopicSubscriptionManagementProcessor.this.metadata).valueWriter(TopicSubscriptionManagementProcessor.this.subscriptionEvent).tryWrite();
        }

        public boolean executeSideEffects() {
            TopicSubscriptionPushProcessor processorByName = TopicSubscriptionManagementProcessor.this.subscriptionRegistry.getProcessorByName(TopicSubscriptionManagementProcessor.this.subscriptionEvent.getName());
            if (processorByName != null) {
                processorByName.onAck(TopicSubscriptionManagementProcessor.this.subscriptionEvent.getAckPosition());
            }
            if (TopicSubscriptionManagementProcessor.this.metadata.getRequestId() >= 0) {
                return TopicSubscriptionManagementProcessor.this.responseWriter.partitionId(TopicSubscriptionManagementProcessor.this.logStreamPartitionId).valueWriter(TopicSubscriptionManagementProcessor.this.subscriptionEvent).key(TopicSubscriptionManagementProcessor.this.currentEvent.getKey()).timestamp(TopicSubscriptionManagementProcessor.this.currentEvent.getTimestamp()).recordType(RecordType.EVENT).valueType(ValueType.SUBSCRIPTION).intent(SubscriptionIntent.ACKNOWLEDGED).tryWriteResponse(TopicSubscriptionManagementProcessor.this.metadata.getRequestStreamId(), TopicSubscriptionManagementProcessor.this.metadata.getRequestId());
            }
            return true;
        }

        public void updateState() {
            TopicSubscriptionManagementProcessor.this.putAck(TopicSubscriptionManagementProcessor.this.subscriptionEvent.getName(), TopicSubscriptionManagementProcessor.this.subscriptionEvent.getAckPosition());
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:io/zeebe/broker/event/processor/TopicSubscriptionManagementProcessor$SubscribedProcessor.class */
    public class SubscribedProcessor implements EventProcessor {
        protected SubscribedProcessor() {
        }

        public boolean executeSideEffects() {
            boolean tryWriteResponse = TopicSubscriptionManagementProcessor.this.responseWriter.partitionId(TopicSubscriptionManagementProcessor.this.logStreamPartitionId).recordType(RecordType.EVENT).valueType(ValueType.SUBSCRIBER).intent(SubscriberIntent.SUBSCRIBED).valueWriter(TopicSubscriptionManagementProcessor.this.subscriberEvent).position(TopicSubscriptionManagementProcessor.this.currentEvent.getPosition()).key(TopicSubscriptionManagementProcessor.this.currentEvent.getKey()).timestamp(TopicSubscriptionManagementProcessor.this.currentEvent.getTimestamp()).tryWriteResponse(TopicSubscriptionManagementProcessor.this.metadata.getRequestStreamId(), TopicSubscriptionManagementProcessor.this.metadata.getRequestId());
            if (tryWriteResponse) {
                Loggers.SERVICES_LOGGER.debug("Topic push processor for partition {} successfully opened. Send response for request {}", Integer.valueOf(TopicSubscriptionManagementProcessor.this.logStreamPartitionId), Long.valueOf(TopicSubscriptionManagementProcessor.this.metadata.getRequestId()));
                TopicSubscriptionManagementProcessor.this.subscriptionRegistry.getProcessorByName(TopicSubscriptionManagementProcessor.this.subscriberEvent.getName()).enable();
            }
            return tryWriteResponse;
        }

        public long writeEvent(LogStreamRecordWriter logStreamRecordWriter) {
            DirectBuffer name = TopicSubscriptionManagementProcessor.this.subscriberEvent.getName();
            TopicSubscriptionManagementProcessor.this.subscriptionEvent.reset();
            TopicSubscriptionManagementProcessor.this.subscriptionEvent.setName(name, 0, name.capacity()).setAckPosition(TopicSubscriptionManagementProcessor.this.subscriberEvent.getStartPosition() - 1);
            TopicSubscriptionManagementProcessor.this.metadata.recordType(RecordType.COMMAND).valueType(ValueType.SUBSCRIPTION).intent(SubscriptionIntent.ACKNOWLEDGE).requestStreamId(-1).requestId(-1L);
            return logStreamRecordWriter.key(TopicSubscriptionManagementProcessor.this.currentEvent.getKey()).metadataWriter(TopicSubscriptionManagementProcessor.this.metadata).valueWriter(TopicSubscriptionManagementProcessor.this.subscriptionEvent).tryWrite();
        }
    }

    public TopicSubscriptionManagementProcessor(Partition partition, ServiceName<Partition> serviceName, MetadataFilter metadataFilter, CommandResponseWriter commandResponseWriter, ErrorResponseWriter errorResponseWriter, Supplier<SubscribedRecordWriter> supplier, StreamProcessorServiceFactory streamProcessorServiceFactory, ServiceContainer serviceContainer) {
        this.partition = partition;
        this.keyGenerator = new KeyGenerator(partition.getInfo().getPartitionId(), 1L, 1);
        this.partitionServiceName = serviceName;
        this.pushProcessorEventFilter = metadataFilter;
        this.responseWriter = commandResponseWriter;
        this.errorWriter = errorResponseWriter;
        this.eventWriterFactory = supplier;
        this.snapshotResource = new ComposedSnapshot(new ComposableSnapshotSupport[]{new ZbMapSnapshotSupport(this.ackMap), new UnpackedObjectSnapshotSupport(this.keyGenerator)});
        this.serviceContext = serviceContainer;
        this.streamProcessorServiceFactory = streamProcessorServiceFactory;
    }

    public Supplier<SubscribedRecordWriter> getEventWriterFactory() {
        return this.eventWriterFactory;
    }

    public void onOpen(StreamProcessorContext streamProcessorContext) {
        this.actor = streamProcessorContext.getActorControl();
        LogStream logStream = streamProcessorContext.getLogStream();
        this.logStreamPartitionId = logStream.getPartitionId();
        this.logStream = logStream;
    }

    public void onClose() {
        this.ackMap.close();
    }

    public SnapshotSupport getStateResource() {
        return this.snapshotResource;
    }

    public LogStream getLogStream() {
        return this.logStream;
    }

    public EventProcessor onEvent(LoggedEvent loggedEvent) {
        this.metadata.reset();
        loggedEvent.readMetadata(this.metadata);
        this.currentEvent = loggedEvent;
        if (this.metadata.getValueType() == ValueType.SUBSCRIPTION) {
            return onSubscriptionEvent(loggedEvent);
        }
        if (this.metadata.getValueType() == ValueType.SUBSCRIBER) {
            return onSubscriberEvent(loggedEvent);
        }
        return null;
    }

    protected EventProcessor onSubscriberEvent(LoggedEvent loggedEvent) {
        this.subscriberEvent.reset();
        this.subscriberEvent.wrap(loggedEvent.getValueBuffer(), loggedEvent.getValueOffset(), loggedEvent.getValueLength());
        if (this.metadata.getIntent() == SubscriberIntent.SUBSCRIBE) {
            this.subscribeProcessor.wrap(this.currentEvent, this.metadata, this.subscriberEvent);
            return this.subscribeProcessor;
        }
        if (this.metadata.getIntent() == SubscriberIntent.SUBSCRIBED) {
            return this.subscribedProcessor;
        }
        return null;
    }

    public long nextSubscriberKey() {
        return this.keyGenerator.nextKey();
    }

    protected EventProcessor onSubscriptionEvent(LoggedEvent loggedEvent) {
        this.subscriptionEvent.reset();
        this.subscriptionEvent.wrap(loggedEvent.getValueBuffer(), loggedEvent.getValueOffset(), loggedEvent.getValueLength());
        if (this.metadata.getIntent() == SubscriptionIntent.ACKNOWLEDGE) {
            return this.ackProcessor;
        }
        return null;
    }

    protected void putAck(DirectBuffer directBuffer, long j) {
        this.ackMap.put(directBuffer, 0, directBuffer.capacity(), j);
    }

    public ActorFuture<Void> closePushProcessorAsync(long j) {
        CompletableActorFuture completableActorFuture = new CompletableActorFuture();
        this.actor.call(() -> {
            TopicSubscriptionPushProcessor removeProcessorByKey = this.subscriptionRegistry.removeProcessorByKey(j);
            if (removeProcessorByKey == null) {
                completableActorFuture.complete((Object) null);
            } else {
                this.actor.runOnCompletion(closePushProcessor(removeProcessorByKey), (r4, th) -> {
                    if (th == null) {
                        completableActorFuture.complete((Object) null);
                    } else {
                        completableActorFuture.completeExceptionally(th);
                    }
                });
            }
        });
        return completableActorFuture;
    }

    protected ActorFuture<Void> closePushProcessor(TopicSubscriptionPushProcessor topicSubscriptionPushProcessor) {
        return this.serviceContext.removeService(LogStreamServiceNames.streamProcessorService(this.logStream.getLogName(), pushProcessorName(topicSubscriptionPushProcessor)));
    }

    public long determineResumePosition(DirectBuffer directBuffer, long j, boolean z) {
        long j2 = this.ackMap.get(directBuffer, 0, directBuffer.capacity(), -1L);
        if (!z && j2 >= 0) {
            return j2 + 1;
        }
        return j;
    }

    public ActorFuture<StreamProcessorService> openPushProcessorAsync(TopicSubscriptionPushProcessor topicSubscriptionPushProcessor) {
        return this.streamProcessorServiceFactory.createService(this.partition, this.partitionServiceName).processor(topicSubscriptionPushProcessor).processorId(40).processorName(pushProcessorName(topicSubscriptionPushProcessor)).eventFilter(this.pushProcessorEventFilter).additionalDependencies(this.partitionServiceName).readOnly(true).build();
    }

    public boolean writeRequestResponseError(RecordMetadata recordMetadata, String str) {
        return this.errorWriter.errorCode(ErrorCode.REQUEST_PROCESSING_FAILURE).errorMessage(str).tryWriteResponse(recordMetadata.getRequestStreamId(), recordMetadata.getRequestId());
    }

    public void registerPushProcessor(TopicSubscriptionPushProcessor topicSubscriptionPushProcessor) {
        this.subscriptionRegistry.addSubscription(topicSubscriptionPushProcessor);
    }

    public void onClientChannelCloseAsync(int i) {
        this.actor.call(() -> {
            Iterator<TopicSubscriptionPushProcessor> iterateSubscriptions = this.subscriptionRegistry.iterateSubscriptions();
            while (iterateSubscriptions.hasNext()) {
                TopicSubscriptionPushProcessor next = iterateSubscriptions.next();
                if (next.getChannelId() == i) {
                    iterateSubscriptions.remove();
                    closePushProcessor(next);
                }
            }
        });
    }

    public static MetadataFilter filter() {
        return recordMetadata -> {
            return ValueType.SUBSCRIPTION == recordMetadata.getValueType() || ValueType.SUBSCRIBER == recordMetadata.getValueType();
        };
    }

    private static String pushProcessorName(TopicSubscriptionPushProcessor topicSubscriptionPushProcessor) {
        return String.format("topic-push.%s", topicSubscriptionPushProcessor.getNameAsString());
    }
}
