/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.pulsar.table;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.protobuf.PbFormatOptions;
import org.apache.flink.formats.protobuf.serialize.PbRowDataSerializationSchema;
import org.apache.flink.shaded.guava18.com.google.common.base.Preconditions;
import org.apache.flink.streaming.connectors.pulsar.internal.SchemaUtils;
import org.apache.flink.streaming.connectors.pulsar.table.PulsarDynamicTableSink;
import org.apache.flink.streaming.util.serialization.FlinkSchema;
import org.apache.flink.streaming.util.serialization.PulsarContextAware;
import org.apache.flink.streaming.util.serialization.PulsarSerializationSchema;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;

class DynamicPulsarSerializationSchema
implements PulsarSerializationSchema<RowData>,
PulsarContextAware<RowData> {
    private static final long serialVersionUID = 1L;
    @Nullable
    private final SerializationSchema<RowData> keySerialization;
    private final SerializationSchema<RowData> valueSerialization;
    private final RowData.FieldGetter[] keyFieldGetters;
    private final RowData.FieldGetter[] valueFieldGetters;
    private final boolean hasMetadata;
    private final boolean upsertMode;
    private final int[] metadataPositions;
    private int[] partitions;
    private int parallelInstanceId;
    private int numParallelInstances;
    private DataType valueDataType;
    private String valueFormatType;
    private volatile Schema<RowData> schema;
    private long delayMilliseconds;

    DynamicPulsarSerializationSchema(@Nullable SerializationSchema<RowData> keySerialization, SerializationSchema<RowData> valueSerialization, RowData.FieldGetter[] keyFieldGetters, RowData.FieldGetter[] valueFieldGetters, boolean hasMetadata, int[] metadataPositions, boolean upsertMode, DataType valueDataType, String valueFormatType, long delayMilliseconds) {
        if (upsertMode) {
            Preconditions.checkArgument((keySerialization != null && keyFieldGetters.length > 0 ? 1 : 0) != 0, (Object)"Key must be set in upsert mode for serialization schema.");
        }
        this.keySerialization = keySerialization;
        this.valueSerialization = valueSerialization;
        this.keyFieldGetters = keyFieldGetters;
        this.valueFieldGetters = valueFieldGetters;
        this.hasMetadata = hasMetadata;
        this.metadataPositions = metadataPositions;
        this.upsertMode = upsertMode;
        this.valueDataType = valueDataType;
        this.valueFormatType = valueFormatType;
        this.delayMilliseconds = delayMilliseconds;
    }

    @Override
    public void open(SerializationSchema.InitializationContext context) throws Exception {
        if (this.keySerialization != null) {
            this.keySerialization.open(context);
        }
        this.valueSerialization.open(context);
    }

    @Override
    public void serialize(RowData consumedRow, TypedMessageBuilder<RowData> messageBuilder) {
        Long eventTime;
        Map properties;
        if (this.keySerialization == null && !this.hasMetadata) {
            messageBuilder.value(consumedRow);
            return;
        }
        if (this.delayMilliseconds > 0L) {
            messageBuilder.deliverAfter(this.delayMilliseconds, TimeUnit.MILLISECONDS);
        }
        if (this.keySerialization != null) {
            RowData keyRow = DynamicPulsarSerializationSchema.createProjectedRow(consumedRow, RowKind.INSERT, this.keyFieldGetters);
            messageBuilder.keyBytes(this.keySerialization.serialize((Object)keyRow));
        }
        RowKind kind = consumedRow.getRowKind();
        RowData valueRow = DynamicPulsarSerializationSchema.createProjectedRow(consumedRow, kind, this.valueFieldGetters);
        if (this.upsertMode) {
            if (kind != RowKind.DELETE && kind != RowKind.UPDATE_BEFORE) {
                valueRow.setRowKind(RowKind.INSERT);
                messageBuilder.value(valueRow);
            }
        } else {
            messageBuilder.value(valueRow);
        }
        if ((properties = (Map)this.readMetadata(consumedRow, PulsarDynamicTableSink.WritableMetadata.PROPERTIES)) != null) {
            messageBuilder.properties(properties);
        }
        if ((eventTime = (Long)this.readMetadata(consumedRow, PulsarDynamicTableSink.WritableMetadata.EVENT_TIME)) != null && eventTime >= 0L) {
            messageBuilder.eventTime(eventTime);
        }
    }

    @Override
    public Optional<String> getTargetTopic(RowData element) {
        return Optional.empty();
    }

    private <T> T readMetadata(RowData consumedRow, PulsarDynamicTableSink.WritableMetadata metadata) {
        int pos = this.metadataPositions[metadata.ordinal()];
        if (pos < 0) {
            return null;
        }
        return (T)metadata.converter.read(consumedRow, pos);
    }

    private static RowData createProjectedRow(RowData consumedRow, RowKind kind, RowData.FieldGetter[] fieldGetters) {
        int arity = fieldGetters.length;
        GenericRowData genericRowData = new GenericRowData(kind, arity);
        for (int fieldPos = 0; fieldPos < arity; ++fieldPos) {
            genericRowData.setField(fieldPos, fieldGetters[fieldPos].getFieldOrNull(consumedRow));
        }
        return genericRowData;
    }

    public TypeInformation<RowData> getProducedType() {
        RowType rowType = (RowType)this.valueDataType.getLogicalType();
        return InternalTypeInfo.of((RowType)rowType);
    }

    @Override
    public void setParallelInstanceId(int parallelInstanceId) {
        this.parallelInstanceId = parallelInstanceId;
    }

    @Override
    public void setNumParallelInstances(int numParallelInstances) {
        this.numParallelInstances = numParallelInstances;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Schema<RowData> getSchema() {
        if (this.schema == null) {
            DynamicPulsarSerializationSchema dynamicPulsarSerializationSchema = this;
            synchronized (dynamicPulsarSerializationSchema) {
                if (this.schema == null) {
                    this.schema = this.buildSchema();
                }
            }
        }
        return this.schema;
    }

    private FlinkSchema<RowData> buildSchema() {
        if (StringUtils.isBlank(this.valueFormatType)) {
            return new FlinkSchema<RowData>(Schema.BYTES.getSchemaInfo(), this.valueSerialization, null);
        }
        HashMap<String, String> options = new HashMap<String, String>();
        this.hackPbSerializationSchema(options);
        SchemaInfo schemaInfo = SchemaUtils.tableSchemaToSchemaInfo(this.valueFormatType, this.valueDataType, options);
        return new FlinkSchema<RowData>(schemaInfo, this.valueSerialization, null);
    }

    private void hackPbSerializationSchema(Map<String, String> options) {
        if (this.valueSerialization instanceof PbRowDataSerializationSchema) {
            try {
                String messageClassName = (String)FieldUtils.readDeclaredField(this.valueSerialization, (String)"messageClassName", (boolean)true);
                options.put(PbFormatOptions.MESSAGE_CLASS_NAME.key(), messageClassName);
            }
            catch (IllegalAccessException e) {
                e.printStackTrace();
            }
        }
    }

    static interface MetadataConverter
    extends Serializable {
        public Object read(RowData var1, int var2);
    }
}

