package io.camunda.zeebe.broker.system.partitions.impl;

import io.atomix.raft.snapshot.impl.SnapshotChunkImpl;
import io.camunda.zeebe.broker.system.partitions.PartitionMessagingService;
import io.camunda.zeebe.broker.system.partitions.SnapshotReplication;
import io.camunda.zeebe.engine.Loggers;
import io.camunda.zeebe.snapshots.SnapshotChunk;
import io.camunda.zeebe.util.sched.Actor;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.agrona.DirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;
import org.slf4j.Logger;

/* loaded from: input_file:io/camunda/zeebe/broker/system/partitions/impl/StateReplication.class */
/**
 * {@link SnapshotReplication} implementation that ships snapshot chunks over a {@link
 * PartitionMessagingService}.
 *
 * <p>Chunks are serialized through {@link SnapshotChunkImpl} and broadcast on a topic named
 * {@code replication-<n>}; received payloads are deserialized and handed to the consumer
 * registered via {@link #consume(Consumer)}.
 *
 * <p>NOTE(review): {@code executorService} and {@code readBuffer} are plain mutable fields with no
 * synchronization; this class presumably relies on its callers not invoking {@link
 * #consume(Consumer)} and {@link #close()} concurrently - confirm.
 */
public final class StateReplication implements SnapshotReplication {
    private static final String REPLICATION_TOPIC_FORMAT = "replication-%d";
    /** Upper bound, in seconds, that {@link #close()} waits for the consumer thread to stop. */
    private static final long CLOSE_TIMEOUT_SECONDS = 10L;
    private static final Logger LOG = Loggers.STREAM_PROCESSING;
    private final String replicationTopic;
    // Reused view that is re-wrapped around every received payload. It is shared mutable state,
    // so deserialization must not run on more than one thread at a time.
    private final DirectBuffer readBuffer = new UnsafeBuffer(0, 0);
    private final PartitionMessagingService messagingService;
    private final String threadName;
    // Created lazily by consume(...) and released by close(); null while no consumer is active.
    private ExecutorService executorService;

    /**
     * Creates a replication bound to one topic.
     *
     * @param partitionMessagingService transport used to broadcast and subscribe
     * @param i number formatted into the topic name and passed as the last argument of {@code
     *     Actor.buildActorName} (presumably the partition id - confirm against callers)
     * @param i2 number passed as the first argument of {@code Actor.buildActorName} (presumably
     *     the node id - confirm against callers)
     */
    public StateReplication(PartitionMessagingService partitionMessagingService, int i, int i2) {
        this.messagingService = partitionMessagingService;
        this.replicationTopic = String.format(REPLICATION_TOPIC_FORMAT, i);
        this.threadName = Actor.buildActorName(i2, "StateReplication", i);
    }

    /**
     * Serializes the given chunk and broadcasts it on the replication topic.
     *
     * @param snapshotChunk chunk to send
     */
    @Override // io.camunda.zeebe.broker.system.partitions.SnapshotReplication
    public void replicate(SnapshotChunk snapshotChunk) {
        LOG.trace(
                "Replicate on topic {} snapshot chunk {} for snapshot {}.",
                replicationTopic,
                snapshotChunk.getChunkName(),
                snapshotChunk.getSnapshotId());
        messagingService.broadcast(replicationTopic, serializeSnapshotChunk(snapshotChunk));
    }

    /**
     * Subscribes to the replication topic and forwards every received chunk to {@code consumer}.
     *
     * <p>The subscription is registered together with a dedicated single-thread executor whose
     * thread carries this instance's actor name (presumably the messaging service runs the
     * callback on it - confirm).
     *
     * @param consumer receiver of the deserialized chunks
     */
    @Override // io.camunda.zeebe.broker.system.partitions.SnapshotReplication
    public void consume(Consumer<SnapshotChunk> consumer) {
        // Reuse an executor left over from an earlier consume(...) call. Creating a fresh one
        // unconditionally would overwrite the field, so the previous executor could never be shut
        // down by close(), and two executors would share the single readBuffer.
        if (executorService == null) {
            executorService =
                    Executors.newSingleThreadExecutor(task -> new Thread(task, threadName));
        }
        messagingService.subscribe(
                replicationTopic,
                payload -> {
                    final SnapshotChunk chunk = deserializeChunk(payload);
                    LOG.trace(
                            "Received on topic {} replicated snapshot chunk {} for snapshot {}.",
                            replicationTopic,
                            chunk.getChunkName(),
                            chunk.getSnapshotId());
                    consumer.accept(chunk);
                },
                executorService);
    }

    /**
     * Unsubscribes from the replication topic and stops the consumer executor, if one was created.
     *
     * <p>The executor is shut down even when unsubscribing fails, so a failing transport cannot
     * leak the consumer thread.
     *
     * @throws Exception if unsubscribing fails or the wait for termination is interrupted
     */
    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        try {
            messagingService.unsubscribe(replicationTopic);
        } finally {
            final ExecutorService executor = executorService;
            // Clear the field before the blocking wait so it is reset even when the wait throws.
            executorService = null;
            if (executor != null) {
                executor.shutdownNow();
                if (!executor.awaitTermination(CLOSE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                    LOG.warn(
                            "Thread {} for topic {} did not terminate within {} seconds.",
                            threadName,
                            replicationTopic,
                            CLOSE_TIMEOUT_SECONDS);
                }
            }
        }
    }

    /** Copies the chunk into a {@link SnapshotChunkImpl} and returns its byte representation. */
    private ByteBuffer serializeSnapshotChunk(SnapshotChunk snapshotChunk) {
        return new SnapshotChunkImpl(snapshotChunk).toByteBuffer();
    }

    /**
     * Reads a chunk from the received payload.
     *
     * <p>The shared {@code readBuffer} is pointed at {@code byteBuffer} and the new chunk is
     * wrapped over its full capacity.
     */
    private SnapshotChunk deserializeChunk(ByteBuffer byteBuffer) {
        final SnapshotChunkImpl chunk = new SnapshotChunkImpl();
        readBuffer.wrap(byteBuffer);
        chunk.wrap(readBuffer, 0, readBuffer.capacity());
        return chunk;
    }
}
