package io.camunda.zeebe.broker.exporter.stream;

import io.camunda.zeebe.broker.Loggers;
import io.camunda.zeebe.broker.exporter.stream.ExporterStateDistributeMessage;
import io.camunda.zeebe.broker.system.partitions.PartitionMessagingService;
import io.camunda.zeebe.util.logging.ThrottledLogger;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;
import org.agrona.DirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;
import org.slf4j.Logger;

/* loaded from: input_file:io/camunda/zeebe/broker/exporter/stream/ExporterStateDistributionService.class */
/**
 * Sends and receives exporter state through a {@link PartitionMessagingService} topic.
 *
 * <p>Outgoing state is published with {@link #distributeExporterState(ExporterStateDistributeMessage)}.
 * Once {@link #subscribeForExporterState(Executor)} has been called, each message received on the
 * topic is decoded and every entry of its exporter state map is passed to the consumer given at
 * construction. {@link #close()} removes the subscription for the topic.
 */
public class ExporterStateDistributionService implements AutoCloseable {
    /**
     * Logger for the recurring "current state" debug line; wraps the exporter logger in a
     * {@link ThrottledLogger} configured with a 60 second duration.
     */
    private static final Logger PERIODIC_LOGGER = new ThrottledLogger(Loggers.EXPORTER_LOGGER, Duration.ofSeconds(60));

    /** Receives each (key, entry) pair of a decoded exporter state message. */
    private final BiConsumer<String, ExporterStateDistributeMessage.ExporterStateEntry> exporterStateConsumer;

    /** Messaging service used to subscribe, broadcast and unsubscribe. */
    private final PartitionMessagingService partitionMessagingService;

    /** Topic name used for every call on the messaging service. */
    private final String exporterStateTopic;

    /**
     * Creates the service; no subscription is made until
     * {@link #subscribeForExporterState(Executor)} is called.
     *
     * @param biConsumer callback invoked for each entry of a received exporter state map
     * @param partitionMessagingService messaging service the state is exchanged over
     * @param str name of the topic to subscribe to and broadcast on
     */
    public ExporterStateDistributionService(BiConsumer<String, ExporterStateDistributeMessage.ExporterStateEntry> biConsumer, PartitionMessagingService partitionMessagingService, String str) {
        this.exporterStateTopic = str;
        this.partitionMessagingService = partitionMessagingService;
        this.exporterStateConsumer = biConsumer;
    }

    /**
     * Subscribes to the exporter state topic, registering this service's handler for incoming
     * messages.
     *
     * @param executor executor handed to the messaging service together with the handler
     *     (presumably the handler is run on it; confirm against the messaging service contract)
     */
    public void subscribeForExporterState(Executor executor) {
        partitionMessagingService.subscribe(exporterStateTopic, this::applyReceivedState, executor);
    }

    /**
     * Broadcasts the given message, in its byte buffer form, on the exporter state topic.
     *
     * @param exporterStateDistributeMessage message to serialize and broadcast
     */
    public void distributeExporterState(ExporterStateDistributeMessage exporterStateDistributeMessage) {
        partitionMessagingService.broadcast(exporterStateTopic, exporterStateDistributeMessage.toByteBuffer());
    }

    /** Unsubscribes from the exporter state topic. */
    @Override // java.lang.AutoCloseable
    public void close() {
        partitionMessagingService.unsubscribe(exporterStateTopic);
    }

    /**
     * Decodes a received message and forwards each exporter state entry to the consumer.
     *
     * @param byteBuffer raw message bytes as delivered by the messaging service
     */
    private void applyReceivedState(ByteBuffer byteBuffer) {
        // View the whole received buffer (offset 0, full capacity) as the message payload.
        final DirectBuffer payload = new UnsafeBuffer(byteBuffer);
        final ExporterStateDistributeMessage message = new ExporterStateDistributeMessage();
        message.wrap(payload, 0, payload.capacity());

        final Map<String, ExporterStateDistributeMessage.ExporterStateEntry> receivedState = message.getExporterState();

        // Every message is traced; the debug line goes through the throttled logger.
        Loggers.EXPORTER_LOGGER.trace("Received new exporter state {}", receivedState);
        PERIODIC_LOGGER.debug("Current exporter state {}", receivedState);

        receivedState.forEach(exporterStateConsumer);
    }
}
