package org.joyqueue.client.internal.producer.support;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.joyqueue.client.internal.cluster.ClusterManager;
import org.joyqueue.client.internal.metadata.domain.TopicMetadata;
import org.joyqueue.client.internal.nameserver.NameServerConfig;
import org.joyqueue.client.internal.producer.MessageProducer;
import org.joyqueue.client.internal.producer.MessageSender;
import org.joyqueue.client.internal.producer.TransactionMessageProducer;
import org.joyqueue.client.internal.producer.callback.AsyncBatchProduceCallback;
import org.joyqueue.client.internal.producer.callback.AsyncProduceCallback;
import org.joyqueue.client.internal.producer.callback.CompletableFutureAsyncBatchProduceCallback;
import org.joyqueue.client.internal.producer.callback.CompletableFutureAsyncProduceCallback;
import org.joyqueue.client.internal.producer.config.ProducerConfig;
import org.joyqueue.client.internal.producer.config.SenderConfig;
import org.joyqueue.client.internal.producer.domain.ProduceMessage;
import org.joyqueue.client.internal.producer.domain.SendResult;
import org.joyqueue.client.internal.producer.exception.ProducerException;
import org.joyqueue.client.internal.producer.interceptor.ProducerInterceptor;
import org.joyqueue.client.internal.producer.interceptor.ProducerInterceptorManager;
import org.joyqueue.client.internal.producer.transport.ProducerClientManager;
import org.joyqueue.exception.JoyQueueCode;
import org.joyqueue.shaded.com.google.common.base.Preconditions;
import org.joyqueue.shaded.org.apache.commons.lang3.StringUtils;
import org.joyqueue.toolkit.service.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/joyqueue/client/internal/producer/support/DefaultMessageProducer.class */
/**
 * Default {@link MessageProducer} implementation.
 *
 * <p>This class is a thin facade: every send variant (sync, oneway, async, single and batch)
 * is funnelled into {@link #doSend} or {@link #doBatchSend}, which verify that the service has
 * been started and then delegate to a {@link MessageProducerInner}. The collaborators that do
 * the actual work are created in {@link #validate()} and started/stopped in
 * {@link #doStart()} / {@link #doStop()}.
 *
 * <p>All operations that touch lifecycle-created state call {@link #checkState()} first and
 * therefore throw {@link ProducerException} when the producer is not started.
 */
public class DefaultMessageProducer extends Service implements MessageProducer {
    protected static final Logger logger = LoggerFactory.getLogger(DefaultMessageProducer.class);

    // Collaborators supplied by the caller; assigned once in the constructor.
    private final ProducerConfig config;
    private final NameServerConfig nameServerConfig;
    private final ClusterManager clusterManager;
    private final ProducerClientManager producerClientManager;
    private final ProducerInterceptorManager producerInterceptorManager = new ProducerInterceptorManager();

    // Lifecycle state; (re)created by validate(), so these are null until then.
    private SenderConfig senderConfig;
    private MessageSender messageSender;
    private AtomicLong transactionSequence;
    private MessageProducerInner messageProducerInner;

    /**
     * Creates a producer. No collaborators are started here; that happens in the service
     * lifecycle methods.
     *
     * @param producerConfig producer settings; must be non-null with a non-blank app, a retry
     *     policy and a QoS level
     * @param nameServerConfig name server settings; must be non-null
     * @param clusterManager cluster metadata accessor; must be non-null
     * @param producerClientManager transport client manager; must be non-null
     * @throws IllegalArgumentException if any of the above preconditions is violated
     */
    public DefaultMessageProducer(ProducerConfig producerConfig, NameServerConfig nameServerConfig, ClusterManager clusterManager, ProducerClientManager producerClientManager) {
        Preconditions.checkArgument(producerConfig != null, "producer not null");
        Preconditions.checkArgument(nameServerConfig != null, "nameServer not null");
        Preconditions.checkArgument(clusterManager != null, "clusterManager not null");
        Preconditions.checkArgument(producerClientManager != null, "producerClientManager not null");
        Preconditions.checkArgument(StringUtils.isNotBlank(producerConfig.getApp()), "producer.app not blank");
        Preconditions.checkArgument(producerConfig.getRetryPolicy() != null, "producer.retryPolicy not null");
        Preconditions.checkArgument(producerConfig.getQosLevel() != null, "producer.qosLevel not null");
        this.config = producerConfig;
        this.nameServerConfig = nameServerConfig;
        this.clusterManager = clusterManager;
        this.producerClientManager = producerClientManager;
    }

    /**
     * Builds the lifecycle-scoped collaborators: a fresh transaction sequence, the sender
     * configuration derived from the producer config, the message sender and the inner producer.
     *
     * @throws Exception declared by the overridden lifecycle hook
     */
    @Override // org.joyqueue.toolkit.service.Service, org.joyqueue.toolkit.service.Activity
    public void validate() throws Exception {
        this.transactionSequence = new AtomicLong();
        this.senderConfig = new SenderConfig(this.config.isCompress(), this.config.getCompressThreshold(), this.config.getCompressType(), this.config.isBatch());
        this.messageSender = new DefaultMessageSender(this.producerClientManager, this.senderConfig);
        this.messageProducerInner = new MessageProducerInner(this.config, this.nameServerConfig, this.messageSender, this.clusterManager, this.producerClientManager, this.producerInterceptorManager);
    }

    /**
     * Starts the message sender first, then the inner producer that depends on it.
     *
     * @throws Exception if either collaborator fails to start
     */
    @Override // org.joyqueue.toolkit.service.Activity
    public void doStart() throws Exception {
        this.messageSender.start();
        this.messageProducerInner.start();
    }

    /**
     * Stops the collaborators in the reverse of start order. Each one is null-checked because
     * it only exists once {@link #validate()} has run.
     */
    @Override // org.joyqueue.toolkit.service.Activity
    public void doStop() {
        if (this.messageProducerInner != null) {
            this.messageProducerInner.stop();
        }
        if (this.messageSender != null) {
            this.messageSender.stop();
        }
    }

    /**
     * Sends a single message using the configured default timeout (milliseconds).
     *
     * @param produceMessage message to send
     * @return the result reported by the inner producer
     * @throws ProducerException if the producer is not started
     */
    @Override // org.joyqueue.client.internal.producer.MessageProducer
    public SendResult send(ProduceMessage produceMessage) {
        return send(produceMessage, this.config.getTimeout(), TimeUnit.MILLISECONDS);
    }

    /**
     * Sends a single message with an explicit timeout.
     *
     * @param produceMessage message to send
     * @param j timeout value, expressed in {@code timeUnit}
     * @param timeUnit unit of {@code j}
     * @return the result reported by the inner producer
     * @throws ProducerException if the producer is not started
     */
    @Override // org.joyqueue.client.internal.producer.MessageProducer
    public SendResult send(ProduceMessage produceMessage, long j, TimeUnit timeUnit) {
        return doSend(produceMessage, j, timeUnit, false, null);
    }

    /**
     * Sends a batch of messages using the configured default timeout (milliseconds).
     *
     * @param list messages to send
     * @return the results reported by the inner producer
     * @throws ProducerException if the producer is not started
     */
    @Override // org.joyqueue.client.internal.producer.MessageProducer
    public List<SendResult> batchSend(List<ProduceMessage> list) {
        return batchSend(list, this.config.getTimeout(), TimeUnit.MILLISECONDS);
    }

    /**
     * Sends a batch of messages with an explicit timeout.
     *
     * @param list messages to send
     * @param j timeout value, expressed in {@code timeUnit}
     * @param timeUnit unit of {@code j}
     * @return the results reported by the inner producer
     * @throws ProducerException if the producer is not started
     */
    @Override // org.joyqueue.client.internal.producer.MessageProducer
    public List<SendResult> batchSend(List<ProduceMessage> list, long j, TimeUnit timeUnit) {
        return doBatchSend(list, j, timeUnit, false, null);
    }

    /**
     * Oneway variant of {@link #send(ProduceMessage)}: the send result is discarded.
     *
     * @param produceMessage message to send
     * @throws ProducerException if the producer is not started
     */
    @Override // org.joyqueue.client.internal.producer.MessageProducer
    public void sendOneway(ProduceMessage produceMessage) {
        sendOneway(produceMessage, this.config.getTimeout(), TimeUnit.MILLISECONDS);
    }

    /**
     * Oneway variant of {@link #send(ProduceMessage, long, TimeUnit)}: the send result is
     * discarded.
     *
     * @param produceMessage message to send
     * @param j timeout value, expressed in {@code timeUnit}
     * @param timeUnit unit of {@code j}
     * @throws ProducerException if the producer is not started
     */
    @Override // org.joyqueue.client.internal.producer.MessageProducer
    public void sendOneway(ProduceMessage produceMessage, long j, TimeUnit timeUnit) {
        doSend(produceMessage, j, timeUnit, true, null);
    }

    /**
     * Oneway variant of {@link #batchSend(List)}: the send results are discarded.
     *
     * @param list messages to send
     * @throws ProducerException if the producer is not started
     */
    @Override // org.joyqueue.client.internal.producer.MessageProducer
    public void batchSendOneway(List<ProduceMessage> list) {
        batchSendOneway(list, this.config.getTimeout(), TimeUnit.MILLISECONDS);
    }

    /**
     * Oneway variant of {@link #batchSend(List, long, TimeUnit)}: the send results are
     * discarded.
     *
     * @param list messages to send
     * @param j timeout value, expressed in {@code timeUnit}
     * @param timeUnit unit of {@code j}
     * @throws ProducerException if the producer is not started
     */
    @Override // org.joyqueue.client.internal.producer.MessageProducer
    public void batchSendOneway(List<ProduceMessage> list, long j, TimeUnit timeUnit) {
        doBatchSend(list, j, timeUnit, true, null);
    }

    /**
     * Asynchronous send using the configured default timeout (milliseconds).
     *
     * @param produceMessage message to send
     * @return a future handed to the async callback
     * @throws ProducerException if the producer is not started (thrown synchronously, not via
     *     the future)
     */
    @Override // org.joyqueue.client.internal.producer.MessageProducer
    public CompletableFuture<SendResult> sendAsync(ProduceMessage produceMessage) {
        return sendAsync(produceMessage, this.config.getTimeout(), TimeUnit.MILLISECONDS);
    }

    /**
     * Asynchronous send with an explicit timeout. The returned future is wrapped in a
     * {@link CompletableFutureAsyncProduceCallback} that is passed to the inner producer;
     * presumably that callback completes the future when the send finishes.
     *
     * @param produceMessage message to send
     * @param j timeout value, expressed in {@code timeUnit}
     * @param timeUnit unit of {@code j}
     * @return a future handed to the async callback
     * @throws ProducerException if the producer is not started (thrown synchronously, not via
     *     the future)
     */
    @Override // org.joyqueue.client.internal.producer.MessageProducer
    public CompletableFuture<SendResult> sendAsync(ProduceMessage produceMessage, long j, TimeUnit timeUnit) {
        CompletableFuture<SendResult> completableFuture = new CompletableFuture<>();
        doSend(produceMessage, j, timeUnit, false, new CompletableFutureAsyncProduceCallback(completableFuture));
        return completableFuture;
    }

    /**
     * Asynchronous batch send using the configured default timeout (milliseconds).
     *
     * @param list messages to send
     * @return a future handed to the async batch callback
     * @throws ProducerException if the producer is not started (thrown synchronously, not via
     *     the future)
     */
    @Override // org.joyqueue.client.internal.producer.MessageProducer
    public CompletableFuture<List<SendResult>> batchSendAsync(List<ProduceMessage> list) {
        return batchSendAsync(list, this.config.getTimeout(), TimeUnit.MILLISECONDS);
    }

    /**
     * Asynchronous batch send with an explicit timeout. The returned future is wrapped in a
     * {@link CompletableFutureAsyncBatchProduceCallback} that is passed to the inner producer;
     * presumably that callback completes the future when the send finishes.
     *
     * @param list messages to send
     * @param j timeout value, expressed in {@code timeUnit}
     * @param timeUnit unit of {@code j}
     * @return a future handed to the async batch callback
     * @throws ProducerException if the producer is not started (thrown synchronously, not via
     *     the future)
     */
    @Override // org.joyqueue.client.internal.producer.MessageProducer
    public CompletableFuture<List<SendResult>> batchSendAsync(List<ProduceMessage> list, long j, TimeUnit timeUnit) {
        CompletableFuture<List<SendResult>> completableFuture = new CompletableFuture<>();
        doBatchSend(list, j, timeUnit, false, new CompletableFutureAsyncBatchProduceCallback(completableFuture));
        return completableFuture;
    }

    /**
     * Common entry point for all single-message send variants.
     *
     * @param produceMessage message to send
     * @param j timeout value, expressed in {@code timeUnit}
     * @param timeUnit unit of {@code j}
     * @param z {@code true} when invoked from the oneway variants, {@code false} otherwise
     * @param asyncProduceCallback callback for the async variants, or {@code null}
     * @return whatever the inner producer returns
     * @throws ProducerException if the producer is not started
     */
    protected SendResult doSend(ProduceMessage produceMessage, long j, TimeUnit timeUnit, boolean z, AsyncProduceCallback asyncProduceCallback) {
        checkState();
        // The null second argument is presumably the transaction id (none for plain sends) - TODO confirm.
        return this.messageProducerInner.send(produceMessage, null, j, timeUnit, z, this.config.isFailover(), asyncProduceCallback);
    }

    /**
     * Common entry point for all batch send variants.
     *
     * @param list messages to send
     * @param j timeout value, expressed in {@code timeUnit}
     * @param timeUnit unit of {@code j}
     * @param z {@code true} when invoked from the oneway variants, {@code false} otherwise
     * @param asyncBatchProduceCallback callback for the async variants, or {@code null}
     * @return whatever the inner producer returns
     * @throws ProducerException if the producer is not started
     */
    protected List<SendResult> doBatchSend(List<ProduceMessage> list, long j, TimeUnit timeUnit, boolean z, AsyncBatchProduceCallback asyncBatchProduceCallback) {
        checkState();
        // The null second argument is presumably the transaction id (none for plain sends) - TODO confirm.
        return this.messageProducerInner.batchSend(list, null, j, timeUnit, z, this.config.isFailover(), asyncBatchProduceCallback);
    }

    /**
     * Begins a transaction without an explicit id, using the configured transaction timeout
     * (milliseconds).
     *
     * @return a new transactional producer
     * @throws ProducerException if the producer is not started
     */
    @Override // org.joyqueue.client.internal.producer.MessageProducer
    public TransactionMessageProducer beginTransaction() {
        return beginTransaction(this.config.getTransactionTimeout(), TimeUnit.MILLISECONDS);
    }

    /**
     * Begins a transaction without an explicit id.
     *
     * @param j transaction timeout value, expressed in {@code timeUnit}
     * @param timeUnit unit of {@code j}
     * @return a new transactional producer carrying the next transaction sequence number
     * @throws ProducerException if the producer is not started
     */
    @Override // org.joyqueue.client.internal.producer.MessageProducer
    public TransactionMessageProducer beginTransaction(long j, TimeUnit timeUnit) {
        // Guard like the id-taking overload: transactionSequence, messageSender and
        // messageProducerInner only exist after validate(), so without this check an
        // unstarted producer would fail with a bare NullPointerException.
        checkState();
        return new DefaultTransactionMessageProducer(null, j, timeUnit, this.transactionSequence.getAndIncrement(), this.config, this.nameServerConfig, this.clusterManager, this.messageSender, this.messageProducerInner);
    }

    /**
     * Begins a transaction with the given id, using the configured transaction timeout
     * (milliseconds).
     *
     * @param str transaction id; must not be blank
     * @return a new transactional producer
     * @throws ProducerException if the producer is not started
     * @throws IllegalArgumentException if {@code str} is blank
     */
    @Override // org.joyqueue.client.internal.producer.MessageProducer
    public TransactionMessageProducer beginTransaction(String str) {
        return beginTransaction(str, this.config.getTransactionTimeout(), TimeUnit.MILLISECONDS);
    }

    /**
     * Begins a transaction with the given id.
     *
     * @param str transaction id; must not be blank
     * @param j transaction timeout value, expressed in {@code timeUnit}
     * @param timeUnit unit of {@code j}
     * @return a new transactional producer carrying the next transaction sequence number
     * @throws ProducerException if the producer is not started
     * @throws IllegalArgumentException if {@code str} is blank
     */
    @Override // org.joyqueue.client.internal.producer.MessageProducer
    public TransactionMessageProducer beginTransaction(String str, long j, TimeUnit timeUnit) {
        checkState();
        Preconditions.checkArgument(StringUtils.isNotBlank(str), "transactionId not blank");
        return new DefaultTransactionMessageProducer(str, j, timeUnit, this.transactionSequence.getAndIncrement(), this.config, this.nameServerConfig, this.clusterManager, this.messageSender, this.messageProducerInner);
    }

    /**
     * Fetches metadata for a topic on behalf of the configured app. The topic name is first
     * expanded via {@code MessageProducerInner.getTopicFullName}.
     *
     * @param str topic name; must not be blank
     * @return the metadata returned by the cluster manager
     * @throws ProducerException if the producer is not started
     * @throws IllegalArgumentException if {@code str} is blank
     */
    @Override // org.joyqueue.client.internal.producer.MessageProducer
    public TopicMetadata getTopicMetadata(String str) {
        checkState();
        Preconditions.checkArgument(StringUtils.isNotBlank(str), "topic not blank");
        return this.clusterManager.fetchTopicMetadata(this.messageProducerInner.getTopicFullName(str), this.config.getApp());
    }

    /**
     * Registers a producer interceptor. Does not require the producer to be started.
     *
     * @param producerInterceptor interceptor to add; must be non-null
     * @throws IllegalArgumentException if {@code producerInterceptor} is null
     */
    @Override // org.joyqueue.client.internal.producer.MessageProducer
    public synchronized void addInterceptor(ProducerInterceptor producerInterceptor) {
        Preconditions.checkArgument(producerInterceptor != null, "interceptor can not be null");
        this.producerInterceptorManager.addInterceptor(producerInterceptor);
    }

    /**
     * Unregisters a producer interceptor. Does not require the producer to be started.
     *
     * @param producerInterceptor interceptor to remove; must be non-null
     * @throws IllegalArgumentException if {@code producerInterceptor} is null
     */
    @Override // org.joyqueue.client.internal.producer.MessageProducer
    public synchronized void removeInterceptor(ProducerInterceptor producerInterceptor) {
        Preconditions.checkArgument(producerInterceptor != null, "interceptor can not be null");
        this.producerInterceptorManager.removeInterceptor(producerInterceptor);
    }

    /**
     * Verifies that the service is started.
     *
     * @throws ProducerException with code {@code CN_SERVICE_NOT_AVAILABLE} if it is not
     */
    protected void checkState() {
        if (!isStarted()) {
            throw new ProducerException("producer is not started", JoyQueueCode.CN_SERVICE_NOT_AVAILABLE.getCode());
        }
    }
}
