package com.netflix.conductor.contribs.tasks.kafka;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.netflix.conductor.contribs.tasks.kafka.KafkaPublishTask;
import java.time.Duration;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * Spring component that builds Kafka {@link Producer} instances for publish tasks and keeps
 * them in a bounded, time-limited Guava cache keyed by their effective producer properties.
 *
 * <p>Two task inputs that resolve to equal {@link Properties} share one cached producer. When
 * an entry is removed from the cache (size limit, idle expiry, ...) the producer is closed by
 * the cache's removal listener.
 *
 * <p>The raw {@code Producer} type is kept in the signatures on purpose so the public surface of
 * this class stays unchanged for existing callers.
 */
@Component
public class KafkaProducerManager {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducerManager.class);

    /** Value serializer applied to every producer created by this manager. */
    private static final String STRING_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";

    /** Closes a producer once its cache entry has been removed, then logs the key it was cached under. */
    private static final RemovalListener<Properties, Producer> LISTENER = notification -> {
        Producer removedProducer = notification.getValue();
        if (removedProducer != null) {
            removedProducer.close();
            LOGGER.info("Closed producer for {}", notification.getKey());
        }
    };

    /** Fallback for {@code request.timeout.ms} (milliseconds, as a string) when the task input sets none. */
    private final String requestTimeoutConfig;

    /** Fallback for {@code max.block.ms} (milliseconds, as a string) when the task input sets none. */
    private final String maxBlockMsConfig;

    /** Producers keyed by the exact properties they were created with. */
    private final Cache<Properties, Producer> kafkaProducerCache;

    /**
     * Creates the manager from configuration values.
     *
     * @param requestTimeout default request timeout; property
     *     {@code conductor.tasks.kafka-publish.requestTimeout}, default {@code 100ms}
     * @param maxBlock default maximum blocking time; property
     *     {@code conductor.tasks.kafka-publish.maxBlock}, default {@code 500ms}
     * @param cacheSize maximum number of cached producers; property
     *     {@code conductor.tasks.kafka-publish.cacheSize}, default {@code 10}
     * @param cacheTime idle time after which a cached producer expires; property
     *     {@code conductor.tasks.kafka-publish.cacheTime}, default {@code 120000ms}
     */
    @Autowired
    public KafkaProducerManager(
            @Value("${conductor.tasks.kafka-publish.requestTimeout:100ms}") Duration requestTimeout,
            @Value("${conductor.tasks.kafka-publish.maxBlock:500ms}") Duration maxBlock,
            @Value("${conductor.tasks.kafka-publish.cacheSize:10}") int cacheSize,
            @Value("${conductor.tasks.kafka-publish.cacheTime:120000ms}") Duration cacheTime) {
        this.requestTimeoutConfig = String.valueOf(requestTimeout.toMillis());
        this.maxBlockMsConfig = String.valueOf(maxBlock.toMillis());
        this.kafkaProducerCache = CacheBuilder.newBuilder()
                .removalListener(LISTENER)
                .maximumSize(cacheSize)
                .expireAfterAccess(cacheTime.toMillis(), TimeUnit.MILLISECONDS)
                .build();
    }

    /**
     * Returns a producer configured for the given task input, creating and caching a new
     * {@link KafkaProducer} when no producer with equal properties is cached yet.
     *
     * @param input publish task input supplying bootstrap servers, key serializer and optional
     *     timeout overrides
     * @return the cached or newly created producer
     * @throws NullPointerException if the input's bootstrap servers or key serializer is null
     */
    public Producer getProducer(KafkaPublishTask.Input input) {
        Properties producerProperties = getProducerProperties(input);
        return getFromCache(producerProperties, () -> new KafkaProducer(producerProperties));
    }

    /**
     * Looks up the producer cached under {@code properties}, invoking {@code callable} to create
     * it when absent.
     *
     * @param properties cache key
     * @param callable factory used when the key is not cached
     * @return the cached or newly created producer
     * @throws RuntimeException wrapping the {@link ExecutionException} reported by the cache when
     *     the factory fails with a checked exception
     */
    @VisibleForTesting
    Producer getFromCache(Properties properties, Callable<Producer> callable) {
        try {
            return this.kafkaProducerCache.get(properties, callable);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Builds the producer properties for a task input. Timeouts given on the input take
     * precedence over the configured defaults; the value serializer is always the string
     * serializer.
     *
     * @param input publish task input
     * @return a new {@link Properties} instance holding the producer configuration
     * @throws NullPointerException if the input's bootstrap servers or key serializer is null
     */
    @VisibleForTesting
    Properties getProducerProperties(KafkaPublishTask.Input input) {
        Properties properties = new Properties();
        // Properties rejects null values with a message-less NullPointerException; checking up
        // front keeps the exception type but tells the caller which input field is missing.
        properties.put("bootstrap.servers",
                Objects.requireNonNull(input.getBootStrapServers(), "bootStrapServers must not be null"));
        properties.put("key.serializer",
                Objects.requireNonNull(input.getKeySerializer(), "keySerializer must not be null"));

        String requestTimeoutMs = Objects.nonNull(input.getRequestTimeoutMs())
                ? String.valueOf(input.getRequestTimeoutMs())
                : this.requestTimeoutConfig;
        String maxBlockMs = Objects.nonNull(input.getMaxBlockMs())
                ? String.valueOf(input.getMaxBlockMs())
                : this.maxBlockMsConfig;

        properties.put("request.timeout.ms", requestTimeoutMs);
        properties.put("max.block.ms", maxBlockMs);
        properties.put("value.serializer", STRING_SERIALIZER);
        return properties;
    }
}
