package io.zeebe.broker.subscription.message.processor;

import io.zeebe.broker.clustering.base.topology.TopologyManager;
import io.zeebe.broker.logstreams.processor.KeyGenerator;
import io.zeebe.broker.logstreams.processor.StreamProcessorLifecycleAware;
import io.zeebe.broker.logstreams.processor.TypedRecordProcessor;
import io.zeebe.broker.logstreams.processor.TypedStreamEnvironment;
import io.zeebe.broker.logstreams.processor.TypedStreamProcessor;
import io.zeebe.broker.subscription.command.SubscriptionCommandSender;
import io.zeebe.broker.subscription.message.state.MessageStateController;
import io.zeebe.logstreams.state.StateSnapshotController;
import io.zeebe.logstreams.state.StateStorage;
import io.zeebe.protocol.clientapi.ValueType;
import io.zeebe.protocol.intent.Intent;
import io.zeebe.protocol.intent.MessageIntent;
import io.zeebe.protocol.intent.MessageSubscriptionIntent;
import io.zeebe.util.sched.ActorControl;
import java.time.Duration;

/* loaded from: input_file:io/zeebe/broker/subscription/message/processor/MessageStreamProcessor.class */
/**
 * Assembles the stream processor that handles {@code MESSAGE} and {@code MESSAGE_SUBSCRIPTION}
 * commands and, once that processor has been opened, schedules the periodic checkers that
 * operate on the same {@link MessageStateController}.
 */
public class MessageStreamProcessor implements StreamProcessorLifecycleAware {

    /** Period at which the {@link MessageTimeToLiveChecker} is scheduled in {@link #onOpen}. */
    public static final Duration MESSAGE_TIME_TO_LIVE_CHECK_INTERVAL = Duration.ofSeconds(60);

    /** Timeout handed, in milliseconds, to the {@link PendingMessageSubscriptionChecker}. */
    public static final Duration SUBSCRIPTION_TIMEOUT = Duration.ofSeconds(10);

    /** Period at which the {@link PendingMessageSubscriptionChecker} is scheduled in {@link #onOpen}. */
    public static final Duration SUBSCRIPTION_CHECK_INTERVAL = Duration.ofSeconds(30);

    // Single state controller instance shared by the key generator, every record processor,
    // the snapshot controller and both periodic checkers created by this class.
    private final MessageStateController messageStateController = new MessageStateController();

    private final TopologyManager topologyManager;
    private final SubscriptionCommandSender subscriptionCommandSender;

    /**
     * @param subscriptionCommandSender sender passed to the record processors and initialized in
     *     {@link #onOpen}
     * @param topologyManager topology manager used to initialize the command sender in
     *     {@link #onOpen}
     */
    public MessageStreamProcessor(
            SubscriptionCommandSender subscriptionCommandSender, TopologyManager topologyManager) {
        this.subscriptionCommandSender = subscriptionCommandSender;
        this.topologyManager = topologyManager;
    }

    /**
     * Builds the typed stream processor: installs a message key generator for the stream's
     * partition, registers one record processor per supported command intent, attaches the
     * state controller and registers this instance as lifecycle listener.
     *
     * @param typedStreamEnvironment environment providing the builder and the log stream
     * @return the fully configured stream processor
     */
    public TypedStreamProcessor createStreamProcessors(TypedStreamEnvironment typedStreamEnvironment) {
        return typedStreamEnvironment
                .newStreamProcessor()
                .keyGenerator(
                        KeyGenerator.createMessageKeyGenerator(
                                typedStreamEnvironment.getStream().getPartitionId(),
                                messageStateController))
                // message commands
                .onCommand(
                        ValueType.MESSAGE,
                        (Intent) MessageIntent.PUBLISH,
                        (TypedRecordProcessor<?>)
                                new PublishMessageProcessor(messageStateController, subscriptionCommandSender))
                .onCommand(
                        ValueType.MESSAGE,
                        (Intent) MessageIntent.DELETE,
                        (TypedRecordProcessor<?>) new DeleteMessageProcessor(messageStateController))
                // message subscription commands
                .onCommand(
                        ValueType.MESSAGE_SUBSCRIPTION,
                        (Intent) MessageSubscriptionIntent.OPEN,
                        (TypedRecordProcessor<?>)
                                new OpenMessageSubscriptionProcessor(
                                        messageStateController, subscriptionCommandSender))
                .onCommand(
                        ValueType.MESSAGE_SUBSCRIPTION,
                        (Intent) MessageSubscriptionIntent.CORRELATE,
                        (TypedRecordProcessor<?>)
                                new CorrelateMessageSubscriptionProcessor(messageStateController))
                .onCommand(
                        ValueType.MESSAGE_SUBSCRIPTION,
                        (Intent) MessageSubscriptionIntent.CLOSE,
                        (TypedRecordProcessor<?>)
                                new CloseMessageSubscriptionProcessor(
                                        messageStateController, subscriptionCommandSender))
                .withStateController(messageStateController)
                .withListener(this)
                .build();
    }

    /**
     * Creates a snapshot controller bound to this processor's state controller.
     *
     * @param stateStorage storage passed through to the snapshot controller
     * @return a new snapshot controller
     */
    public StateSnapshotController createStateSnapshotController(StateStorage stateStorage) {
        return new StateSnapshotController(messageStateController, stateStorage);
    }

    /**
     * Initializes the subscription command sender with the processor's actor and log stream,
     * then schedules the message time-to-live check and the pending-subscription check at
     * their fixed rates.
     *
     * @param typedStreamProcessor the processor that has just been opened
     */
    @Override
    public void onOpen(TypedStreamProcessor typedStreamProcessor) {
        final ActorControl actorControl = typedStreamProcessor.getActor();
        final TypedStreamEnvironment streamEnvironment = typedStreamProcessor.getEnvironment();

        // The sender is initialized before any checker that uses it is scheduled.
        subscriptionCommandSender.init(topologyManager, actorControl, streamEnvironment.getStream());

        final MessageTimeToLiveChecker timeToLiveChecker =
                new MessageTimeToLiveChecker(
                        streamEnvironment.buildCommandWriter(), messageStateController);
        actorControl.runAtFixedRate(MESSAGE_TIME_TO_LIVE_CHECK_INTERVAL, timeToLiveChecker);

        final PendingMessageSubscriptionChecker pendingSubscriptionChecker =
                new PendingMessageSubscriptionChecker(
                        subscriptionCommandSender,
                        messageStateController,
                        SUBSCRIPTION_TIMEOUT.toMillis());
        actorControl.runAtFixedRate(SUBSCRIPTION_CHECK_INTERVAL, pendingSubscriptionChecker);
    }
}
