package org.apache.flink.streaming.connectors.pulsar.table;

import java.io.Serializable;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.formats.protobuf.PbFormatOptions;
import org.apache.flink.formats.protobuf.serialize.PbRowDataSerializationSchema;
import org.apache.flink.shaded.guava18.com.google.common.base.Preconditions;
import org.apache.flink.streaming.connectors.pulsar.internal.SchemaUtils;
import org.apache.flink.streaming.connectors.pulsar.table.PulsarDynamicTableSink;
import org.apache.flink.streaming.util.serialization.FlinkSchema;
import org.apache.flink.streaming.util.serialization.PulsarContextAware;
import org.apache.flink.streaming.util.serialization.PulsarSerializationSchema;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.RowKind;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/streaming/connectors/pulsar/table/DynamicPulsarSerializationSchema.class */
public class DynamicPulsarSerializationSchema implements PulsarSerializationSchema<RowData>, PulsarContextAware<RowData> {
    private static final long serialVersionUID = 1;

    @Nullable
    private final SerializationSchema<RowData> keySerialization;
    private final SerializationSchema<RowData> valueSerialization;
    private final RowData.FieldGetter[] keyFieldGetters;
    private final RowData.FieldGetter[] valueFieldGetters;
    private final boolean hasMetadata;
    private final boolean upsertMode;
    private final int[] metadataPositions;
    private int[] partitions;
    private int parallelInstanceId;
    private int numParallelInstances;
    private DataType valueDataType;
    private String valueFormatType;
    private volatile Schema<RowData> schema;
    private long delayMilliseconds;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/streaming/connectors/pulsar/table/DynamicPulsarSerializationSchema$MetadataConverter.class */
    public interface MetadataConverter extends Serializable {
        Object read(RowData rowData, int i);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public DynamicPulsarSerializationSchema(@Nullable SerializationSchema<RowData> serializationSchema, SerializationSchema<RowData> serializationSchema2, RowData.FieldGetter[] fieldGetterArr, RowData.FieldGetter[] fieldGetterArr2, boolean z, int[] iArr, boolean z2, DataType dataType, String str, long j) {
        if (z2) {
            Preconditions.checkArgument(serializationSchema != null && fieldGetterArr.length > 0, "Key must be set in upsert mode for serialization schema.");
        }
        this.keySerialization = serializationSchema;
        this.valueSerialization = serializationSchema2;
        this.keyFieldGetters = fieldGetterArr;
        this.valueFieldGetters = fieldGetterArr2;
        this.hasMetadata = z;
        this.metadataPositions = iArr;
        this.upsertMode = z2;
        this.valueDataType = dataType;
        this.valueFormatType = str;
        this.delayMilliseconds = j;
    }

    @Override // org.apache.flink.streaming.util.serialization.PulsarSerializationSchema
    public void open(SerializationSchema.InitializationContext initializationContext) throws Exception {
        if (this.keySerialization != null) {
            this.keySerialization.open(initializationContext);
        }
        this.valueSerialization.open(initializationContext);
    }

    @Override // org.apache.flink.streaming.util.serialization.PulsarSerializationSchema
    public void serialize(RowData rowData, TypedMessageBuilder<RowData> typedMessageBuilder) {
        if (this.keySerialization == null && !this.hasMetadata) {
            typedMessageBuilder.value(rowData);
            return;
        }
        if (this.delayMilliseconds > 0) {
            typedMessageBuilder.deliverAfter(this.delayMilliseconds, TimeUnit.MILLISECONDS);
        }
        if (this.keySerialization != null) {
            typedMessageBuilder.keyBytes(this.keySerialization.serialize(createProjectedRow(rowData, RowKind.INSERT, this.keyFieldGetters)));
        }
        RowKind rowKind = rowData.getRowKind();
        RowData createProjectedRow = createProjectedRow(rowData, rowKind, this.valueFieldGetters);
        if (!this.upsertMode) {
            typedMessageBuilder.value(createProjectedRow);
        } else if (rowKind != RowKind.DELETE && rowKind != RowKind.UPDATE_BEFORE) {
            createProjectedRow.setRowKind(RowKind.INSERT);
            typedMessageBuilder.value(createProjectedRow);
        }
        Map map = (Map) readMetadata(rowData, PulsarDynamicTableSink.WritableMetadata.PROPERTIES);
        if (map != null) {
            typedMessageBuilder.properties(map);
        }
        Long l = (Long) readMetadata(rowData, PulsarDynamicTableSink.WritableMetadata.EVENT_TIME);
        if (l == null || l.longValue() < 0) {
            return;
        }
        typedMessageBuilder.eventTime(l.longValue());
    }

    @Override // org.apache.flink.streaming.util.serialization.PulsarContextAware
    public Optional<String> getTargetTopic(RowData rowData) {
        return Optional.empty();
    }

    private <T> T readMetadata(RowData rowData, PulsarDynamicTableSink.WritableMetadata writableMetadata) {
        int i = this.metadataPositions[writableMetadata.ordinal()];
        if (i < 0) {
            return null;
        }
        return (T) writableMetadata.converter.read(rowData, i);
    }

    private static RowData createProjectedRow(RowData rowData, RowKind rowKind, RowData.FieldGetter[] fieldGetterArr) {
        int length = fieldGetterArr.length;
        GenericRowData genericRowData = new GenericRowData(rowKind, length);
        for (int i = 0; i < length; i++) {
            genericRowData.setField(i, fieldGetterArr[i].getFieldOrNull(rowData));
        }
        return genericRowData;
    }

    public TypeInformation<RowData> getProducedType() {
        return InternalTypeInfo.of(this.valueDataType.getLogicalType());
    }

    @Override // org.apache.flink.streaming.util.serialization.PulsarContextAware
    public void setParallelInstanceId(int i) {
        this.parallelInstanceId = i;
    }

    @Override // org.apache.flink.streaming.util.serialization.PulsarContextAware
    public void setNumParallelInstances(int i) {
        this.numParallelInstances = i;
    }

    @Override // org.apache.flink.streaming.util.serialization.PulsarContextAware
    public Schema<RowData> getSchema() {
        if (this.schema == null) {
            synchronized (this) {
                if (this.schema == null) {
                    this.schema = buildSchema();
                }
            }
        }
        return this.schema;
    }

    private FlinkSchema<RowData> buildSchema() {
        if (StringUtils.isBlank(this.valueFormatType)) {
            return new FlinkSchema<>(Schema.BYTES.getSchemaInfo(), this.valueSerialization, null);
        }
        Configuration configuration = new Configuration();
        hackPbSerializationSchema(configuration);
        return new FlinkSchema<>(SchemaUtils.tableSchemaToSchemaInfo(this.valueFormatType, this.valueDataType, configuration), this.valueSerialization, null);
    }

    private void hackPbSerializationSchema(Configuration configuration) {
        if (this.valueSerialization instanceof PbRowDataSerializationSchema) {
            try {
                configuration.set(PbFormatOptions.MESSAGE_CLASS_NAME, (String) FieldUtils.readDeclaredField(this.valueSerialization, "messageClassName", true));
            } catch (IllegalAccessException e) {
                e.printStackTrace();
            }
        }
    }
}
