package org.apache.pulsar.broker.service.persistent;

import io.netty.buffer.ByteBuf;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.service.AbstractReplicator;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.protocol.Markers;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.class */
public class GeoPersistentReplicator extends PersistentReplicator {
    private static final Logger log = LoggerFactory.getLogger(GeoPersistentReplicator.class);

    public GeoPersistentReplicator(PersistentTopic persistentTopic, ManagedCursor managedCursor, String str, String str2, BrokerService brokerService, PulsarClientImpl pulsarClientImpl) throws PulsarServerException {
        super(str, persistentTopic, managedCursor, str2, persistentTopic.getName(), brokerService, pulsarClientImpl);
    }

    @Override // org.apache.pulsar.broker.service.AbstractReplicator
    protected String getProducerName() {
        return getReplicatorName(this.replicatorPrefix, this.localCluster) + "-->" + this.remoteCluster;
    }

    @Override // org.apache.pulsar.broker.service.AbstractReplicator
    protected CompletableFuture<Void> prepareCreateProducer() {
        if (this.brokerService.getPulsar().getConfig().isCreateTopicToRemoteClusterForReplication()) {
            return CompletableFuture.completedFuture(null);
        }
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        this.replicationClient.getPartitionedTopicMetadata(this.localTopic.getName(), false, false).whenComplete((partitionedTopicMetadata, th) -> {
            if (th != null) {
                completableFuture.completeExceptionally(FutureUtil.unwrapCompletionException(th));
            } else {
                if (partitionedTopicMetadata.partitions == 0) {
                    completableFuture.complete(null);
                    return;
                }
                String format = String.format("{} Can not create the replicator due to the partitions in the remote cluster is not 0, but is %s", this.replicatorId, Integer.valueOf(partitionedTopicMetadata.partitions));
                log.error(format);
                completableFuture.completeExceptionally(new PulsarClientException.NotAllowedException(format));
            }
        });
        return completableFuture;
    }

    @Override // org.apache.pulsar.broker.service.persistent.PersistentReplicator
    protected boolean replicateEntries(List<Entry> list) {
        boolean z = false;
        boolean isEnableReplicatedSubscriptions = this.brokerService.pulsar().getConfiguration().isEnableReplicatedSubscriptions();
        boolean z2 = false;
        boolean z3 = false;
        for (int i = 0; i < list.size(); i++) {
            try {
                Entry entry = list.get(i);
                if (z3) {
                    entry.release();
                } else {
                    int length = entry.getLength();
                    ByteBuf dataBuffer = entry.getDataBuffer();
                    try {
                        MessageImpl<?> deserializeSkipBrokerEntryMetaData = MessageImpl.deserializeSkipBrokerEntryMetaData(dataBuffer);
                        if (Markers.isTxnMarker(deserializeSkipBrokerEntryMetaData.getMessageBuilder())) {
                            this.cursor.asyncDelete(entry.getPosition(), this, entry.getPosition());
                            entry.release();
                            deserializeSkipBrokerEntryMetaData.recycle();
                        } else if (deserializeSkipBrokerEntryMetaData.getMessageBuilder().hasTxnidLeastBits() && deserializeSkipBrokerEntryMetaData.getMessageBuilder().hasTxnidMostBits() && this.topic.isTxnAborted(new TxnID(deserializeSkipBrokerEntryMetaData.getMessageBuilder().getTxnidMostBits(), deserializeSkipBrokerEntryMetaData.getMessageBuilder().getTxnidLeastBits()), entry.getPosition())) {
                            this.cursor.asyncDelete(entry.getPosition(), this, entry.getPosition());
                            entry.release();
                            deserializeSkipBrokerEntryMetaData.recycle();
                        } else {
                            if (isEnableReplicatedSubscriptions) {
                                checkReplicatedSubscriptionMarker(entry.getPosition(), deserializeSkipBrokerEntryMetaData, dataBuffer);
                            }
                            if (deserializeSkipBrokerEntryMetaData.isReplicated()) {
                                this.cursor.asyncDelete(entry.getPosition(), this, entry.getPosition());
                                entry.release();
                                deserializeSkipBrokerEntryMetaData.recycle();
                            } else if (deserializeSkipBrokerEntryMetaData.hasReplicateTo() && !deserializeSkipBrokerEntryMetaData.getReplicateTo().contains(this.remoteCluster)) {
                                if (log.isDebugEnabled()) {
                                    log.debug("[{}] Skipping message at position {}, replicateTo {}", new Object[]{this.replicatorId, entry.getPosition(), deserializeSkipBrokerEntryMetaData.getReplicateTo()});
                                }
                                this.cursor.asyncDelete(entry.getPosition(), this, entry.getPosition());
                                entry.release();
                                deserializeSkipBrokerEntryMetaData.recycle();
                            } else if (STATE_UPDATER.get(this) != AbstractReplicator.State.Started || z2) {
                                if (log.isDebugEnabled()) {
                                    log.debug("[{}] Dropping read message at {} because producer is not ready", this.replicatorId, entry.getPosition());
                                }
                                z2 = true;
                                entry.release();
                                deserializeSkipBrokerEntryMetaData.recycle();
                            } else {
                                this.dispatchRateLimiter.ifPresent(dispatchRateLimiter -> {
                                    dispatchRateLimiter.consumeDispatchQuota(1L, entry.getLength());
                                });
                                deserializeSkipBrokerEntryMetaData.setReplicatedFrom(this.localCluster);
                                dataBuffer.retain();
                                CompletableFuture<SchemaInfo> schemaInfo = getSchemaInfo(deserializeSkipBrokerEntryMetaData);
                                if (!schemaInfo.isDone() || schemaInfo.isCompletedExceptionally()) {
                                    entry.release();
                                    dataBuffer.release();
                                    deserializeSkipBrokerEntryMetaData.recycle();
                                    this.fetchSchemaInProgress = true;
                                    z3 = true;
                                    this.cursor.cancelPendingReadRequest();
                                    log.info("[{}] Pause the data replication due to new detected schema", this.replicatorId);
                                    schemaInfo.whenComplete((schemaInfo2, th) -> {
                                        if (th != null) {
                                            log.warn("[{}] Failed to get schema from local cluster, will try in the next loop", this.replicatorId, th);
                                        }
                                        log.info("[{}] Resume the data replication after the schema fetching done", this.replicatorId);
                                        this.cursor.rewind();
                                        this.fetchSchemaInProgress = false;
                                        readMoreEntries();
                                    });
                                } else {
                                    deserializeSkipBrokerEntryMetaData.setSchemaInfoForReplicator(schemaInfo.get());
                                    deserializeSkipBrokerEntryMetaData.getMessageBuilder().clearTxnidMostBits();
                                    deserializeSkipBrokerEntryMetaData.getMessageBuilder().clearTxnidLeastBits();
                                    this.msgOut.recordEvent(dataBuffer.readableBytes());
                                    this.stats.incrementMsgOutCounter();
                                    this.stats.incrementBytesOutCounter(dataBuffer.readableBytes());
                                    PENDING_MESSAGES_UPDATER.incrementAndGet(this);
                                    this.producer.sendAsync(deserializeSkipBrokerEntryMetaData, PersistentReplicator.ProducerSendCallback.create(this, entry, deserializeSkipBrokerEntryMetaData));
                                    z = true;
                                }
                            }
                        }
                    } catch (Throwable th2) {
                        log.error("[{}] Failed to deserialize message at {} (buffer size: {}): {}", new Object[]{this.replicatorId, entry.getPosition(), Integer.valueOf(length), th2.getMessage(), th2});
                        this.cursor.asyncDelete(entry.getPosition(), this, entry.getPosition());
                        entry.release();
                    }
                }
            } catch (Exception e) {
                log.error("[{}] Unexpected exception in replication task: {}", new Object[]{this.replicatorId, e.getMessage(), e});
            }
        }
        return z;
    }
}
