package io.debezium.transforms.outbox;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.data.Envelope;
import io.debezium.schema.FieldNameSelector;
import io.debezium.time.MicroTimestamp;
import io.debezium.time.NanoTimestamp;
import io.debezium.time.Timestamp;
import io.debezium.transforms.ConnectRecordUtil;
import io.debezium.transforms.SmtManager;
import io.debezium.transforms.outbox.EventRouterConfigDefinition;
import io.debezium.transforms.tracing.ActivateTracingSpan;
import io.debezium.util.BoundedConcurrentHashMap;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.transforms.ExtractField;
import org.apache.kafka.connect.transforms.RegexRouter;
import org.apache.kafka.connect.transforms.util.Requirements;
import org.codehaus.plexus.util.SelectorUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/transforms/outbox/EventRouterDelegate.class */
public class EventRouterDelegate<R extends ConnectRecord<R>> {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) EventRouterDelegate.class);
    private static final String ENVELOPE_PAYLOAD = "payload";
    private ExtractField<R> afterExtractor;
    private EventRouterConfigDefinition.InvalidOperationBehavior invalidOperationBehavior;
    private String fieldSchemaVersion;
    private boolean routeTombstoneOnEmptyPayload;
    private List<EventRouterConfigDefinition.AdditionalField> additionalFields;
    private boolean additionalFieldsErrorOnMissing;
    private BoundedConcurrentHashMap<Schema, Schema> payloadSchemaCache;
    private boolean expandJsonPayload;
    private JsonSchemaData jsonSchemaData;
    private ObjectMapper objectMapper;
    private SmtManager<R> smtManager;
    private final RegexRouter<R> regexRouter = new RegexRouter<>();
    private final ActivateTracingSpan<R> tracingSmt = new ActivateTracingSpan<>();
    private final Map<String, EventRouterConfigurationProvider> configurationProviders = new HashMap();
    private final DefaultConfigurationProvider defaultConfigurationProvider = new DefaultConfigurationProvider();
    private final Map<Integer, Schema> versionedValueSchema = new HashMap();
    private boolean onlyHeadersInOutputMessage = false;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/debezium/transforms/outbox/EventRouterDelegate$DefaultConfigurationProvider.class */
    public static class DefaultConfigurationProvider implements EventRouterConfigurationProvider {
        private String fieldEventId;
        private String fieldEventKey;
        private String fieldEventTimestamp;
        private String fieldPayload;
        private String routeByField;

        private DefaultConfigurationProvider() {
        }

        @Override // io.debezium.transforms.outbox.EventRouterConfigurationProvider
        public String getName() {
            return "default";
        }

        @Override // io.debezium.transforms.outbox.EventRouterConfigurationProvider
        public void configure(Map<String, ?> map) {
            Configuration from = Configuration.from(map);
            this.fieldEventId = from.getString(EventRouterConfigDefinition.FIELD_EVENT_ID);
            this.fieldEventKey = from.getString(EventRouterConfigDefinition.FIELD_EVENT_KEY);
            this.fieldEventTimestamp = from.getString(EventRouterConfigDefinition.FIELD_EVENT_TIMESTAMP);
            this.fieldPayload = from.getString(EventRouterConfigDefinition.FIELD_PAYLOAD);
            this.routeByField = from.getString(EventRouterConfigDefinition.ROUTE_BY_FIELD);
        }

        @Override // io.debezium.transforms.outbox.EventRouterConfigurationProvider
        public String getFieldEventId() {
            return this.fieldEventId;
        }

        @Override // io.debezium.transforms.outbox.EventRouterConfigurationProvider
        public String getFieldEventKey() {
            return this.fieldEventKey;
        }

        @Override // io.debezium.transforms.outbox.EventRouterConfigurationProvider
        public String getFieldEventTimestamp() {
            return this.fieldEventTimestamp;
        }

        @Override // io.debezium.transforms.outbox.EventRouterConfigurationProvider
        public String getFieldPayload() {
            return this.fieldPayload;
        }

        @Override // io.debezium.transforms.outbox.EventRouterConfigurationProvider
        public String getRouteByField() {
            return this.routeByField;
        }
    }

    @FunctionalInterface
    /* loaded from: input_file:io/debezium/transforms/outbox/EventRouterDelegate$RecordConverter.class */
    public interface RecordConverter<R> {
        R convert(R r);
    }

    /* JADX WARN: Multi-variable type inference failed */
    public R apply(R r, RecordConverter<R> recordConverter) {
        Object obj;
        Schema schema;
        if (r.value() == null) {
            LOGGER.debug("Tombstone message ignored. Message key: \"{}\"", r.key());
            return null;
        }
        if (!this.smtManager.isValidEnvelope(r)) {
            return r;
        }
        Struct requireStruct = Requirements.requireStruct(r.value(), "Detect Debezium Operation");
        String string = requireStruct.getString(Envelope.FieldName.OPERATION);
        if (string.equals(Envelope.Operation.DELETE.code())) {
            LOGGER.debug("Delete message {} ignored", r.key());
            return null;
        }
        if (string.equals(Envelope.Operation.UPDATE.code())) {
            handleUnexpectedOperation(r);
            return null;
        }
        R convert = recordConverter.convert(r);
        if (ActivateTracingSpan.isOpenTelemetryAvailable()) {
            this.tracingSmt.apply(convert);
        }
        R apply = this.afterExtractor.apply(convert);
        Struct requireStruct2 = Requirements.requireStruct(apply.value(), "Read Outbox Event");
        Schema valueSchema = apply.valueSchema();
        EventRouterConfigurationProvider lookupConfigurationProvider = lookupConfigurationProvider(Requirements.requireStruct(convert.value(), "Outbox converter"));
        String fieldEventId = lookupConfigurationProvider.getFieldEventId();
        String fieldEventKey = lookupConfigurationProvider.getFieldEventKey();
        String fieldPayload = lookupConfigurationProvider.getFieldPayload();
        String fieldEventTimestamp = lookupConfigurationProvider.getFieldEventTimestamp();
        String routeByField = lookupConfigurationProvider.getRouteByField();
        Field field = valueSchema.field(fieldPayload);
        if (field == null) {
            throw new ConnectException(String.format("Unable to find payload field %s in event", fieldPayload));
        }
        Schema schema2 = field.schema();
        Long eventTimestampMs = getEventTimestampMs(fieldEventTimestamp, requireStruct, requireStruct2);
        Object obj2 = requireStruct2.get(fieldEventId);
        Object obj3 = requireStruct2.get(fieldPayload);
        Field field2 = valueSchema.field(fieldEventId);
        if (field2 == null) {
            throw new ConnectException(String.format("Unable to find event-id field %s in event", fieldEventId));
        }
        Headers headers = convert.headers();
        headers.add("id", obj2, field2.schema());
        if (this.expandJsonPayload) {
            if (obj3 instanceof String) {
                try {
                    JsonNode parseJsonPayload = parseJsonPayload((String) obj3);
                    schema2 = this.jsonSchemaData.toConnectSchema(fieldPayload, parseJsonPayload);
                    obj3 = this.jsonSchemaData.toConnectData(parseJsonPayload, schema2);
                } catch (Exception e) {
                    LOGGER.warn("JSON expansion failed", (Throwable) e);
                }
            } else {
                LOGGER.warn("Expand JSON payload is turned on but payload is not a string in {}", convert.key());
            }
        }
        Schema valueSchema2 = this.onlyHeadersInOutputMessage ? null : this.fieldSchemaVersion == null ? getValueSchema(schema2, valueSchema, requireStruct2.getString(routeByField)) : getValueSchema(schema2, valueSchema, requireStruct2.getInt32(this.fieldSchemaVersion), requireStruct2.getString(routeByField));
        Struct put = this.onlyHeadersInOutputMessage ? null : new Struct(valueSchema2).put("payload", obj3);
        AtomicReference atomicReference = new AtomicReference();
        this.additionalFields.forEach(additionalField -> {
            if (this.additionalFieldsErrorOnMissing || requireStruct2.schema().field(additionalField.getField()) != null) {
                switch (additionalField.getPlacement()) {
                    case ENVELOPE:
                        put.put(additionalField.getAlias(), requireStruct2.get(additionalField.getField()));
                        return;
                    case HEADER:
                        headers.add(additionalField.getAlias(), requireStruct2.get(additionalField.getField()), valueSchema.field(additionalField.getField()).schema());
                        return;
                    case PARTITION:
                        atomicReference.set(requireStruct2.getInt32(additionalField.getField()));
                        return;
                    default:
                        return;
                }
            }
        });
        if ((obj3 == null || obj3.toString().trim().isEmpty()) && this.routeTombstoneOnEmptyPayload) {
            obj = null;
            schema = null;
        } else if (this.onlyHeadersInOutputMessage) {
            obj = obj3;
            schema = schema2;
        } else {
            obj = put;
            schema = valueSchema2;
        }
        Object defineRecordKey = defineRecordKey(fieldEventKey, requireStruct2);
        ConnectRecord newRecord = convert.newRecord(requireStruct2.getString(routeByField), (Integer) atomicReference.get(), defineRecordKeySchema(fieldEventKey, valueSchema), defineRecordKey, schema, obj, eventTimestampMs, headers);
        LOGGER.debug("Message emitted with event id: \"{}\", event key: \"{}\"", obj2, defineRecordKey);
        return (R) this.regexRouter.apply(newRecord);
    }

    private Long getEventTimestampMs(String str, Struct struct, Struct struct2) {
        if (str == null) {
            return struct.getInt64("ts_ms");
        }
        Field field = struct2.schema().field(str);
        if (field == null) {
            throw new ConnectException(String.format("Unable to find timestamp field %s in event", str));
        }
        Long int64 = struct2.getInt64(str);
        if (int64 == null) {
            return struct.getInt64("ts_ms");
        }
        String name2 = field.schema().name();
        if (name2 == null) {
            throw new ConnectException(String.format("Unsupported field type %s (without logical schema name) for event timestamp", field.schema().type()));
        }
        boolean z = -1;
        switch (name2.hashCode()) {
            case -1830290952:
                if (name2.equals(MicroTimestamp.SCHEMA_NAME)) {
                    z = true;
                    break;
                }
                break;
            case -1378581316:
                if (name2.equals(NanoTimestamp.SCHEMA_NAME)) {
                    z = 2;
                    break;
                }
                break;
            case -517856752:
                if (name2.equals(Timestamp.SCHEMA_NAME)) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                return int64;
            case true:
                return Long.valueOf(int64.longValue() / 1000);
            case true:
                return Long.valueOf(int64.longValue() / 1000000);
            default:
                throw new ConnectException(String.format("Unsupported field type %s for event timestamp", name2));
        }
    }

    private Schema defineRecordKeySchema(String str, Schema schema) {
        Field field = null;
        if (str != null) {
            field = schema.field(str);
        }
        return field != null ? field.schema() : Schema.STRING_SCHEMA;
    }

    private Object defineRecordKey(String str, Struct struct) {
        if (str != null) {
            return struct.get(str);
        }
        return null;
    }

    private void handleUnexpectedOperation(R r) {
        switch (this.invalidOperationBehavior) {
            case SKIP_AND_WARN:
                LOGGER.warn("Unexpected update message received {} and ignored", r.key());
                return;
            case SKIP_AND_ERROR:
                LOGGER.error("Unexpected update message received {} and ignored", r.key());
                return;
            case FATAL:
                throw new IllegalStateException(String.format("Unexpected update message received %s, fail.", r.key()));
            default:
                return;
        }
    }

    private JsonNode parseJsonPayload(String str) throws Exception {
        if (str.startsWith("{") || str.startsWith(SelectorUtils.PATTERN_HANDLER_PREFIX)) {
            return this.objectMapper.readTree(str);
        }
        throw new Exception("Unable to parse payload starting with '" + str.charAt(0) + "'");
    }

    public ConfigDef config() {
        return EventRouterConfigDefinition.configDef();
    }

    public void close() {
        if (ActivateTracingSpan.isOpenTelemetryAvailable()) {
            this.tracingSmt.close();
        }
    }

    public void configure(Map<String, ?> map) {
        if (ActivateTracingSpan.isOpenTelemetryAvailable()) {
            this.tracingSmt.configure(map);
            if (!map.containsKey(ActivateTracingSpan.TRACING_CONTEXT_FIELD_REQUIRED.name())) {
                this.tracingSmt.setRequireContextField(true);
            }
        }
        Configuration from = Configuration.from(map);
        this.smtManager = new SmtManager<>(from);
        this.smtManager.validate(from, io.debezium.config.Field.setOf(EventRouterConfigDefinition.CONFIG_FIELDS));
        this.invalidOperationBehavior = EventRouterConfigDefinition.InvalidOperationBehavior.parse(from.getString(EventRouterConfigDefinition.OPERATION_INVALID_BEHAVIOR));
        EventRouterConfigDefinition.JsonPayloadNullFieldBehavior parse = EventRouterConfigDefinition.JsonPayloadNullFieldBehavior.parse(from.getString(EventRouterConfigDefinition.TABLE_JSON_PAYLOAD_NULL_BEHAVIOR));
        this.expandJsonPayload = from.getBoolean(EventRouterConfigDefinition.EXPAND_JSON_PAYLOAD);
        if (this.expandJsonPayload) {
            this.objectMapper = new ObjectMapper();
            this.jsonSchemaData = new JsonSchemaData(parse, FieldNameSelector.defaultNonRelationalSelector(CommonConnectorConfig.FieldNameAdjustmentMode.parse(from.getString(CommonConnectorConfig.FIELD_NAME_ADJUSTMENT_MODE)).createAdjuster()));
        }
        this.defaultConfigurationProvider.configure(map);
        Iterator it = ServiceLoader.load(EventRouterConfigurationProvider.class).iterator();
        while (it.hasNext()) {
            EventRouterConfigurationProvider eventRouterConfigurationProvider = (EventRouterConfigurationProvider) it.next();
            this.configurationProviders.put(eventRouterConfigurationProvider.getName(), eventRouterConfigurationProvider);
            eventRouterConfigurationProvider.configure(map);
        }
        this.fieldSchemaVersion = from.getString(EventRouterConfigDefinition.FIELD_SCHEMA_VERSION);
        this.routeTombstoneOnEmptyPayload = from.getBoolean(EventRouterConfigDefinition.ROUTE_TOMBSTONE_ON_EMPTY_PAYLOAD);
        HashMap hashMap = new HashMap();
        hashMap.put(RegexRouter.ConfigName.REGEX, from.getString(EventRouterConfigDefinition.ROUTE_TOPIC_REGEX));
        hashMap.put(RegexRouter.ConfigName.REPLACEMENT, from.getString(EventRouterConfigDefinition.ROUTE_TOPIC_REPLACEMENT));
        this.regexRouter.configure(hashMap);
        this.afterExtractor = ConnectRecordUtil.extractAfterDelegate();
        this.additionalFields = EventRouterConfigDefinition.parseAdditionalFieldsConfig(from);
        this.additionalFieldsErrorOnMissing = from.getBoolean(EventRouterConfigDefinition.FIELDS_ADDITIONAL_ERROR_ON_MISSING);
        this.onlyHeadersInOutputMessage = this.additionalFields.stream().noneMatch(additionalField -> {
            return additionalField.getPlacement() == EventRouterConfigDefinition.AdditionalFieldPlacement.ENVELOPE;
        });
        this.payloadSchemaCache = new BoundedConcurrentHashMap<>(10000, 10, BoundedConcurrentHashMap.Eviction.LRU);
    }

    private Schema getValueSchema(Schema schema, Schema schema2, String str) {
        Schema schema3 = this.payloadSchemaCache.get(schema);
        if (schema3 == null) {
            schema3 = getSchemaBuilder(schema, schema2, str).build();
            this.payloadSchemaCache.put(schema, schema3);
        }
        return schema3;
    }

    private Schema getValueSchema(Schema schema, Schema schema2, Integer num, String str) {
        if (!this.versionedValueSchema.containsKey(num)) {
            this.versionedValueSchema.put(num, getSchemaBuilder(schema, schema2, str).version(num).build());
        }
        return this.versionedValueSchema.get(num);
    }

    private SchemaBuilder getSchemaBuilder(Schema schema, Schema schema2, String str) {
        SchemaBuilder name2 = SchemaBuilder.struct().name(getSchemaName(schema2, str));
        name2.field("payload", schema);
        this.additionalFields.forEach(additionalField -> {
            if (additionalField.getPlacement() == EventRouterConfigDefinition.AdditionalFieldPlacement.ENVELOPE) {
                name2.field(additionalField.getAlias(), schema2.field(additionalField.getField()).schema());
            }
        });
        return name2;
    }

    private String getSchemaName(Schema schema, String str) {
        String str2;
        String name2 = schema.name();
        if (name2 != null) {
            int lastIndexOf = name2.lastIndexOf(46);
            str2 = lastIndexOf != -1 ? name2.substring(0, lastIndexOf + 1) + str + "." + name2.substring(lastIndexOf + 1) : str + "." + name2;
        } else {
            str2 = str;
        }
        return str2;
    }

    private EventRouterConfigurationProvider lookupConfigurationProvider(Struct struct) {
        if (!this.configurationProviders.isEmpty()) {
            EventRouterConfigurationProvider eventRouterConfigurationProvider = this.configurationProviders.get(struct.getStruct("source").getString("connector"));
            if (eventRouterConfigurationProvider != null) {
                return eventRouterConfigurationProvider;
            }
        }
        return this.defaultConfigurationProvider;
    }
}
