package io.aiven.kafka.connect.common.output;

import io.aiven.kafka.connect.common.config.OutputField;
import io.confluent.connect.avro.AvroData;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/aiven/kafka/connect/common/output/SinkSchemaBuilder.class */
public abstract class SinkSchemaBuilder {
    private final Logger logger;
    private final Collection<OutputField> fields;
    private final AvroData avroData;
    private final boolean envelopeEnabled;

    public SinkSchemaBuilder(Collection<OutputField> collection, AvroData avroData, boolean z) {
        this.logger = LoggerFactory.getLogger(SinkSchemaBuilder.class);
        this.fields = collection;
        this.avroData = avroData;
        this.envelopeEnabled = z;
    }

    public SinkSchemaBuilder(Collection<OutputField> collection, AvroData avroData) {
        this.logger = LoggerFactory.getLogger(SinkSchemaBuilder.class);
        this.fields = collection;
        this.avroData = avroData;
        this.envelopeEnabled = true;
    }

    protected abstract String getNamespace();

    public Schema buildSchema(SinkRecord sinkRecord) {
        Objects.requireNonNull(sinkRecord, "record");
        if (Objects.isNull(sinkRecord.keySchema())) {
            throw new DataException("Record key without schema");
        }
        if (Objects.isNull(sinkRecord.valueSchema())) {
            throw new DataException("Record value without schema");
        }
        this.logger.debug("Create schema for record");
        this.logger.debug("Record Key Schema {}", sinkRecord.keySchema());
        this.logger.debug("Record Value Schema {}", sinkRecord.valueSchema());
        return avroSchemaFor(sinkRecord);
    }

    protected Schema avroSchemaFor(SinkRecord sinkRecord) {
        if (!this.envelopeEnabled) {
            return tryUnwrapEnvelope(sinkRecord);
        }
        SchemaBuilder.FieldAssembler fields = SchemaBuilder.builder(getNamespace()).record("connector_records").fields();
        for (OutputField outputField : this.fields) {
            fields.name(outputField.getFieldType().name).type(outputFieldSchema(outputField, sinkRecord)).noDefault();
        }
        return (Schema) fields.endRecord();
    }

    private Schema tryUnwrapEnvelope(SinkRecord sinkRecord) {
        OutputField next = getFields().iterator().next();
        Schema outputFieldSchema = outputFieldSchema(next, sinkRecord);
        if (outputFieldSchema.getType() != Schema.Type.MAP) {
            return outputFieldSchema.getType() == Schema.Type.RECORD ? getAvroData().fromConnectSchema(sinkRecord.valueSchema()) : (Schema) SchemaBuilder.builder(getNamespace()).record("connector_records").fields().name(next.getFieldType().name).type(outputFieldSchema).noDefault().endRecord();
        }
        Map map = (Map) sinkRecord.value();
        SchemaBuilder.FieldAssembler fields = SchemaBuilder.builder(getNamespace()).record("connector_records").fields();
        Iterator it = map.entrySet().iterator();
        while (it.hasNext()) {
            fields.name((String) ((Map.Entry) it.next()).getKey()).type(outputFieldSchema.getValueType()).noDefault();
        }
        return (Schema) fields.endRecord();
    }

    private Schema headersSchema(SinkRecord sinkRecord) {
        if (sinkRecord.headers().isEmpty()) {
            return (Schema) SchemaBuilder.builder().nullType();
        }
        org.apache.kafka.connect.data.Schema schema = null;
        for (Header header : sinkRecord.headers()) {
            if (Objects.isNull(header.schema())) {
                throw new DataException("Header " + header + " without schema");
            }
            if (Objects.isNull(schema)) {
                schema = header.schema();
            } else if (schema.type() != header.schema().type()) {
                throw new DataException("Header schema " + header.schema() + " is not the same as " + schema);
            }
        }
        return (Schema) SchemaBuilder.map().values(this.avroData.fromConnectSchema(schema));
    }

    protected Schema outputFieldSchema(OutputField outputField, SinkRecord sinkRecord) {
        switch (outputField.getFieldType()) {
            case KEY:
                return this.avroData.fromConnectSchema(sinkRecord.keySchema());
            case OFFSET:
            case TIMESTAMP:
                return (Schema) SchemaBuilder.builder().longType();
            case VALUE:
                return this.avroData.fromConnectSchema(sinkRecord.valueSchema());
            case HEADERS:
                return headersSchema(sinkRecord);
            default:
                throw new ConnectException("Unknown field type " + outputField);
        }
    }

    public Collection<OutputField> getFields() {
        return this.fields;
    }

    public AvroData getAvroData() {
        return this.avroData;
    }

    public boolean isEnvelopeEnabled() {
        return this.envelopeEnabled;
    }
}
