package io.camunda.zeebe.broker.exporter.stream;

import io.camunda.zeebe.broker.Loggers;
import io.camunda.zeebe.broker.exporter.stream.ExporterDirectorContext;
import io.camunda.zeebe.broker.exporter.stream.ExporterStateDistributeMessage;
import io.camunda.zeebe.broker.system.partitions.PartitionMessagingService;
import io.camunda.zeebe.db.ZeebeDb;
import io.camunda.zeebe.logstreams.log.LogRecordAwaiter;
import io.camunda.zeebe.logstreams.log.LogStream;
import io.camunda.zeebe.logstreams.log.LogStreamReader;
import io.camunda.zeebe.logstreams.log.LoggedEvent;
import io.camunda.zeebe.protocol.impl.record.RecordMetadata;
import io.camunda.zeebe.protocol.impl.record.UnifiedRecordValue;
import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.scheduler.Actor;
import io.camunda.zeebe.scheduler.ActorControl;
import io.camunda.zeebe.scheduler.ActorSchedulingService;
import io.camunda.zeebe.scheduler.SchedulingHints;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.scheduler.future.CompletableActorFuture;
import io.camunda.zeebe.scheduler.retry.BackOffRetryStrategy;
import io.camunda.zeebe.scheduler.retry.EndlessRetryStrategy;
import io.camunda.zeebe.scheduler.retry.RetryStrategy;
import io.camunda.zeebe.stream.api.EventFilter;
import io.camunda.zeebe.stream.impl.records.RecordValues;
import io.camunda.zeebe.stream.impl.records.TypedRecordImpl;
import io.camunda.zeebe.util.exception.UnrecoverableException;
import io.camunda.zeebe.util.health.FailureListener;
import io.camunda.zeebe.util.health.HealthMonitorable;
import io.camunda.zeebe.util.health.HealthReport;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.agrona.LangUtil;
import org.slf4j.Logger;

/* loaded from: input_file:io/camunda/zeebe/broker/exporter/stream/ExporterDirector.class */
public final class ExporterDirector extends Actor implements HealthMonitorable, LogRecordAwaiter {
    // Log/exception message templates used by the export loop and the snapshot recovery check.
    private static final String ERROR_MESSAGE_EXPORTING_ABORTED = "Expected to export record '{}' successfully, but exception was thrown.";
    private static final String ERROR_MESSAGE_RECOVER_FROM_SNAPSHOT_FAILED = "Expected to find event with the snapshot position %s in log stream, but nothing was found. Failed to recover '%s'.";
    // Format of the topic name on which exporter state is distributed; the argument is the partition id.
    private static final String EXPORTER_STATE_TOPIC_FORMAT = "exporterState-%d";
    // Assigned in the static initializer at the bottom of the class.
    private static final Logger LOG;
    // One container per exporter descriptor handed in through the director context.
    private final List<ExporterContainer> containers;
    private final LogStream logStream;
    private final RecordExporter recordExporter;
    private final ZeebeDb zeebeDb;
    private final ExporterMetrics metrics;
    private final String name;
    // Only assigned in ACTIVE mode (see onActorStarting); remains null otherwise.
    private LogStreamReader logStreamReader;
    // Built in initContainers() from the filters of all configured exporters.
    private EventFilter eventFilter;
    // Created in recoverFromSnapshot() on top of zeebeDb.
    private ExportersState state;
    // True while a record is being exported; prevents readNextEvent() from picking up another one.
    private boolean inExportingPhase;
    // Initial value comes from the constructor; toggled by pauseExporting()/resumeExporting().
    private boolean isPaused;
    private ExporterPhase exporterPhase;
    private final PartitionMessagingService partitionMessagingService;
    private final String exporterPositionsTopic;
    private final ExporterDirectorContext.ExporterMode exporterMode;
    // Period at which the exporter state is distributed in ACTIVE mode.
    private final Duration distributionInterval;
    // Created in onActorStarted(); null until then.
    private ExporterStateDistributionService exporterDistributionService;
    private final int partitionId;
    // Decompiler rendering of the compiler-generated assertion flag; set in the static initializer.
    static final /* synthetic */ boolean $assertionsDisabled;
    // Set once startup succeeded, cleared on close request or failure; also the retry termination signal.
    private final AtomicBoolean isOpened = new AtomicBoolean(false);
    private final Set<FailureListener> listeners = new HashSet<>();
    private volatile HealthReport healthReport = HealthReport.healthy(this);
    // Retry strategy for exporting a wrapped record to the containers.
    private final RetryStrategy exportingRetryStrategy = new BackOffRetryStrategy(this.actor, Duration.ofSeconds(10));
    // Retry strategy for wrapping (deserializing) a logged event before export.
    private final RetryStrategy recordWrapStrategy = new EndlessRetryStrategy(this.actor);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/camunda/zeebe/broker/exporter/stream/ExporterDirector$ExporterEventFilter.class */
    public static class ExporterEventFilter implements EventFilter {
        private final RecordMetadata metadata = new RecordMetadata();
        private final Map<RecordType, Boolean> acceptRecordTypes;
        private final Map<ValueType, Boolean> acceptValueTypes;

        ExporterEventFilter(Map<RecordType, Boolean> map, Map<ValueType, Boolean> map2) {
            this.acceptRecordTypes = map;
            this.acceptValueTypes = map2;
        }

        public boolean applies(LoggedEvent loggedEvent) {
            loggedEvent.readMetadata(this.metadata);
            return this.acceptRecordTypes.get(this.metadata.getRecordType()).booleanValue() && this.acceptValueTypes.get(this.metadata.getValueType()).booleanValue();
        }

        public String toString() {
            return "ExporterEventFilter{acceptRecordTypes=" + this.acceptRecordTypes + ", acceptValueTypes=" + this.acceptValueTypes + "}";
        }
    }

    /* loaded from: input_file:io/camunda/zeebe/broker/exporter/stream/ExporterDirector$RecordExporter.class */
    /**
     * Wraps one logged event at a time and pushes it through the exporter containers in list
     * order. The index of the next container is kept between {@link #export()} calls, so a retry
     * after a failed export continues with the container that rejected the record.
     */
    private static class RecordExporter {
        private final RecordValues recordValues = new RecordValues();
        private final RecordMetadata rawMetadata = new RecordMetadata();
        private final List<ExporterContainer> containers;
        private final TypedRecordImpl typedEvent;
        private final ExporterMetrics exporterMetrics;
        /** False when the value of the most recently wrapped event could not be read. */
        private boolean shouldExport;
        /** Index into {@link #containers} of the next exporter to receive the current record. */
        private int exporterIndex;

        /**
         * @param exporterMetrics metrics updated after each successful export
         * @param list the exporter containers, exported to in list order
         * @param i value handed to the {@link TypedRecordImpl} constructor (the director passes its partition id)
         */
        RecordExporter(ExporterMetrics exporterMetrics, List<ExporterContainer> list, int i) {
            this.containers = list;
            this.typedEvent = new TypedRecordImpl(i);
            this.exporterMetrics = exporterMetrics;
        }

        /**
         * Prepares the given event for export: reads its metadata and value and, if a value was
         * read, wraps everything into the reusable typed record and restarts at the first exporter.
         */
        void wrap(LoggedEvent loggedEvent) {
            loggedEvent.readMetadata(rawMetadata);
            final UnifiedRecordValue value = recordValues.readRecordValue(loggedEvent, rawMetadata.getValueType());
            shouldExport = value != null;
            if (!shouldExport) {
                // no value could be read; export() will report success without exporting anything
                return;
            }
            typedEvent.wrap(loggedEvent, rawMetadata, value);
            exporterIndex = 0;
        }

        /**
         * Exports the currently wrapped record to all remaining containers.
         *
         * @return {@code true} when there is nothing to export or every container accepted the
         *     record; {@code false} as soon as one container rejects it
         */
        public boolean export() {
            if (shouldExport) {
                final int exporterCount = containers.size();
                while (exporterIndex < exporterCount) {
                    final ExporterContainer container = containers.get(exporterIndex);
                    if (container.exportRecord(rawMetadata, typedEvent)) {
                        exporterIndex++;
                        exporterMetrics.setLastExportedPosition(container.getId(), typedEvent.getPosition());
                    } else {
                        // keep exporterIndex so the next attempt resumes with this container
                        return false;
                    }
                }
            }
            return true;
        }

        /** @return the reusable typed record holding the most recently wrapped event */
        TypedRecordImpl getTypedEvent() {
            return typedEvent;
        }
    }

    /**
     * Creates a director from the given context.
     *
     * @param exporterDirectorContext supplies name, exporter descriptors, log stream, database,
     *     messaging service, exporter mode and distribution interval
     * @param startPaused initial value of the paused flag
     * @throws NullPointerException if the context has no log stream
     */
    public ExporterDirector(ExporterDirectorContext exporterDirectorContext, boolean startPaused) {
        name = exporterDirectorContext.getName();
        containers = exporterDirectorContext.getDescriptors().stream()
            .map(ExporterContainer::new)
            .collect(Collectors.toList());
        logStream = Objects.requireNonNull(exporterDirectorContext.getLogStream());

        // everything below that is partition-scoped derives its id from the log stream
        partitionId = logStream.getPartitionId();
        metrics = new ExporterMetrics(partitionId);
        recordExporter = new RecordExporter(metrics, containers, partitionId);

        zeebeDb = exporterDirectorContext.getZeebeDb();
        isPaused = startPaused;
        partitionMessagingService = exporterDirectorContext.getPartitionMessagingService();
        exporterPositionsTopic = String.format(EXPORTER_STATE_TOPIC_FORMAT, partitionId);
        exporterMode = exporterDirectorContext.getExporterMode();
        distributionInterval = exporterDirectorContext.getDistributionInterval();
    }

    /**
     * Submits this actor to the given scheduling service using the I/O-bound scheduling hint.
     *
     * @param actorSchedulingService the service to submit this actor to
     * @return the future returned by the scheduling service for the submission
     */
    public ActorFuture<Void> startAsync(ActorSchedulingService actorSchedulingService) {
        final ActorFuture<Void> submitted = actorSchedulingService.submitActor(this, SchedulingHints.ioBound());
        return submitted;
    }

    /**
     * Requests this actor to close.
     *
     * @return the future returned by the actor's close request
     */
    public ActorFuture<Void> stopAsync() {
        final ActorFuture<Void> closed = actor.close();
        return closed;
    }

    /**
     * Pauses exporting. On a closed actor this is a no-op that returns an already completed
     * future; otherwise the paused flag and the {@link ExporterPhase#PAUSED} phase are set from
     * within the actor.
     *
     * @return a future completed once the pause has been applied (or immediately if closed)
     */
    public ActorFuture<Void> pauseExporting() {
        if (actor.isClosed()) {
            // Untyped null lets the result type be inferred as Void; an (Object) cast would force
            // a future of Object, which does not fit the declared return type.
            return CompletableActorFuture.completed(null);
        }
        return actor.call(() -> {
            isPaused = true;
            exporterPhase = ExporterPhase.PAUSED;
        });
    }

    /**
     * Resumes exporting. On a closed actor this is a no-op that returns an already completed
     * future; otherwise the paused flag is cleared and the phase set to
     * {@link ExporterPhase#EXPORTING} from within the actor. Only in ACTIVE mode is reading of the
     * next event scheduled.
     *
     * @return a future completed once the resume has been applied (or immediately if closed)
     */
    public ActorFuture<Void> resumeExporting() {
        if (actor.isClosed()) {
            // Untyped null lets the result type be inferred as Void; an (Object) cast would force
            // a future of Object, which does not fit the declared return type.
            return CompletableActorFuture.completed(null);
        }
        return actor.call(() -> {
            isPaused = false;
            exporterPhase = ExporterPhase.EXPORTING;
            if (exporterMode == ExporterDirectorContext.ExporterMode.ACTIVE) {
                actor.submit(this::readNextEvent);
            }
        });
    }

    /**
     * Reports the current exporter phase.
     *
     * @return a future already completed with {@link ExporterPhase#CLOSED} if the actor is
     *     closed, otherwise a future for the phase as read from within the actor
     */
    public ActorFuture<ExporterPhase> getPhase() {
        if (actor.isClosed()) {
            return CompletableActorFuture.completed(ExporterPhase.CLOSED);
        }
        return actor.call(() -> exporterPhase);
    }

    /**
     * Extends the context map of the superclass with this director's partition id.
     *
     * @return the superclass context with an additional {@code partitionId} entry
     */
    protected Map<String, String> createContext() {
        final Map<String, String> context = super.createContext();
        final String partition = Integer.toString(partitionId);
        context.put("partitionId", partition);
        return context;
    }

    /** @return the name taken from the director context at construction time */
    public String getName() {
        return name;
    }

    /**
     * In ACTIVE mode, requests a reader from the log stream and stores it once available. If the
     * reader cannot be obtained, the error is logged and the actor is closed. Other modes need no
     * reader, so nothing happens.
     */
    protected void onActorStarting() {
        if (exporterMode != ExporterDirectorContext.ExporterMode.ACTIVE) {
            return;
        }
        actor.runOnCompletionBlockingCurrentPhase(logStream.newLogStreamReader(), (reader, error) -> {
            if (error != null) {
                LOG.error("Unexpected error on retrieving reader from log {}", logStream.getLogName(), error);
                actor.close();
                return;
            }
            logStreamReader = reader;
        });
    }

    /**
     * Recovers the exporter state, creates the state distribution service and initializes all
     * containers; any exception during these steps closes the director and is rethrown unchecked.
     * Afterwards the director is marked as opened, state of exporters that are no longer
     * configured is removed, and the mode-specific startup (active or passive) is run.
     */
    protected void onActorStarted() {
        try {
            LOG.debug("Recovering exporter from snapshot");
            recoverFromSnapshot();
            exporterDistributionService =
                new ExporterStateDistributionService(
                    this::consumeExporterStateFromLeader, partitionMessagingService, exporterPositionsTopic);
            initContainers();
        } catch (Exception startupFailure) {
            onFailure();
            LangUtil.rethrowUnchecked(startupFailure);
        }

        isOpened.set(true);
        clearExporterState();

        final boolean active = exporterMode == ExporterDirectorContext.ExporterMode.ACTIVE;
        if (active) {
            startActiveExportingMode();
        } else {
            startPassiveExportingMode();
        }
    }

    /**
     * Releases log stream resources: closes the reader if one was obtained and deregisters this
     * director as record-available listener.
     */
    protected void onActorClosing() {
        final LogStreamReader reader = logStreamReader;
        if (reader != null) {
            reader.close();
        }
        logStream.removeRecordAvailableListener(this);
    }

    /** Logs that the director was closed and records {@link ExporterPhase#CLOSED} as its phase. */
    protected void onActorClosed() {
        final String directorName = getName();
        LOG.debug("Closed exporter director '{}'.", directorName);
        exporterPhase = ExporterPhase.CLOSED;
    }

    /**
     * Marks the director as no longer opened and releases mode-specific resources: in ACTIVE mode
     * all exporter containers are closed, otherwise the state distribution service is closed.
     *
     * <p>The distribution service is only created in {@code onActorStarted()} after the state was
     * recovered. If that recovery fails, {@code onFailure()} closes the actor while the service is
     * still {@code null}, so it is checked here before it is closed.
     */
    protected void onActorCloseRequested() {
        isOpened.set(false);
        if (exporterMode == ExporterDirectorContext.ExporterMode.ACTIVE) {
            containers.forEach(ExporterContainer::close);
        } else if (exporterDistributionService != null) {
            exporterDistributionService.close();
        }
    }

    /**
     * Logs the failure, fails the actor and updates the health report. An
     * {@link UnrecoverableException} marks the director as dead and notifies the listeners via
     * {@code onUnrecoverableFailure}; every other throwable marks it as unhealthy and notifies
     * them via {@code onFailure}.
     *
     * @param th the failure that occurred
     */
    protected void handleFailure(Throwable th) {
        LOG.error("Actor '{}' failed in phase {} with: {} .", name, actor.getLifecyclePhase(), th, th);
        actor.fail(th);

        if (th instanceof UnrecoverableException) {
            healthReport = HealthReport.dead(this).withIssue(th);
            for (FailureListener listener : listeners) {
                listener.onUnrecoverableFailure(healthReport);
            }
        } else {
            healthReport = HealthReport.unhealthy(this).withIssue(th);
            for (FailureListener listener : listeners) {
                listener.onFailure(healthReport);
            }
        }
    }

    /**
     * Applies a received exporter state entry to the local state, but only if it moves the
     * exporter's position forward; entries at or behind the known position are ignored.
     *
     * @param str id of the exporter the entry belongs to
     * @param exporterStateEntry received position and metadata
     */
    private void consumeExporterStateFromLeader(String str, ExporterStateDistributeMessage.ExporterStateEntry exporterStateEntry) {
        final boolean upToDate = state.getPosition(str) >= exporterStateEntry.position();
        if (upToDate) {
            return;
        }
        state.setExporterState(str, exporterStateEntry.position(), exporterStateEntry.metadata());
    }

    /**
     * Initializes and configures every exporter container in list order, then builds the combined
     * event filter from them.
     *
     * @throws Exception if initializing or configuring a container fails
     */
    private void initContainers() throws Exception {
        for (final ExporterContainer container : containers) {
            container.initContainer(actor, metrics, state);
            container.configureExporter();
        }

        final ExporterEventFilter combinedFilter = createEventFilter(containers);
        eventFilter = combinedFilter;
        LOG.debug("Set event filter for exporters: {}", eventFilter);
    }

    /**
     * Creates the exporter state on top of the database with a fresh database context and logs
     * the lowest exporter position found in it.
     */
    private void recoverFromSnapshot() {
        state = new ExportersState(zeebeDb, zeebeDb.createContext());
        final long lowestPosition = state.getLowestPosition();
        LOG.debug("Recovered exporter '{}' from snapshot at lastExportedPosition {}", getName(), lowestPosition);
    }

    /**
     * Builds the combined event filter for the given containers: a record type or value type is
     * accepted as soon as the filter of at least one container accepts it.
     *
     * <p>The containers are streamed directly, so every lambda is fully typed. Collecting the
     * filters into a raw list first would turn the lambda parameter into {@code Object}, on which
     * {@code acceptType}/{@code acceptValue} cannot be resolved.
     *
     * <p>NOTE(review): the filter is now fetched via {@code getContext().getFilter()} once per
     * checked type instead of once per container. This assumes both are plain accessors — confirm.
     *
     * @param list the exporter containers whose filters are combined
     * @return a filter with one entry per {@link RecordType} and per {@link ValueType}
     */
    private ExporterEventFilter createEventFilter(List<ExporterContainer> list) {
        final Map<RecordType, Boolean> acceptRecordTypes =
            Arrays.stream(RecordType.values())
                .collect(Collectors.toMap(
                    Function.identity(),
                    recordType -> list.stream()
                        .anyMatch(container -> container.getContext().getFilter().acceptType(recordType))));

        final Map<ValueType, Boolean> acceptValueTypes =
            Arrays.stream(ValueType.values())
                .collect(Collectors.toMap(
                    Function.identity(),
                    valueType -> list.stream()
                        .anyMatch(container -> container.getContext().getFilter().acceptValue(valueType))));

        return new ExporterEventFilter(acceptRecordTypes, acceptValueTypes);
    }

    /** Marks the director as no longer opened (which also ends pending retries) and closes the actor. */
    private void onFailure() {
        final boolean opened = false;
        isOpened.set(opened);
        actor.close();
    }

    /**
     * Starts exporting in ACTIVE mode: registers for record-available notifications, initializes
     * the position of and opens every exporter, and positions the reader after the lowest
     * exported position. Unless paused, reading of the next event is scheduled; the periodic
     * state distribution is started in either case. Without any exporter in the state the actor
     * is closed instead.
     *
     * @throws IllegalStateException if the reader cannot be positioned at the lowest exported position
     */
    private void startActiveExportingMode() {
        logStream.registerRecordAvailableListener(this);
        containers.forEach(container -> {
            container.initPosition();
            container.openExporter();
        });

        if (state.hasExporters()) {
            final long snapshotPosition = state.getLowestPosition();
            final boolean readerRecovered = logStreamReader.seekToNextEvent(snapshotPosition);
            if (!readerRecovered) {
                throw new IllegalStateException(
                    String.format(ERROR_MESSAGE_RECOVER_FROM_SNAPSHOT_FAILED, snapshotPosition, getName()));
            }

            if (!isPaused) {
                exporterPhase = ExporterPhase.EXPORTING;
                actor.submit(this::readNextEvent);
            } else {
                exporterPhase = ExporterPhase.PAUSED;
            }
            actor.runAtFixedRate(distributionInterval, this::distributeExporterState);
        } else {
            // nothing to export to
            actor.close();
        }
    }

    /**
     * Starts the non-active mode: initializes the position of every exporter and subscribes to
     * distributed exporter state, handing the actor's {@code run} method to the distribution
     * service. Without any exporter in the state the actor is closed instead.
     */
    private void startPassiveExportingMode() {
        for (final ExporterContainer container : containers) {
            container.initPosition();
        }

        if (state.hasExporters()) {
            exporterDistributionService.subscribeForExporterState(actor::run);
        } else {
            actor.close();
        }
    }

    /**
     * Collects position and metadata of every exporter in the state into one message and hands it
     * to the distribution service.
     */
    private void distributeExporterState() {
        final ExporterStateDistributeMessage message = new ExporterStateDistributeMessage();
        state.visitExporterState(
            (exporterId, entry) -> message.putExporter(exporterId, entry.getPosition(), entry.getMetadata()));
        exporterDistributionService.distributeExporterState(message);
    }

    /**
     * Handles a record that no exporter is interested in: counts it as skipped, lets every
     * container update its position for the skipped record, and schedules reading of the next
     * event.
     *
     * @param loggedEvent the record rejected by the event filter
     */
    private void skipRecord(LoggedEvent loggedEvent) {
        final RecordMetadata skippedMetadata = new RecordMetadata();
        final long skippedPosition = loggedEvent.getPosition();
        loggedEvent.readMetadata(skippedMetadata);
        metrics.eventSkipped(skippedMetadata.getValueType());

        for (final ExporterContainer container : containers) {
            container.updatePositionOnSkipIfUpToDate(skippedPosition);
        }
        actor.submit(this::readNextEvent);
    }

    /**
     * Reads the next record from the log if exporting is currently possible. A record rejected by
     * the event filter is skipped; any other record switches the director into the exporting
     * phase and is exported.
     */
    private void readNextEvent() {
        if (!shouldExport()) {
            return;
        }

        final LoggedEvent nextEvent = (LoggedEvent) logStreamReader.next();
        final boolean accepted = eventFilter == null || eventFilter.applies(nextEvent);
        if (accepted) {
            inExportingPhase = true;
            exportEvent(nextEvent);
        } else {
            skipRecord(nextEvent);
        }
    }

    /**
     * @return {@code true} only if the director is opened, the reader has another record, no
     *     export is in progress and exporting is not paused; the reader is only queried when the
     *     director is opened
     */
    private boolean shouldExport() {
        if (!isOpened.get()) {
            return false;
        }
        if (!logStreamReader.hasNext()) {
            return false;
        }
        return !(inExportingPhase || isPaused);
    }

    /**
     * Exports one record in two retried steps: first the logged event is wrapped by the record
     * exporter, then the wrapped record is exported to the containers. Both retries stop once the
     * director is closed. On success the exported-event metric is updated, the exporting phase is
     * left and reading of the next event is scheduled; on failure the error is logged and the
     * director is closed.
     *
     * <p>The parameters of the nested completion callbacks carry distinct names, because a lambda
     * parameter must not redeclare a variable of the enclosing lambda.
     *
     * @param loggedEvent the record to export
     */
    private void exportEvent(LoggedEvent loggedEvent) {
        this.actor.runOnCompletion(this.recordWrapStrategy.runWithRetry(() -> {
            this.recordExporter.wrap(loggedEvent);
            return true;
        }, this::isClosed), (wrapped, wrapError) -> {
            // the class declares $assertionsDisabled itself, so the check stays in its expanded form
            if (!$assertionsDisabled && wrapError != null) {
                throw new AssertionError("Throwable must be null");
            }
            this.actor.runOnCompletion(this.exportingRetryStrategy.runWithRetry(this.recordExporter::export, this::isClosed), (exported, exportError) -> {
                if (exportError != null) {
                    LOG.error(ERROR_MESSAGE_EXPORTING_ABORTED, loggedEvent, exportError);
                    onFailure();
                } else {
                    this.metrics.eventExported(this.recordExporter.getTypedEvent().getValueType());
                    this.inExportingPhase = false;
                    this.actor.submit(this::readNextEvent);
                }
            });
        });
    }

    /**
     * Removes the state of every exporter that is present in the exporter state but has no
     * configured container anymore, logging each removal.
     */
    private void clearExporterState() {
        final List<String> configuredIds =
            containers.stream().map(ExporterContainer::getId).collect(Collectors.toList());

        state.visitExporterState((exporterId, entry) -> {
            if (!configuredIds.contains(exporterId)) {
                state.removeExporterState(exporterId);
                LOG.info("The exporter '{}' is not configured anymore. Its lastExportedPosition is removed from the state.", exporterId);
            }
        });
    }

    /** @return {@code true} while the director is not marked as opened; used to end retries */
    private boolean isClosed() {
        final boolean opened = isOpened.get();
        return !opened;
    }

    /** @return the most recently recorded health report; initially healthy, replaced in {@code handleFailure} */
    public HealthReport getHealthReport() {
        final HealthReport current = healthReport;
        return current;
    }

    /**
     * Registers a failure listener; the listener set is modified via the actor rather than
     * directly on the caller's side.
     *
     * @param failureListener the listener to add
     */
    public void addFailureListener(FailureListener failureListener) {
        actor.run(() -> listeners.add(failureListener));
    }

    /**
     * Deregisters a failure listener; the listener set is modified via the actor rather than
     * directly on the caller's side.
     *
     * @param failureListener the listener to remove
     */
    public void removeFailureListener(FailureListener failureListener) {
        actor.run(() -> listeners.remove(failureListener));
    }

    /** Record-available notification: hands {@code readNextEvent} to the actor to run. */
    public void onRecordAvailable() {
        final Runnable readNext = this::readNextEvent;
        actor.run(readNext);
    }

    /**
     * Reports the lowest position across all exporters in the state.
     *
     * @return a future already completed with {@code -1} if the actor is closed, otherwise a
     *     future for the lowest position as read from within the actor
     */
    public ActorFuture<Long> getLowestPosition() {
        if (actor.isClosed()) {
            return CompletableActorFuture.completed(-1L);
        }
        return actor.call(() -> state.getLowestPosition());
    }

    // Static initialization as rendered by the decompiler.
    static {
        // Assertion checks in this class are skipped when assertions are not enabled for it.
        $assertionsDisabled = !ExporterDirector.class.desiredAssertionStatus();
        // All logging of this class goes through the shared exporter logger.
        LOG = Loggers.EXPORTER_LOGGER;
    }
}
