package io.camunda.zeebe.engine.processing.message;

import io.camunda.zeebe.engine.processing.message.command.SubscriptionCommandSender;
import io.camunda.zeebe.engine.state.immutable.ScheduledTaskState;
import io.camunda.zeebe.engine.state.mutable.MutablePendingMessageSubscriptionState;
import io.camunda.zeebe.stream.api.ReadonlyStreamProcessorContext;
import io.camunda.zeebe.stream.api.StreamProcessorLifecycleAware;
import io.camunda.zeebe.stream.api.scheduling.ProcessingScheduleService;
import java.time.Duration;
import java.util.function.Supplier;

/* loaded from: input_file:io/camunda/zeebe/engine/processing/message/MessageObserver.class */
/**
 * Registers two recurring jobs on the stream processor's schedule service when
 * {@link #onRecovered(ReadonlyStreamProcessorContext)} is invoked: a
 * {@link MessageTimeToLiveChecker} and a {@link PendingMessageSubscriptionChecker}.
 *
 * <p>NOTE(review): {@code onRecovered} is presumably the recovery callback declared by
 * {@link StreamProcessorLifecycleAware}; confirm against the interface and add
 * {@code @Override} if so.
 */
public final class MessageObserver implements StreamProcessorLifecycleAware {
    /** Timeout handed (in milliseconds) to the {@link PendingMessageSubscriptionChecker}. */
    public static final Duration SUBSCRIPTION_TIMEOUT = Duration.ofSeconds(10);
    /** Fixed rate at which the {@link PendingMessageSubscriptionChecker} is run. */
    public static final Duration SUBSCRIPTION_CHECK_INTERVAL = Duration.ofSeconds(30);

    // Collaborators of the pending-subscription checker.
    private final SubscriptionCommandSender subscriptionCommandSender;
    private final MutablePendingMessageSubscriptionState pendingState;

    // Inputs of the message time-to-live checker.
    private final Supplier<ScheduledTaskState> scheduledTaskStateFactory;
    private final Duration messagesTtlCheckerInterval;
    private final int messagesTtlCheckerBatchLimit;
    private final boolean enableMessageTtlCheckerAsync;

    /**
     * Stores the collaborators and settings; nothing is scheduled until
     * {@link #onRecovered(ReadonlyStreamProcessorContext)} is called.
     *
     * @param supplier source of the {@link ScheduledTaskState} whose message state is given to
     *     the time-to-live checker
     * @param mutablePendingMessageSubscriptionState state passed to the pending-subscription checker
     * @param subscriptionCommandSender sender passed to the pending-subscription checker
     * @param duration delay used when scheduling the time-to-live checker; also passed to it
     * @param i batch limit passed to the time-to-live checker
     * @param z when {@code true} the time-to-live checker is scheduled via
     *     {@code runDelayedAsync}, otherwise via {@code runDelayed}
     */
    public MessageObserver(Supplier<ScheduledTaskState> supplier, MutablePendingMessageSubscriptionState mutablePendingMessageSubscriptionState, SubscriptionCommandSender subscriptionCommandSender, Duration duration, int i, boolean z) {
        this.scheduledTaskStateFactory = supplier;
        this.pendingState = mutablePendingMessageSubscriptionState;
        this.subscriptionCommandSender = subscriptionCommandSender;
        this.messagesTtlCheckerInterval = duration;
        this.messagesTtlCheckerBatchLimit = i;
        this.enableMessageTtlCheckerAsync = z;
    }

    /**
     * Schedules the message time-to-live checker first, then the pending message subscription
     * checker, both on the schedule service of the given context.
     *
     * @param readonlyStreamProcessorContext context providing the schedule service
     */
    public void onRecovered(ReadonlyStreamProcessorContext readonlyStreamProcessorContext) {
        scheduleMessageTtlChecker(readonlyStreamProcessorContext);
        schedulePendingMessageSubscriptionChecker(readonlyStreamProcessorContext);
    }

    /**
     * Builds a {@link MessageTimeToLiveChecker} and schedules its first run after the configured
     * interval, asynchronously or not depending on the async flag.
     */
    private void scheduleMessageTtlChecker(ReadonlyStreamProcessorContext context) {
        ProcessingScheduleService scheduler = context.getScheduleService();
        MessageTimeToLiveChecker ttlChecker = new MessageTimeToLiveChecker(
                messagesTtlCheckerInterval,
                messagesTtlCheckerBatchLimit,
                enableMessageTtlCheckerAsync,
                scheduler,
                scheduledTaskStateFactory.get().getMessageState());

        if (!enableMessageTtlCheckerAsync) {
            scheduler.runDelayed(messagesTtlCheckerInterval, ttlChecker);
            return;
        }
        scheduler.runDelayedAsync(messagesTtlCheckerInterval, ttlChecker);
    }

    /**
     * Builds a {@link PendingMessageSubscriptionChecker} with {@link #SUBSCRIPTION_TIMEOUT}
     * (converted to milliseconds) and runs it at {@link #SUBSCRIPTION_CHECK_INTERVAL}.
     */
    private void schedulePendingMessageSubscriptionChecker(ReadonlyStreamProcessorContext context) {
        // Look up the schedule service before constructing the checker, as the original did.
        ProcessingScheduleService scheduler = context.getScheduleService();
        PendingMessageSubscriptionChecker subscriptionChecker = new PendingMessageSubscriptionChecker(
                subscriptionCommandSender, pendingState, SUBSCRIPTION_TIMEOUT.toMillis());
        scheduler.runAtFixedRate(SUBSCRIPTION_CHECK_INTERVAL, subscriptionChecker);
    }
}
