package za.co.absa.enceladus.kafka;

import java.util.Properties;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.StructType;
import scala.Option;
import scala.Predef$;
import scala.StringContext;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.runtime.BoxesRunTime;
import za.co.absa.abris.avro.AvroSerDe$;
import za.co.absa.enceladus.kafka.files.JobFilesDispatcher$;
import za.co.absa.enceladus.kafka.parameters.KafkaParametersProcessor$;
import za.co.absa.enceladus.kafka.schema.SchemasProcessor$;

/* compiled from: KafkaDispatcher.scala */
/* loaded from: input_file:za/co/absa/enceladus/kafka/KafkaDispatcher$.class */
public final class KafkaDispatcher$ {
    public static final KafkaDispatcher$ MODULE$ = null;
    private final Logger log;
    private final String DEFAULT_SCHEMA_NAME;
    private final String DEFAULT_SCHEMA_NAMESPACE;
    private final String KAFKA_REGISTERED_FORMAT;
    private final List<String> DEFAULT_COLUMNS;

    static {
        new KafkaDispatcher$();
    }

    public Logger log() {
        return this.log;
    }

    private String DEFAULT_SCHEMA_NAME() {
        return this.DEFAULT_SCHEMA_NAME;
    }

    private String DEFAULT_SCHEMA_NAMESPACE() {
        return this.DEFAULT_SCHEMA_NAMESPACE;
    }

    private String KAFKA_REGISTERED_FORMAT() {
        return this.KAFKA_REGISTERED_FORMAT;
    }

    private List<String> DEFAULT_COLUMNS() {
        return this.DEFAULT_COLUMNS;
    }

    public void dispatch(Dataset<Row> dataset, Option<String> option) {
        try {
            throwableDispatch(dataset, option);
        } catch (Throwable th) {
            log().error("Could not dispatch data to Kafka.", th);
        }
    }

    private void throwableDispatch(Dataset<Row> dataset, Option<String> option) {
        if (option.isEmpty()) {
            log().error("No Kafka settings were informed.");
            return;
        }
        if (!hasSchema(dataset)) {
            log().error("Dataframe does not have a valid schema, thus, it is not possible to define the structures of the records.");
            return;
        }
        Properties loadProperties = KafkaParametersProcessor$.MODULE$.loadProperties((String) option.get());
        if (KafkaParametersProcessor$.MODULE$.validate(loadProperties)) {
            sendFiles(loadProperties, dataset);
            List<String> columnsToDispatch = KafkaParametersProcessor$.MODULE$.getColumnsToDispatch(loadProperties);
            log().info(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Sending default fields to Kafka: [", "]"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{DEFAULT_COLUMNS().mkString(",")})));
            log().info(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Sending extra fields to Kafka: [", "]"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{columnsToDispatch.mkString(",")})));
            Dataset select = dataset.filter(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"size(", ") > 0"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{DEFAULT_COLUMNS().head()}))).select((String) DEFAULT_COLUMNS().head(), columnsToDispatch.$colon$colon$colon((List) DEFAULT_COLUMNS().tail()));
            generateAvroSchema(select.schema(), loadProperties);
            KafkaParametersProcessor$.MODULE$.WriterStreamOptions(AvroSerDe$.MODULE$.Serializer(select).toAvro(DEFAULT_SCHEMA_NAME(), DEFAULT_SCHEMA_NAMESPACE()).write().format(KAFKA_REGISTERED_FORMAT())).addOptions(loadProperties).save();
        }
    }

    private boolean hasSchema(Dataset<Row> dataset) {
        return dataset.schema() != null && dataset.schema().nonEmpty();
    }

    private void sendFiles(Properties properties, Dataset<Row> dataset) {
        List<String> jobFilesPaths = KafkaParametersProcessor$.MODULE$.getJobFilesPaths(properties);
        if (!jobFilesPaths.nonEmpty()) {
            log().info("Nothing to send to executors for Kafka operations.");
        } else {
            log().info(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Found ", " files to be sent to executors."})).s(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToInteger(jobFilesPaths.size())})));
            JobFilesDispatcher$.MODULE$.sendFiles(jobFilesPaths, dataset.sparkSession());
        }
    }

    private void generateAvroSchema(StructType structType, Properties properties) {
        Option<String> avroSchemaDestination = KafkaParametersProcessor$.MODULE$.getAvroSchemaDestination(properties);
        if (!avroSchemaDestination.isDefined()) {
            log().info("No instructions to generate Avro schemas were found. Skipping this step.");
        } else {
            log().info(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Going to convert between Spark and Avro schemas. Destination will be stored at: ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{avroSchemaDestination})));
            SchemasProcessor$.MODULE$.storeAsAvro(structType, (String) avroSchemaDestination.get(), DEFAULT_SCHEMA_NAME(), DEFAULT_SCHEMA_NAMESPACE(), KafkaParametersProcessor$.MODULE$.getTopic(properties));
        }
    }

    private KafkaDispatcher$() {
        MODULE$ = this;
        this.log = LogManager.getLogger(getClass());
        this.DEFAULT_SCHEMA_NAME = "conformance";
        this.DEFAULT_SCHEMA_NAMESPACE = "enceladus";
        this.KAFKA_REGISTERED_FORMAT = "org.apache.spark.sql.kafka010.KafkaSourceProvider";
        this.DEFAULT_COLUMNS = List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"errCol"}));
    }
}
