package org.apache.pulsar.broker.service.persistent;

import io.netty.buffer.ByteBuf;
import io.prometheus.client.Gauge;
import java.io.IOException;
import java.time.Clock;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.common.api.proto.ClusterMessageId;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.MarkersMessageIdData;
import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshot;
import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshotRequest;
import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshotResponse;
import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsUpdate;
import org.apache.pulsar.common.protocol.Markers;
import org.apache.pulsar.common.util.Runnables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.class */
/**
 * Drives replicated-subscription snapshots for one persistent topic.
 *
 * <p>On a fixed schedule the controller starts a new
 * {@link ReplicatedSubscriptionsSnapshotBuilder}, tracks it in {@link #pendingSnapshots} until it
 * either completes or times out, and publishes marker messages into the topic. It also reacts to
 * markers read back from the topic (snapshot requests, snapshot responses and subscription
 * updates).
 *
 * <p>The controller is its own {@link Topic.PublishContext}: every marker it writes reports its
 * publish result to {@link #completed(Exception, long, long)}.
 */
public class ReplicatedSubscriptionsController implements AutoCloseable, Topic.PublishContext {
    private static final Logger log = LoggerFactory.getLogger(ReplicatedSubscriptionsController.class);

    /** Number of snapshots that have been started and are neither completed nor timed out. */
    private static final Gauge pendingSnapshotsMetric = Gauge.build("pulsar_replicated_subscriptions_pending_snapshots", "Counter of currently pending snapshots").register();

    // Marker type codes handled by receivedReplicatedSubscriptionMarker.
    // NOTE(review): the decompiled source only preserved the literal 10; the other two labels were
    // emitted as unrelated symbols. The values below are taken from the MarkerType enum in
    // Pulsar's PulsarMarkers.proto -- confirm against the generated enum before relying on them.
    private static final int MARKER_TYPE_SNAPSHOT_REQUEST = 10;
    private static final int MARKER_TYPE_SNAPSHOT_RESPONSE = 11;
    private static final int MARKER_TYPE_SUBSCRIPTION_UPDATE = 13;

    private final PersistentTopic topic;
    private final String localCluster;

    /** Id of the most recently completed snapshot; {@code null} until one completes. */
    private String lastCompletedSnapshotId;

    /** Start time (millis) of the most recently completed snapshot; 0 until one completes. */
    private long lastCompletedSnapshotStartTime = 0;

    /** Position reported by the most recent marker publish callback. */
    private volatile Position positionOfLastLocalMarker;

    /** Snapshots in progress, keyed by snapshot id. */
    private final ConcurrentMap<String, ReplicatedSubscriptionsSnapshotBuilder> pendingSnapshots = new ConcurrentHashMap<>();

    /** Handle of the periodic snapshot task; cancelled by {@link #close()}. */
    private final ScheduledFuture<?> timer;

    /**
     * Creates the controller and immediately schedules periodic snapshot creation on the broker
     * executor, using the configured replicated-subscriptions snapshot frequency as the period.
     *
     * @param topic        the topic this controller belongs to
     * @param localCluster name of the local cluster
     */
    public ReplicatedSubscriptionsController(PersistentTopic topic, String localCluster) {
        this.topic = topic;
        this.localCluster = localCluster;
        // The task is scheduled last so that every field it reads is already assigned.
        this.timer = topic.getBrokerService().pulsar().getExecutor().scheduleAtFixedRate(
                Runnables.catchingAndLoggingThrowables(this::startNewSnapshot),
                0L,
                topic.getBrokerService().pulsar().getConfiguration().getReplicatedSubscriptionsSnapshotFrequencyMillis(),
                TimeUnit.MILLISECONDS);
    }

    /**
     * Dispatches a replicated-subscription marker read from the topic to the matching handler.
     * Marker types other than request, response and update are ignored. Parse failures are logged
     * and swallowed.
     *
     * @param position   position of the marker in the topic
     * @param markerType numeric marker type code
     * @param payload    serialized marker payload
     */
    public void receivedReplicatedSubscriptionMarker(Position position, int markerType, ByteBuf payload) {
        try {
            switch (markerType) {
                case MARKER_TYPE_SNAPSHOT_REQUEST:
                    receivedSnapshotRequest(Markers.parseReplicatedSubscriptionsSnapshotRequest(payload));
                    break;
                case MARKER_TYPE_SNAPSHOT_RESPONSE:
                    receivedSnapshotResponse(position, Markers.parseReplicatedSubscriptionsSnapshotResponse(payload));
                    break;
                case MARKER_TYPE_SUBSCRIPTION_UPDATE:
                    receiveSubscriptionUpdated(Markers.parseReplicatedSubscriptionsUpdate(payload));
                    break;
                default:
                    // Not a marker this controller handles.
                    break;
            }
        } catch (IOException e) {
            log.warn("[{}] Failed to parse marker: {}", this.topic.getName(), e);
        }
    }

    /**
     * Publishes an update marker carrying, for every cluster in the snapshot, the message id
     * recorded for that cluster.
     *
     * @param subscriptionName name of the subscription that was updated
     * @param snapshot         snapshot whose per-cluster message ids are published
     */
    public void localSubscriptionUpdated(String subscriptionName, ReplicatedSubscriptionsSnapshot snapshot) {
        if (log.isDebugEnabled()) {
            log.debug("[{}][{}] Updating subscription to snapshot {}", this.topic, subscriptionName,
                    snapshot.getClustersList().stream()
                            .map(cluster -> String.format("%s -> %d:%d", cluster.getCluster(),
                                    cluster.getMessageId().getLedgerId(), cluster.getMessageId().getEntryId()))
                            .collect(Collectors.toList()));
        }
        // TreeMap keeps the clusters ordered by name in the emitted marker.
        Map<String, MarkersMessageIdData> clusterIds = new TreeMap<>();
        int clustersCount = snapshot.getClustersCount();
        for (int i = 0; i < clustersCount; i++) {
            ClusterMessageId cluster = snapshot.getClusterAt(i);
            clusterIds.put(cluster.getCluster(), cluster.getMessageId());
        }
        writeMarker(Markers.newReplicatedSubscriptionsUpdate(subscriptionName, clusterIds));
    }

    /**
     * Answers a snapshot request by publishing a response marker that carries the topic's current
     * last position and the local cluster name.
     */
    private void receivedSnapshotRequest(ReplicatedSubscriptionsSnapshotRequest request) {
        PositionImpl lastPosition = this.topic.getLastPosition();
        if (log.isDebugEnabled()) {
            log.debug("[{}] Received snapshot request. Last msg id: {}", this.topic.getName(), lastPosition);
        }
        writeMarker(Markers.newReplicatedSubscriptionsSnapshotResponse(request.getSnapshotId(), request.getSourceCluster(),
                this.localCluster, lastPosition.getLedgerId(), lastPosition.getEntryId()));
    }

    /**
     * Forwards a snapshot response to the pending builder with the same snapshot id. Responses for
     * snapshots that are no longer pending are only logged at debug level.
     */
    private void receivedSnapshotResponse(Position position, ReplicatedSubscriptionsSnapshotResponse response) {
        String snapshotId = response.getSnapshotId();
        ReplicatedSubscriptionsSnapshotBuilder builder = this.pendingSnapshots.get(snapshotId);
        if (builder != null) {
            builder.receivedSnapshotResponse(position, response);
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}] Received late reply for timed-out snapshot {} from {}", this.topic.getName(), snapshotId,
                    response.getCluster().getCluster());
        }
    }

    /**
     * Applies a subscription update marker to the local topic. If the update has an entry for the
     * local cluster, the named subscription is cumulatively acknowledged up to that position, or
     * created when it does not exist yet. Updates without a local-cluster entry are ignored.
     */
    private void receiveSubscriptionUpdated(ReplicatedSubscriptionsUpdate update) {
        MarkersMessageIdData localMessageId = null;
        int clustersCount = update.getClustersCount();
        // No early exit: if the local cluster appears more than once, the last entry wins.
        for (int i = 0; i < clustersCount; i++) {
            ClusterMessageId cluster = update.getClusterAt(i);
            if (this.localCluster.equals(cluster.getCluster())) {
                localMessageId = cluster.getMessageId();
            }
        }
        if (localMessageId == null) {
            return;
        }

        String subscriptionName = update.getSubscriptionName();
        PositionImpl position = new PositionImpl(localMessageId.getLedgerId(), localMessageId.getEntryId());
        if (log.isDebugEnabled()) {
            log.debug("[{}][{}] Received update for subscription to {}", this.topic, subscriptionName, position);
        }

        PersistentSubscription subscription = this.topic.getSubscription(subscriptionName);
        if (subscription != null) {
            subscription.acknowledgeMessage(Collections.singletonList(position), CommandAck.AckType.Cumulative,
                    Collections.emptyMap());
        } else {
            // The "{}:{}" pair is ledgerId:entryId.
            log.info("[{}][{}] Creating subscription at {}:{} after receiving update from replicated subcription",
                    this.topic, subscriptionName, localMessageId.getLedgerId(), localMessageId.getEntryId());
            this.topic.createSubscription(subscriptionName, CommandSubscribe.InitialPosition.Latest, true);
        }
    }

    /**
     * Periodic task: drops timed-out snapshots, then starts a new snapshot unless the last data
     * message was published before the last completed snapshot started, or at least one
     * replicator is not connected.
     */
    private void startNewSnapshot() {
        cleanupTimedOutSnapshots();

        if (this.topic.getLastDataMessagePublishedTimestamp() < this.lastCompletedSnapshotStartTime) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] There is no new data in topic. Skipping snapshot creation.", this.topic.getName());
            }
            return;
        }

        MutableBoolean anyReplicatorDisconnected = new MutableBoolean();
        this.topic.getReplicators().forEach((cluster, replicator) -> {
            if (!replicator.isConnected()) {
                anyReplicatorDisconnected.setTrue();
            }
        });
        if (anyReplicatorDisconnected.isTrue()) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Do not attempt to create snapshot when some of the clusters are not reachable.", this.topic.getName());
            }
            return;
        }

        if (log.isDebugEnabled()) {
            log.debug("[{}] Starting snapshot creation.", this.topic.getName());
        }
        pendingSnapshotsMetric.inc();
        ReplicatedSubscriptionsSnapshotBuilder builder = new ReplicatedSubscriptionsSnapshotBuilder(this,
                this.topic.getReplicators().keys(), this.topic.getBrokerService().pulsar().getConfiguration(),
                Clock.systemUTC());
        this.pendingSnapshots.put(builder.getSnapshotId(), builder);
        builder.start();
    }

    /**
     * Returns the id of the most recently completed snapshot.
     *
     * @return the snapshot id, or an empty {@code Optional} if no snapshot has completed yet
     */
    public Optional<String> getLastCompletedSnapshotId() {
        return Optional.ofNullable(this.lastCompletedSnapshotId);
    }

    /**
     * Removes every pending snapshot whose builder reports it has timed out, decrementing the
     * pending gauge once per entry actually removed by this call.
     */
    private void cleanupTimedOutSnapshots() {
        for (Map.Entry<String, ReplicatedSubscriptionsSnapshotBuilder> entry : this.pendingSnapshots.entrySet()) {
            if (!entry.getValue().isTimedOut()) {
                continue;
            }
            if (log.isDebugEnabled()) {
                log.debug("[{}] Snapshot creation timed out for {}", this.topic.getName(), entry.getKey());
            }
            // Conditional remove: if snapshotCompleted() already took this entry out of the map,
            // the gauge was decremented there and must not be decremented a second time.
            if (this.pendingSnapshots.remove(entry.getKey(), entry.getValue())) {
                pendingSnapshotsMetric.dec();
            }
        }
    }

    /**
     * Records that the snapshot with the given id has completed and stops tracking it.
     *
     * <p>Originally package-private; the visibility is left as found in this source.
     *
     * @param snapshotId id of the completed snapshot
     */
    public void snapshotCompleted(String snapshotId) {
        ReplicatedSubscriptionsSnapshotBuilder completed = this.pendingSnapshots.remove(snapshotId);
        this.lastCompletedSnapshotId = snapshotId;
        if (completed != null) {
            // Only adjust the gauge when this call actually removed the entry; otherwise it was
            // already accounted for (for example by the timeout cleanup).
            pendingSnapshotsMetric.dec();
            this.lastCompletedSnapshotStartTime = completed.getStartTimeMillis();
        }
    }

    /**
     * Publishes a marker into the topic with this controller as the publish context, then
     * releases the buffer whether or not the publish call throws.
     *
     * <p>Originally package-private; the visibility is left as found in this source.
     *
     * @param marker serialized marker; this method releases it
     */
    public void writeMarker(ByteBuf marker) {
        try {
            this.topic.publishMessage(marker, this);
        } finally {
            marker.release();
        }
    }

    /**
     * Publish callback for markers written by this controller. The reported position is stored
     * as the last local marker position regardless of whether {@code exc} is {@code null}.
     *
     * @param exc      publish failure, or {@code null}
     * @param ledgerId ledger id reported by the publish callback
     * @param entryId  entry id reported by the publish callback
     */
    @Override // org.apache.pulsar.broker.service.Topic.PublishContext
    public void completed(Exception exc, long ledgerId, long entryId) {
        if (log.isDebugEnabled()) {
            log.debug("[{}] Published marker at {}:{}. Exception: {}", this.topic.getName(), ledgerId, entryId, exc);
        }
        this.positionOfLastLocalMarker = new PositionImpl(ledgerId, entryId);
    }

    /**
     * Returns the topic this controller is attached to.
     *
     * <p>Originally package-private; the visibility is left as found in this source.
     */
    public PersistentTopic topic() {
        return this.topic;
    }

    /**
     * Returns the local cluster name.
     *
     * <p>Originally package-private; the visibility is left as found in this source.
     */
    public String localCluster() {
        return this.localCluster;
    }

    /** Always {@code true}: everything published through this context is a marker. */
    @Override // org.apache.pulsar.broker.service.Topic.PublishContext
    public boolean isMarkerMessage() {
        return true;
    }

    /** Cancels the periodic snapshot task, interrupting it if it is currently running. */
    @Override // java.lang.AutoCloseable
    public void close() {
        this.timer.cancel(true);
    }
}
