package io.camunda.zeebe.streamprocessor;

import io.camunda.zeebe.engine.Loggers;
import io.camunda.zeebe.engine.api.SimpleProcessingScheduleService;
import io.camunda.zeebe.engine.api.Task;
import io.camunda.zeebe.engine.api.TaskResult;
import io.camunda.zeebe.logstreams.log.LogStreamBatchWriter;
import io.camunda.zeebe.scheduler.ActorControl;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.scheduler.future.CompletableActorFuture;
import io.camunda.zeebe.scheduler.retry.AbortableRetryStrategy;
import io.camunda.zeebe.streamprocessor.StreamProcessor;
import java.time.Duration;
import java.util.Objects;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;
import org.slf4j.Logger;

/* loaded from: input_file:io/camunda/zeebe/streamprocessor/ProcessingScheduleServiceImpl.class */
/**
 * Schedules {@link Task}s on an {@link ActorControl} and writes the records they produce through a
 * {@link LogStreamBatchWriter}.
 *
 * <p>The service is unusable until {@link #open(ActorControl)} has completed successfully; any
 * scheduling request made before that (or after {@link #close()}) is logged at debug level and
 * dropped. Scheduled tasks are executed only while the stream processor phase supplier reports
 * {@link StreamProcessor.Phase#PROCESSING}; in any other phase the task is re-submitted to the
 * actor so it is tried again later.
 */
public class ProcessingScheduleServiceImpl implements SimpleProcessingScheduleService, AutoCloseable {
    private static final Logger LOG = Loggers.STREAM_PROCESSING;
    private final Supplier<StreamProcessor.Phase> streamProcessorPhaseSupplier;
    private final BooleanSupplier abortCondition;
    private final Supplier<ActorFuture<LogStreamBatchWriter>> writerAsyncSupplier;
    // The following fields are populated by open(...) and cleared again by close().
    private LogStreamBatchWriter logStreamBatchWriter;
    private ActorControl actorControl;
    private AbortableRetryStrategy writeRetryStrategy;
    private CompletableActorFuture<Void> openFuture;

    /**
     * @param streamProcessorPhaseSupplier yields the current stream processor phase; consulted each
     *     time a scheduled task is about to run
     * @param abortCondition when it returns {@code true}, a scheduled task is skipped without being
     *     executed, and it is also handed to the write retry strategy as its abort condition
     * @param writerAsyncSupplier yields the future that resolves to the batch writer; invoked once
     *     per effective {@link #open(ActorControl)} call
     */
    public ProcessingScheduleServiceImpl(
            Supplier<StreamProcessor.Phase> streamProcessorPhaseSupplier,
            BooleanSupplier abortCondition,
            Supplier<ActorFuture<LogStreamBatchWriter>> writerAsyncSupplier) {
        this.streamProcessorPhaseSupplier = streamProcessorPhaseSupplier;
        this.abortCondition = abortCondition;
        this.writerAsyncSupplier = writerAsyncSupplier;
    }

    /**
     * Schedules {@code runnable} on the actor after {@code duration}. The request is dropped (with
     * a debug log) if the service is not currently open.
     */
    @Override
    public void runDelayed(Duration duration, Runnable runnable) {
        useActorControl(() -> this.actorControl.runDelayed(duration, runnable));
    }

    /**
     * Schedules {@code task} after {@code duration}, wrapped via {@link #toRunnable(Task)} so that
     * the phase/abort checks apply and its result is written to the log stream.
     */
    @Override
    public void runDelayed(Duration duration, Task task) {
        runDelayed(duration, toRunnable(task));
    }

    /**
     * Runs {@code task} repeatedly. The next run is scheduled {@code duration} after the previous
     * execution has returned or thrown, so the interval is measured from the end of one run to the
     * start of the next.
     */
    @Override
    public void runAtFixedRate(Duration duration, Task task) {
        runDelayed(duration, toRunnable(taskResultBuilder -> {
            try {
                return task.execute(taskResultBuilder);
            } finally {
                // Reschedule regardless of the outcome so one failing execution does not end the
                // periodic schedule; an exception from the task still propagates to the caller.
                runAtFixedRate(duration, task);
            }
        }));
    }

    /** Runs {@code runnable} only if the service has been opened; otherwise logs and ignores it. */
    private void useActorControl(Runnable runnable) {
        if (this.actorControl == null) {
            LOG.debug("ProcessingScheduleService hasn't been opened yet, ignore scheduled task.");
            return;
        }
        runnable.run();
    }

    /**
     * Opens the service: requests the batch writer and, once it is available, stores it together
     * with {@code control} so that tasks can be scheduled.
     *
     * <p>Calling this again before {@link #close()} returns the future of the first call,
     * including when that future has completed exceptionally.
     *
     * @param control the actor on which tasks are scheduled and write retries are run
     * @return a future completed once the writer is available, or completed exceptionally if
     *     obtaining the writer failed
     */
    public ActorFuture<Void> open(ActorControl control) {
        if (this.openFuture != null) {
            return this.openFuture;
        }
        this.openFuture = new CompletableActorFuture<>();
        this.writeRetryStrategy = new AbortableRetryStrategy(control);
        control.runOnCompletion(this.writerAsyncSupplier.get(), (writer, failure) -> {
            if (failure != null) {
                this.openFuture.completeExceptionally(failure);
                return;
            }
            this.logStreamBatchWriter = writer;
            // Assigning actorControl is what makes useActorControl(...) start accepting tasks.
            this.actorControl = control;
            this.openFuture.complete(null);
        });
        return this.openFuture;
    }

    /**
     * Drops every reference obtained by {@link #open(ActorControl)}. Afterwards scheduling requests
     * are ignored again and a subsequent {@code open} starts from scratch.
     */
    @Override
    public void close() {
        this.actorControl = null;
        this.logStreamBatchWriter = null;
        this.writeRetryStrategy = null;
        this.openFuture = null;
    }

    /**
     * Wraps {@code task} into a {@link Runnable} that
     *
     * <ol>
     *   <li>does nothing if the abort condition holds,
     *   <li>re-submits itself to the actor if the stream processor is not in the {@code PROCESSING}
     *       phase,
     *   <li>otherwise executes the task, stages every entry of the resulting record batch on the
     *       batch writer and writes the batch using the retry strategy.
     * </ol>
     *
     * <p>A failed write is only logged as a warning.
     */
    Runnable toRunnable(Task task) {
        return () -> {
            if (this.abortCondition.getAsBoolean()) {
                return;
            }
            final StreamProcessor.Phase currentPhase = this.streamProcessorPhaseSupplier.get();
            if (currentPhase != StreamProcessor.Phase.PROCESSING) {
                LOG.trace("Not able to execute scheduled task right now. [streamProcessorPhase: {}]", currentPhase);
                // Try again later instead of dropping the task.
                this.actorControl.submit(toRunnable(task));
                return;
            }

            // A bound method reference evaluates and null-checks its receiver right here, so a
            // missing writer fails fast before the task is executed.
            final TaskResult result =
                    task.execute(new BufferedTaskResultBuilder(this.logStreamBatchWriter::canWriteAdditionalEvent));

            // Start from a clean writer before staging this task's records.
            this.logStreamBatchWriter.reset();
            result.getRecordBatch().forEach(entry ->
                    this.logStreamBatchWriter
                            .event()
                            .key(entry.key())
                            .metadataWriter(entry.recordMetadata())
                            .sourceIndex(entry.sourceIndex())
                            .valueWriter(entry.recordValue())
                            .done());

            // A negative tryWrite() result is reported to the retry strategy as "not done", so the
            // write is attempted again until it succeeds or the abort condition holds.
            // NOTE(review): presumably a negative position means the dispatcher was full - confirm.
            this.writeRetryStrategy
                    .runWithRetry(
                            () -> {
                                LOG.trace("Write scheduled TaskResult to dispatcher!");
                                return this.logStreamBatchWriter.tryWrite() >= 0;
                            },
                            this.abortCondition)
                    .onComplete((ignored, failure) -> {
                        if (failure != null) {
                            LOG.warn("Writing of scheduled TaskResult failed!", failure);
                        }
                    });
        };
    }
}
