/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.engine.util;

import io.camunda.zeebe.logstreams.log.LogStream;
import io.camunda.zeebe.logstreams.log.LogStreamReader;
import io.camunda.zeebe.logstreams.log.LoggedEvent;
import io.camunda.zeebe.logstreams.util.SynchronousLogStream;
import io.camunda.zeebe.protocol.impl.record.RecordMetadata;
import io.camunda.zeebe.protocol.impl.record.UnifiedRecordValue;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.stream.api.ReadonlyStreamProcessorContext;
import io.camunda.zeebe.stream.api.StreamProcessorLifecycleAware;
import io.camunda.zeebe.stream.impl.records.RecordValues;
import io.camunda.zeebe.stream.impl.records.TypedRecordImpl;
import io.camunda.zeebe.test.util.record.RecordingExporter;
import io.camunda.zeebe.util.buffer.BufferReader;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

/**
 * Forwards every record readable from a {@link SynchronousLogStream} to a
 * {@link RecordingExporter}.
 *
 * <p>All reading and exporting runs on a private single-threaded executor, so the reusable
 * {@code metadata}, {@code typedEvent} and the log stream reader are only ever touched by that
 * one worker thread. Other threads merely schedule work onto the executor.
 */
public final class ProcessingExporterTransistor
implements StreamProcessorLifecycleAware {
    private final RecordValues recordValues = new RecordValues();
    // Reused for every record; reset before each read in exportEvents().
    private final RecordMetadata metadata = new RecordMetadata();
    private final SynchronousLogStream synchronousLogStream;
    private final ExecutorService executorService;
    private final RecordingExporter recordingExporter;
    // Both fields below are assigned on the executor thread by startExporting().
    private LogStreamReader logStreamReader;
    private TypedRecordImpl typedEvent;

    /**
     * Creates a transistor bound to the given log stream. Nothing is read until
     * {@link #onRecovered(ReadonlyStreamProcessorContext)} is invoked.
     *
     * @param synchronousLogStream the stream whose records are handed to the exporter
     */
    public ProcessingExporterTransistor(final SynchronousLogStream synchronousLogStream) {
        this.synchronousLogStream = synchronousLogStream;
        executorService = Executors.newSingleThreadExecutor();
        recordingExporter = new RecordingExporter();
    }

    /**
     * Schedules the start of exporting on the executor: a reader is opened, a listener for newly
     * available records is registered, and everything already readable is exported.
     *
     * @param context supplies the partition id used for the reusable typed record
     */
    public void onRecovered(final ReadonlyStreamProcessorContext context) {
        executorService.submit(() -> startExporting(context));
    }

    /**
     * Stops the executor immediately; work that has not started yet is discarded.
     *
     * <p>NOTE(review): the reader opened during recovery is not closed here - confirm whether
     * {@code LogStreamReader} requires explicit closing.
     */
    public void onClose() {
        executorService.shutdownNow();
    }

    /** Runs on the executor thread: wires up the listener and reader, then drains the stream. */
    private void startExporting(final ReadonlyStreamProcessorContext context) {
        final int partitionId = context.getPartitionId();
        typedEvent = new TypedRecordImpl(partitionId);
        final LogStream asyncLogStream = synchronousLogStream.getAsyncLogStream();
        asyncLogStream.registerRecordAvailableListener(this::onNewEventCommitted);
        logStreamReader = synchronousLogStream.newLogStreamReader();
        exportEvents();
    }

    /**
     * Listener callback: schedules another export pass unless the executor has been shut down.
     */
    private void onNewEventCommitted() {
        // Cheap fast path for the common "already closed" case.
        if (executorService.isShutdown()) {
            return;
        }
        try {
            executorService.submit(this::exportEvents);
        } catch (final RejectedExecutionException ignored) {
            // onClose() shut the executor down between the check above and submit(). There is
            // nothing left to export to, so the notification is dropped instead of letting the
            // exception escape into the thread that delivered the callback.
        }
    }

    /** Exports every record the reader can currently provide; a no-op until a reader exists. */
    private void exportEvents() {
        final LogStreamReader reader = logStreamReader;
        if (reader == null) {
            return;
        }
        while (reader.hasNext()) {
            final LoggedEvent rawEvent = (LoggedEvent)reader.next();
            // metadata is shared across iterations, so clear the previous record's state first.
            metadata.reset();
            rawEvent.readMetadata((BufferReader)metadata);
            final UnifiedRecordValue recordValue = recordValues.readRecordValue(rawEvent, metadata.getValueType());
            typedEvent.wrap(rawEvent, metadata, recordValue);
            recordingExporter.export((Record)typedEvent);
        }
    }
}

