package com.netflix.conductor.contribs.tasks.kafka;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.netflix.conductor.core.execution.WorkflowExecutor;
import com.netflix.conductor.core.execution.tasks.WorkflowSystemTask;
import com.netflix.conductor.core.utils.Utils;
import com.netflix.conductor.model.TaskModel;
import com.netflix.conductor.model.WorkflowModel;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * System task, registered as the Spring component {@code KAFKA_PUBLISH}, that publishes a single
 * message to a Kafka topic.
 *
 * <p>The request is read from the {@code kafka_request} key of the task input and converted to an
 * {@link Input}. Bootstrap servers, topic and value are mandatory; when any of them is missing the
 * task is marked {@code FAILED} with an explanatory reason. The value is serialized to JSON with
 * the injected {@link ObjectMapper} and sent through a producer obtained from the injected
 * {@link KafkaProducerManager}.
 */
@Component("KAFKA_PUBLISH")
public class KafkaPublishTask extends WorkflowSystemTask {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPublishTask.class);
    static final String REQUEST_PARAMETER_NAME = "kafka_request";
    private static final String MISSING_REQUEST = "Missing Kafka request. Task input MUST have a 'kafka_request' key with KafkaTask.Input as value. See documentation for KafkaTask for required input parameters";
    private static final String MISSING_BOOT_STRAP_SERVERS = "No boot strap servers specified";
    private static final String MISSING_KAFKA_TOPIC = "Missing Kafka topic. See documentation for KafkaTask for required input parameters";
    private static final String MISSING_KAFKA_VALUE = "Missing Kafka value.  See documentation for KafkaTask for required input parameters";
    private static final String FAILED_TO_INVOKE = "Failed to invoke kafka task due to: ";
    private final ObjectMapper objectMapper;
    private final String requestParameter;
    private final KafkaProducerManager producerManager;

    /**
     * Request payload of the task, populated from the {@code kafka_request} entry of the task
     * input via {@link ObjectMapper#convertValue(Object, Class)}.
     */
    public static class Input {
        /** Canonical class name of {@link StringSerializer}; the default key serializer. */
        public static final String STRING_SERIALIZER = StringSerializer.class.getCanonicalName();
        /** Kafka bootstrap servers; must not be blank. */
        private String bootStrapServers;
        /** Message key; converted according to {@link #keySerializer} before sending. */
        private Object key;
        /** Message value; serialized to a JSON string before sending. Must not be null. */
        private Object value;
        // NOTE(review): requestTimeoutMs and maxBlockMs are not read in this class; presumably
        // KafkaProducerManager applies them to the producer configuration - confirm there.
        private Integer requestTimeoutMs;
        private Integer maxBlockMs;
        /** Destination topic; must not be blank. */
        private String topic;
        /** Record headers; each value is sent as the bytes of its string form. */
        private Map<String, Object> headers = new HashMap<>();
        /** Fully-qualified class name of the key serializer. */
        private String keySerializer = STRING_SERIALIZER;

        public Map<String, Object> getHeaders() {
            return this.headers;
        }

        public void setHeaders(Map<String, Object> map) {
            this.headers = map;
        }

        public String getBootStrapServers() {
            return this.bootStrapServers;
        }

        public void setBootStrapServers(String str) {
            this.bootStrapServers = str;
        }

        public Object getKey() {
            return this.key;
        }

        public void setKey(Object obj) {
            this.key = obj;
        }

        public Object getValue() {
            return this.value;
        }

        public void setValue(Object obj) {
            this.value = obj;
        }

        public Integer getRequestTimeoutMs() {
            return this.requestTimeoutMs;
        }

        public void setRequestTimeoutMs(Integer num) {
            this.requestTimeoutMs = num;
        }

        public String getTopic() {
            return this.topic;
        }

        public void setTopic(String str) {
            this.topic = str;
        }

        public String getKeySerializer() {
            return this.keySerializer;
        }

        public void setKeySerializer(String str) {
            this.keySerializer = str;
        }

        public Integer getMaxBlockMs() {
            return this.maxBlockMs;
        }

        public void setMaxBlockMs(Integer num) {
            this.maxBlockMs = num;
        }

        @Override
        public String toString() {
            return "Input{headers=" + this.headers + ", bootStrapServers='" + this.bootStrapServers + "', key=" + this.key + ", value=" + this.value + ", requestTimeoutMs=" + this.requestTimeoutMs + ", maxBlockMs=" + this.maxBlockMs + ", topic='" + this.topic + "', keySerializer='" + this.keySerializer + "'}";
        }
    }

    /**
     * Creates the task.
     *
     * @param kafkaProducerManager source of the producer used for publishing
     * @param objectMapper mapper used to convert the request and to serialize the message value
     */
    @Autowired
    public KafkaPublishTask(KafkaProducerManager kafkaProducerManager, ObjectMapper objectMapper) {
        super("KAFKA_PUBLISH");
        this.requestParameter = REQUEST_PARAMETER_NAME;
        this.producerManager = kafkaProducerManager;
        this.objectMapper = objectMapper;
        LOGGER.info("KafkaTask initialized.");
    }

    /**
     * Validates the request, publishes the message and blocks until the send has finished.
     *
     * <p>On success the task status becomes {@code IN_PROGRESS} when
     * {@code isAsyncComplete(taskModel)} is true and {@code COMPLETED} otherwise. Any validation
     * or publishing failure marks the task {@code FAILED} and records the reason; no exception is
     * propagated for those cases.
     *
     * @param workflowModel workflow the task belongs to (not used here)
     * @param taskModel task whose input is read and whose status/reason are updated
     * @param workflowExecutor executor supplied by the caller (not used here)
     */
    public void start(WorkflowModel workflowModel, TaskModel taskModel, WorkflowExecutor workflowExecutor) {
        long startedAt = Instant.now().toEpochMilli();
        taskModel.setWorkerId(Utils.getServerId());

        Object request = taskModel.getInputData().get(this.requestParameter);
        if (Objects.isNull(request)) {
            markTaskAsFailed(taskModel, MISSING_REQUEST);
            return;
        }

        Input input;
        try {
            input = this.objectMapper.convertValue(request, Input.class);
        } catch (IllegalArgumentException e) {
            // A request that cannot be mapped onto Input is invalid input, so fail the task
            // instead of letting the conversion error escape from start().
            LOGGER.error("Failed to invoke kafka task: {} - invalid kafka request", taskModel.getTaskId(), e);
            markTaskAsFailed(taskModel, FAILED_TO_INVOKE + e.getMessage());
            return;
        }

        if (StringUtils.isBlank(input.getBootStrapServers())) {
            markTaskAsFailed(taskModel, MISSING_BOOT_STRAP_SERVERS);
            return;
        }
        if (StringUtils.isBlank(input.getTopic())) {
            markTaskAsFailed(taskModel, MISSING_KAFKA_TOPIC);
            return;
        }
        if (Objects.isNull(input.getValue())) {
            markTaskAsFailed(taskModel, MISSING_KAFKA_VALUE);
            return;
        }

        try {
            // Block until the send finishes so that failures are reflected in the task status.
            kafkaPublish(input).get();
            if (isAsyncComplete(taskModel)) {
                taskModel.setStatus(TaskModel.Status.IN_PROGRESS);
            } else {
                taskModel.setStatus(TaskModel.Status.COMPLETED);
            }
            LOGGER.debug("Published message {}, Time taken {}", input, Instant.now().toEpochMilli() - startedAt);
        } catch (ExecutionException e) {
            LOGGER.error("Failed to invoke kafka task: {} - execution exception ", taskModel.getTaskId(), e);
            markTaskAsFailed(taskModel, FAILED_TO_INVOKE + e.getMessage());
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            LOGGER.error("Failed to invoke kafka task:{} for input {} - unknown exception", taskModel.getTaskId(), input, e);
            markTaskAsFailed(taskModel, FAILED_TO_INVOKE + e.getMessage());
        } catch (Exception e) {
            LOGGER.error("Failed to invoke kafka task:{} for input {} - unknown exception", taskModel.getTaskId(), input, e);
            markTaskAsFailed(taskModel, FAILED_TO_INVOKE + e.getMessage());
        }
    }

    /** Records {@code str} as the reason for incompletion and sets the status to {@code FAILED}. */
    private void markTaskAsFailed(TaskModel taskModel, String str) {
        taskModel.setReasonForIncompletion(str);
        taskModel.setStatus(TaskModel.Status.FAILED);
    }

    /**
     * Builds the producer record for {@code input} and hands it to the producer.
     *
     * <p>The record carries no explicit partition or timestamp. The value is the JSON form of
     * {@link Input#getValue()}, and every header value is sent as the UTF-8 bytes of its string
     * form. A null header map is treated as "no headers".
     *
     * @param input validated request
     * @return the future returned by the producer's {@code send} call
     * @throws Exception if the producer cannot be obtained, the key cannot be converted, or the
     *     value cannot be serialized to JSON
     */
    // The producer manager is used through raw Kafka types here, as in the original code, so the
    // unchecked/rawtypes warnings are confined to this method.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private Future<RecordMetadata> kafkaPublish(Input input) throws Exception {
        long startedAt = Instant.now().toEpochMilli();
        Producer producer = this.producerManager.getProducer(input);
        LOGGER.debug("Time taken getting producer {}", Instant.now().toEpochMilli() - startedAt);

        Object key = getKey(input);
        String value = this.objectMapper.writeValueAsString(input.getValue());

        // "headers": null in the request overrides the default empty map; treat it as empty.
        Map<String, Object> headerMap = input.getHeaders();
        if (headerMap == null) {
            headerMap = new HashMap<>();
        }
        Iterable<RecordHeader> headers = headerMap.entrySet().stream()
                .map(entry -> new RecordHeader(
                        entry.getKey(),
                        // Explicit charset: the no-arg getBytes() depends on the platform default.
                        String.valueOf(entry.getValue()).getBytes(StandardCharsets.UTF_8)))
                .collect(Collectors.toList());

        // The two nulls are the partition and the timestamp.
        ProducerRecord message = new ProducerRecord(input.getTopic(), null, null, key, value, headers);
        Future<RecordMetadata> pending = producer.send(message);
        LOGGER.debug("Time taken publishing {}", Instant.now().toEpochMilli() - startedAt);
        return pending;
    }

    /**
     * Converts the request key to the Java type matching the configured key serializer.
     *
     * <p>Returns a {@link Long} for {@link LongSerializer}, an {@link Integer} for
     * {@link IntegerSerializer}, and the key's string form for any other serializer name.
     *
     * <p>NOTE(review): a null key becomes the literal string {@code "null"} for string keys and
     * causes a {@link NumberFormatException} for numeric serializers; confirm whether keyless
     * messages should be supported instead.
     *
     * @param input request holding the key and the serializer class name
     * @return the converted key
     * @throws NumberFormatException if a numeric serializer is configured and the key's string
     *     form is not a valid number
     */
    @VisibleForTesting
    Object getKey(Input input) {
        String keySerializer = input.getKeySerializer();
        String rawKey = String.valueOf(input.getKey());
        if (LongSerializer.class.getCanonicalName().equals(keySerializer)) {
            return Long.valueOf(Long.parseLong(rawKey));
        }
        if (IntegerSerializer.class.getCanonicalName().equals(keySerializer)) {
            return Integer.valueOf(Integer.parseInt(rawKey));
        }
        return rawKey;
    }

    /** Does nothing and always returns {@code false}; publishing happens in {@code start}. */
    public boolean execute(WorkflowModel workflowModel, TaskModel taskModel, WorkflowExecutor workflowExecutor) {
        return false;
    }

    /** Sets the task status to {@code CANCELED}. */
    public void cancel(WorkflowModel workflowModel, TaskModel taskModel, WorkflowExecutor workflowExecutor) {
        taskModel.setStatus(TaskModel.Status.CANCELED);
    }

    /** Always returns {@code true}. */
    public boolean isAsync() {
        return true;
    }
}
