package org.apache.pulsar.broker.service;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/service/AbstractReplicator.class */
public abstract class AbstractReplicator {
    protected final BrokerService brokerService;
    protected final String topicName;
    protected final String localCluster;
    protected final String remoteCluster;
    protected final PulsarClientImpl replicationClient;
    protected final PulsarClientImpl client;
    public static final String REPL_PRODUCER_NAME_DELIMITER = "-->";
    protected final int producerQueueSize;
    protected final ProducerBuilder<byte[]> producerBuilder;
    protected final String replicatorPrefix;
    protected static final AtomicReferenceFieldUpdater<AbstractReplicator, State> STATE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(AbstractReplicator.class, State.class, "state");
    private static final Logger log = LoggerFactory.getLogger(AbstractReplicator.class);
    protected final Backoff backOff = new Backoff(100, TimeUnit.MILLISECONDS, 1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS);
    private volatile State state = State.Stopped;
    protected volatile ProducerImpl producer = null;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/pulsar/broker/service/AbstractReplicator$State.class */
    public enum State {
        Stopped,
        Starting,
        Started,
        Stopping
    }

    public AbstractReplicator(String str, String str2, String str3, String str4, BrokerService brokerService, PulsarClientImpl pulsarClientImpl) throws PulsarServerException {
        this.brokerService = brokerService;
        this.topicName = str;
        this.replicatorPrefix = str2;
        this.localCluster = str3.intern();
        this.remoteCluster = str4.intern();
        this.replicationClient = pulsarClientImpl;
        this.client = brokerService.pulsar().getClient();
        this.producerQueueSize = brokerService.pulsar().getConfiguration().getReplicationProducerQueueSize();
        this.producerBuilder = pulsarClientImpl.newProducer(Schema.AUTO_PRODUCE_BYTES()).topic(str).messageRoutingMode(MessageRoutingMode.SinglePartition).enableBatching(false).sendTimeout(0, TimeUnit.SECONDS).maxPendingMessages(this.producerQueueSize).producerName(String.format("%s%s%s", getReplicatorName(str2, str3), REPL_PRODUCER_NAME_DELIMITER, str4));
        STATE_UPDATER.set(this, State.Stopped);
    }

    protected abstract void readEntries(org.apache.pulsar.client.api.Producer<byte[]> producer);

    protected abstract Position getReplicatorReadPosition();

    protected abstract long getNumberOfEntriesInBacklog();

    protected abstract void disableReplicatorRead();

    public String getRemoteCluster() {
        return this.remoteCluster;
    }

    public synchronized void startProducer() {
        if (STATE_UPDATER.get(this) == State.Stopping) {
            long next = this.backOff.next();
            if (log.isDebugEnabled()) {
                log.debug("[{}][{} -> {}] waiting for producer to close before attempting to reconnect, retrying in {} s", new Object[]{this.topicName, this.localCluster, this.remoteCluster, Double.valueOf(next / 1000.0d)});
            }
            this.brokerService.executor().schedule(this::startProducer, next, TimeUnit.MILLISECONDS);
            return;
        }
        State state = STATE_UPDATER.get(this);
        if (STATE_UPDATER.compareAndSet(this, State.Stopped, State.Starting)) {
            log.info("[{}][{} -> {}] Starting replicator", new Object[]{this.topicName, this.localCluster, this.remoteCluster});
            this.producerBuilder.createAsync().thenAccept(producer -> {
                readEntries(producer);
            }).exceptionally(th -> {
                if (!STATE_UPDATER.compareAndSet(this, State.Starting, State.Stopped)) {
                    log.warn("[{}][{} -> {}] Failed to create remote producer. Replicator state: {}", new Object[]{this.topicName, this.localCluster, this.remoteCluster, STATE_UPDATER.get(this), th});
                    return null;
                }
                long next2 = this.backOff.next();
                log.warn("[{}][{} -> {}] Failed to create remote producer ({}), retrying in {} s", new Object[]{this.topicName, this.localCluster, this.remoteCluster, th.getMessage(), Double.valueOf(next2 / 1000.0d)});
                this.brokerService.executor().schedule(this::startProducer, next2, TimeUnit.MILLISECONDS);
                return null;
            });
        } else if (state != State.Started) {
            log.info("[{}][{} -> {}] Replicator already being started. Replicator state: {}", new Object[]{this.topicName, this.localCluster, this.remoteCluster, state});
        } else if (log.isDebugEnabled()) {
            log.debug("[{}][{} -> {}] Replicator was already running", new Object[]{this.topicName, this.localCluster, this.remoteCluster});
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public synchronized CompletableFuture<Void> closeProducerAsync() {
        if (this.producer == null) {
            STATE_UPDATER.set(this, State.Stopped);
            return CompletableFuture.completedFuture(null);
        }
        CompletableFuture<Void> closeAsync = this.producer.closeAsync();
        closeAsync.thenRun(() -> {
            STATE_UPDATER.set(this, State.Stopped);
            this.producer = null;
            disableReplicatorRead();
        }).exceptionally(th -> {
            long next = this.backOff.next();
            log.warn("[{}][{} -> {}] Exception: '{}' occurred while trying to close the producer. retrying again in {} s", new Object[]{this.topicName, this.localCluster, this.remoteCluster, th.getMessage(), Double.valueOf(next / 1000.0d)});
            this.brokerService.executor().schedule(this::closeProducerAsync, next, TimeUnit.MILLISECONDS);
            return null;
        });
        return closeAsync;
    }

    public CompletableFuture<Void> disconnect() {
        return disconnect(false);
    }

    public synchronized CompletableFuture<Void> disconnect(boolean z) {
        if (z && getNumberOfEntriesInBacklog() > 0) {
            CompletableFuture<Void> completableFuture = new CompletableFuture<>();
            completableFuture.completeExceptionally(new BrokerServiceException.TopicBusyException("Cannot close a replicator with backlog"));
            if (log.isDebugEnabled()) {
                log.debug("[{}][{} -> {}] Replicator disconnect failed since topic has backlog", new Object[]{this.topicName, this.localCluster, this.remoteCluster});
            }
            return completableFuture;
        }
        if (STATE_UPDATER.get(this) == State.Stopping) {
            return CompletableFuture.completedFuture(null);
        }
        if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Stopping) || STATE_UPDATER.compareAndSet(this, State.Started, State.Stopping)) {
            log.info("[{}][{} -> {}] Disconnect replicator at position {} with backlog {}", new Object[]{this.topicName, this.localCluster, this.remoteCluster, getReplicatorReadPosition(), Long.valueOf(getNumberOfEntriesInBacklog())});
        }
        return closeProducerAsync();
    }

    public CompletableFuture<Void> remove() {
        return CompletableFuture.completedFuture(null);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean isWritable() {
        ProducerImpl producerImpl = this.producer;
        return producerImpl != null && producerImpl.isWritable();
    }

    public static String getRemoteCluster(String str) {
        String[] split = str.split("\\.");
        return split[split.length - 1];
    }

    public static String getReplicatorName(String str, String str2) {
        return (str + "." + str2).intern();
    }

    public static CompletableFuture<Void> validatePartitionedTopicAsync(String str, BrokerService brokerService) {
        TopicName topicName = TopicName.get(str);
        return brokerService.pulsar().getPulsarResources().getNamespaceResources().getPartitionedTopicResources().partitionedTopicExistsAsync(topicName).thenCompose(bool -> {
            if (!bool.booleanValue()) {
                return CompletableFuture.completedFuture(null);
            }
            String str2 = topicName + " is a partitioned-topic and replication can't be started for partitioned-producer ";
            log.error(str2);
            return FutureUtil.failedFuture(new BrokerServiceException.NamingException(str2));
        });
    }

    public State getState() {
        return this.state;
    }
}
