/*
 * Decompiled with CFR 0.152.
 */
package com.lmaye.cloud.starter.delay.queue.producer;

import com.github.rholder.retry.RetryListener;
import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;
import com.github.rholder.retry.WaitStrategies;
import com.lmaye.cloud.starter.delay.queue.DelayQueueProperties;
import com.lmaye.cloud.starter.delay.queue.producer.KafkaRetryListener;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;

/**
 * Spring component that publishes string messages to Kafka and retries failed sends.
 *
 * <p>The maximum number of attempts and the fixed wait between attempts are read from the
 * injected {@link DelayQueueProperties}; a {@link KafkaRetryListener} built from the same
 * properties is registered on every retryer.
 */
@Component
public class KafkaProducer {
    private static final Logger log = LoggerFactory.getLogger(KafkaProducer.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private DelayQueueProperties properties;

    /**
     * Sends {@code msg} to {@code topic}, retrying while an attempt throws, yields no result,
     * or yields a result without record metadata.
     *
     * <p>Each attempt blocks on the send future. Attempts stop after
     * {@code properties.getRetryNums()} tries, with a fixed wait of
     * {@code properties.getRetrySleepTime()} seconds between them. The method is annotated
     * {@link Async}; whether it actually runs on another thread depends on the application's
     * async configuration, which is not visible here.
     *
     * @param topic the Kafka topic to publish to
     * @param msg   the message payload
     * @throws Exception whatever {@code Retryer.call} propagates once retrying ends without
     *                   an accepted result
     */
    @Async
    public void send(String topic, String msg) throws Exception {
        Callable<SendResult<String, String>> attempt = () -> this.sendOnce(topic, msg);
        Retryer<SendResult<String, String>> retryer = RetryerBuilder.<SendResult<String, String>>newBuilder()
                // Single predicate so the null check and the metadata check cannot be reordered apart.
                .retryIfResult(result -> result == null || result.getRecordMetadata() == null)
                .retryIfException()
                .withStopStrategy(StopStrategies.stopAfterAttempt(this.properties.getRetryNums().intValue()))
                .withWaitStrategy(WaitStrategies.fixedWait((long) this.properties.getRetrySleepTime(), TimeUnit.SECONDS))
                .withRetryListener(new KafkaRetryListener(this.properties))
                .build();
        retryer.call(attempt);
    }

    /**
     * Performs one send attempt: submits the record, registers logging callbacks, and blocks
     * until the send future completes.
     */
    private SendResult<String, String> sendOnce(String topic, String msg) throws Exception {
        ListenableFuture<SendResult<String, String>> future = this.kafkaTemplate.send(topic, msg);
        future.addCallback(result -> {
            if (result != null && result.getRecordMetadata() != null) {
                log.debug("\u6d88\u606f\u53d1\u9001\u6210\u529f offset: {}", result.getRecordMetadata().offset());
            }
        }, throwable ->
                // Trailing throwable argument keeps the stack trace in the log output.
                log.error("\u6d88\u606f\u53d1\u9001\u5931\u8d25: {}", throwable.getMessage(), throwable));
        try {
            return future.get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag before rethrowing so the interruption is not lost
            // when the retry machinery catches this exception.
            Thread.currentThread().interrupt();
            throw e;
        }
    }
}

