package io.zeebe.broker.engine.impl;

import io.atomix.cluster.messaging.ClusterEventService;
import io.atomix.cluster.messaging.Subscription;
import io.zeebe.engine.Loggers;
import io.zeebe.logstreams.state.SnapshotChunk;
import io.zeebe.logstreams.state.SnapshotReplication;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import org.agrona.DirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;
import org.slf4j.Logger;

/* loaded from: input_file:io/zeebe/broker/engine/impl/StateReplication.class */
/**
 * {@link SnapshotReplication} implementation that ships snapshot chunks over a
 * {@link ClusterEventService} topic.
 *
 * <p>Chunks are published with {@link #replicate(SnapshotChunk)} and received through a
 * subscription registered by {@link #consume(Consumer)}. Both sides use the same topic, whose
 * name is built from {@link #REPLICATION_TOPIC_FORMAT}.
 *
 * <p>NOTE(review): the class does no synchronization of its own; it presumably expects
 * {@code consume} and {@code close} to be called from a single thread - confirm with callers.
 */
public class StateReplication implements SnapshotReplication {
    /** Topic name pattern; {@code %d} is replaced by the id passed to the constructor. */
    public static final String REPLICATION_TOPIC_FORMAT = "replication-%d";
    private static final Logger LOG = Loggers.STREAM_PROCESSING;
    private final String replicationTopic;
    /** Reusable view that is re-pointed at each received payload before it is decoded. */
    private final DirectBuffer readBuffer = new UnsafeBuffer(0, 0);
    private final ClusterEventService eventService;
    /** Executor handed to the active subscription; {@code null} while not consuming. */
    private ExecutorService executorService;
    /** Active subscription on the replication topic; {@code null} while not consuming. */
    private Subscription subscription;

    /**
     * Creates a replication endpoint bound to the topic {@code "replication-<i>"}.
     *
     * @param clusterEventService event service used to broadcast and to subscribe
     * @param i id inserted into the topic name (presumably the partition id - confirm with callers)
     */
    public StateReplication(ClusterEventService clusterEventService, int i) {
        this.eventService = clusterEventService;
        this.replicationTopic = String.format(REPLICATION_TOPIC_FORMAT, i);
    }

    /**
     * Broadcasts the given chunk on the replication topic.
     *
     * <p>The chunk is serialized by copying it into a {@code SnapshotChunkImpl} and calling
     * {@code toBytes()}; a debug line is logged from within that encoder.
     *
     * @param snapshotChunk the chunk to publish
     */
    public void replicate(SnapshotChunk snapshotChunk) {
        this.eventService.broadcast(this.replicationTopic, snapshotChunk, chunk -> {
            LOG.debug("Replicate on topic {} snapshot chunk {} for snapshot pos {}.", this.replicationTopic, chunk.getChunkName(), chunk.getSnapshotPosition());
            return new SnapshotChunkImpl(chunk).toBytes();
        });
    }

    /**
     * Subscribes to the replication topic and forwards every decoded chunk to {@code consumer}.
     *
     * <p>A dedicated single-thread executor, whose thread is named after the topic, is passed to
     * the subscription. The call blocks until the subscription future completes.
     *
     * <p>Resources left over from a previous call are released first, so that calling this
     * method again does not orphan the earlier subscription and executor. If subscribing fails,
     * the freshly created executor is shut down before the failure propagates.
     *
     * @param consumer receives each chunk decoded from the topic
     */
    public void consume(Consumer<SnapshotChunk> consumer) {
        // Without this, a second call would overwrite the fields and the first
        // subscription/executor could never be closed again.
        releaseResources();

        final ExecutorService executor = Executors.newSingleThreadExecutor(task -> new Thread(task, this.replicationTopic));
        boolean subscribed = false;
        try {
            this.subscription = this.eventService.subscribe(this.replicationTopic, bytes -> {
                // Re-point the shared view at the new payload, then let the chunk read from it.
                this.readBuffer.wrap(bytes);
                final SnapshotChunkImpl chunk = new SnapshotChunkImpl();
                chunk.wrap(this.readBuffer, 0, bytes.length);
                LOG.debug("Received on topic {} replicated snapshot chunk {} for snapshot pos {}.", this.replicationTopic, chunk.getChunkName(), chunk.getSnapshotPosition());
                return chunk;
            }, consumer, executor).join();
            this.executorService = executor;
            subscribed = true;
        } finally {
            if (!subscribed) {
                // Subscribing failed: nothing references the executor, so stop it here.
                executor.shutdownNow();
            }
        }
    }

    /**
     * Closes the active subscription, if any, waiting for it to complete, and shuts down the
     * associated executor. Calling this without an active subscription is a no-op.
     */
    public void close() {
        releaseResources();
    }

    /**
     * Closes the subscription and shuts down the executor, clearing both fields.
     * The executor is shut down even when closing the subscription fails.
     */
    private void releaseResources() {
        try {
            if (this.subscription != null) {
                this.subscription.close().join();
                this.subscription = null;
            }
        } finally {
            if (this.executorService != null) {
                this.executorService.shutdownNow();
                this.executorService = null;
            }
        }
    }
}
