package io.camunda.zeebe.engine.util;

import io.camunda.zeebe.logstreams.log.LogStreamReader;
import io.camunda.zeebe.logstreams.log.LoggedEvent;
import io.camunda.zeebe.logstreams.util.SynchronousLogStream;
import io.camunda.zeebe.protocol.impl.record.RecordMetadata;
import io.camunda.zeebe.stream.api.ReadonlyStreamProcessorContext;
import io.camunda.zeebe.stream.api.StreamProcessorLifecycleAware;
import io.camunda.zeebe.stream.impl.records.RecordValues;
import io.camunda.zeebe.stream.impl.records.TypedRecordImpl;
import io.camunda.zeebe.test.util.record.RecordingExporter;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

/* loaded from: input_file:io/camunda/zeebe/engine/util/ProcessingExporterTransistor.class */
public final class ProcessingExporterTransistor implements StreamProcessorLifecycleAware {
    private LogStreamReader logStreamReader;
    private TypedRecordImpl typedEvent;
    private final SynchronousLogStream synchronousLogStream;
    private final RecordValues recordValues = new RecordValues();
    private final RecordMetadata metadata = new RecordMetadata();
    private final ExecutorService executorService = Executors.newSingleThreadExecutor();
    private final RecordingExporter recordingExporter = new RecordingExporter();

    public ProcessingExporterTransistor(SynchronousLogStream synchronousLogStream) {
        this.synchronousLogStream = synchronousLogStream;
    }

    public void onRecovered(ReadonlyStreamProcessorContext readonlyStreamProcessorContext) {
        this.executorService.submit(() -> {
            this.typedEvent = new TypedRecordImpl(readonlyStreamProcessorContext.getPartitionId());
            this.synchronousLogStream.getAsyncLogStream().registerRecordAvailableListener(this::onNewEventCommitted);
            this.logStreamReader = this.synchronousLogStream.newLogStreamReader();
            exportEvents();
        });
    }

    public void onClose() {
        this.executorService.shutdown();
        try {
            this.executorService.awaitTermination(1L, TimeUnit.HOURS);
        } catch (InterruptedException e) {
            this.executorService.shutdownNow();
            throw new RuntimeException(e);
        }
    }

    private void onNewEventCommitted() {
        if (this.executorService.isShutdown()) {
            return;
        }
        this.executorService.submit(this::exportEvents);
    }

    private void exportEvents() {
        if (this.logStreamReader == null) {
            return;
        }
        while (this.logStreamReader.hasNext()) {
            LoggedEvent loggedEvent = (LoggedEvent) this.logStreamReader.next();
            this.metadata.reset();
            loggedEvent.readMetadata(this.metadata);
            this.typedEvent.wrap(loggedEvent, this.metadata, this.recordValues.readRecordValue(loggedEvent, this.metadata.getValueType()));
            this.recordingExporter.export(this.typedEvent);
        }
    }
}
