package cn.tenmg.cdc.log.connectors.mysql.source.reader;

import cn.tenmg.cdc.log.connectors.mysql.source.metrics.MySqlSourceReaderMetrics;
import cn.tenmg.cdc.log.connectors.mysql.source.offset.BinlogOffset;
import cn.tenmg.cdc.log.connectors.mysql.source.split.MySqlSplitState;
import cn.tenmg.cdc.log.connectors.mysql.source.utils.RecordUtils;
import cn.tenmg.cdc.log.debezium.DebeziumDeserializationSchema;
import cn.tenmg.cdc.log.debezium.history.FlinkJsonTableChangeSerializer;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceRecord;
import io.debezium.relational.history.HistoryRecord;
import io.debezium.relational.history.TableChanges;
import java.util.Iterator;
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.connector.base.source.reader.RecordEmitter;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cn/tenmg/cdc/log/connectors/mysql/source/reader/MySqlRecordEmitter.class */
/**
 * Dispatches each {@link SourceRecord} read from a MySQL split to the handling that matches its
 * kind, updating the given {@link MySqlSplitState} along the way.
 *
 * <ul>
 *   <li>Watermark events are never emitted; a high watermark is stored on a snapshot split state.
 *   <li>Schema change events seen on a binlog split state are recorded into that state and are
 *       emitted only when {@code includeSchemaChanges} is enabled.
 *   <li>Data change records advance the binlog offset (binlog split states only), report metrics
 *       and are emitted.
 *   <li>Heartbeat events only advance the binlog offset (binlog split states only).
 *   <li>Anything else is logged and skipped.
 * </ul>
 *
 * @param <T> type of the elements produced by the deserialization schema
 */
public final class MySqlRecordEmitter<T> implements RecordEmitter<SourceRecord, T, MySqlSplitState> {
    private static final Logger LOG = LoggerFactory.getLogger(MySqlRecordEmitter.class);
    private static final FlinkJsonTableChangeSerializer TABLE_CHANGE_SERIALIZER = new FlinkJsonTableChangeSerializer();
    private final DebeziumDeserializationSchema<T> debeziumDeserializationSchema;
    private final MySqlSourceReaderMetrics sourceReaderMetrics;
    private final boolean includeSchemaChanges;
    /** Single collector instance reused for every record; its target output is swapped per call. */
    private final OutputCollector<T> outputCollector = new OutputCollector<>();

    /**
     * {@link Collector} that forwards every collected element to the {@link SourceOutput} most
     * recently assigned by the enclosing emitter. The constructor is private, so instances are
     * only created by {@link MySqlRecordEmitter}.
     *
     * @param <T> type of the forwarded elements
     */
    public static class OutputCollector<T> implements Collector<T> {
        /** Current forwarding target; assigned by the emitter before each deserialization. */
        private SourceOutput<T> output;

        private OutputCollector() {
        }

        /** Forwards {@code t} to the currently assigned output. */
        @Override
        public void collect(T t) {
            this.output.collect(t);
        }

        /** No-op: this collector holds nothing that needs releasing. */
        @Override
        public void close() {
        }
    }

    /**
     * Creates an emitter.
     *
     * @param debeziumDeserializationSchema schema used to turn source records into output elements
     * @param mySqlSourceReaderMetrics metrics sink updated for every data change record
     * @param includeSchemaChanges whether schema change events are emitted downstream
     */
    public MySqlRecordEmitter(DebeziumDeserializationSchema<T> debeziumDeserializationSchema, MySqlSourceReaderMetrics mySqlSourceReaderMetrics, boolean includeSchemaChanges) {
        this.debeziumDeserializationSchema = debeziumDeserializationSchema;
        this.sourceReaderMetrics = mySqlSourceReaderMetrics;
        this.includeSchemaChanges = includeSchemaChanges;
    }

    /**
     * Processes one record according to its kind (see the class description for the rules).
     *
     * @param sourceRecord the record to process
     * @param sourceOutput the output that emitted elements are forwarded to
     * @param mySqlSplitState state of the split the record belongs to; may be updated
     * @throws Exception if deserializing an emitted record fails
     */
    @Override
    public void emitRecord(SourceRecord sourceRecord, SourceOutput<T> sourceOutput, MySqlSplitState mySqlSplitState) throws Exception {
        if (RecordUtils.isWatermarkEvent(sourceRecord)) {
            // The watermark is extracted for every watermark event, but only a high watermark
            // on a snapshot split is stored; all other watermark events are dropped.
            BinlogOffset watermark = RecordUtils.getWatermark(sourceRecord);
            if (RecordUtils.isHighWatermarkEvent(sourceRecord) && mySqlSplitState.isSnapshotSplitState()) {
                mySqlSplitState.asSnapshotSplitState().setHighWatermark(watermark);
            }
        } else if (RecordUtils.isSchemaChangeEvent(sourceRecord) && mySqlSplitState.isBinlogSplitState()) {
            handleSchemaChange(sourceRecord, sourceOutput, mySqlSplitState);
        } else if (RecordUtils.isDataChangeRecord(sourceRecord)) {
            updateStartingOffsetForSplit(mySqlSplitState, sourceRecord);
            reportMetrics(sourceRecord);
            emitElement(sourceRecord, sourceOutput);
        } else if (RecordUtils.isHeartbeatEvent(sourceRecord)) {
            updateStartingOffsetForSplit(mySqlSplitState, sourceRecord);
        } else {
            LOG.info("Meet unknown element {}, just skip.", sourceRecord);
        }
    }

    /**
     * Records every table change carried by a schema change event into the binlog split state
     * and, when schema changes are included, advances the starting offset and emits the record.
     * Callers must have verified that {@code mySqlSplitState} is a binlog split state.
     */
    private void handleSchemaChange(SourceRecord sourceRecord, SourceOutput<T> sourceOutput, MySqlSplitState mySqlSplitState) throws Exception {
        for (TableChanges.TableChange tableChange : TABLE_CHANGE_SERIALIZER.deserialize(RecordUtils.getHistoryRecord(sourceRecord).document().getArray(HistoryRecord.Fields.TABLE_CHANGES), true)) {
            mySqlSplitState.asBinlogSplitState().recordSchema(tableChange.getId(), tableChange);
        }
        if (this.includeSchemaChanges) {
            mySqlSplitState.asBinlogSplitState().setStartingOffset(RecordUtils.getBinlogPosition(sourceRecord));
            emitElement(sourceRecord, sourceOutput);
        }
    }

    /** Sets the split's starting offset to the record's binlog position; no-op for non-binlog splits. */
    private void updateStartingOffsetForSplit(MySqlSplitState mySqlSplitState, SourceRecord sourceRecord) {
        if (mySqlSplitState.isBinlogSplitState()) {
            mySqlSplitState.asBinlogSplitState().setStartingOffset(RecordUtils.getBinlogPosition(sourceRecord));
        }
    }

    /** Points the reusable collector at {@code sourceOutput} and deserializes the record into it. */
    private void emitElement(SourceRecord sourceRecord, SourceOutput<T> sourceOutput) throws Exception {
        // Direct, type-checked assignment: the enclosing class may access the nested class's
        // private field, so no (raw) cast is needed.
        this.outputCollector.output = sourceOutput;
        this.debeziumDeserializationSchema.deserialize(sourceRecord, this.outputCollector);
    }

    /**
     * Reports the process time and, when the record carries a positive message timestamp, the
     * fetch and emit delays relative to it.
     */
    private void reportMetrics(SourceRecord sourceRecord) {
        long now = System.currentTimeMillis();
        this.sourceReaderMetrics.recordProcessTime(now);
        Long messageTimestamp = RecordUtils.getMessageTimestamp(sourceRecord);
        if (messageTimestamp == null || messageTimestamp <= 0) {
            // Without a usable message timestamp no delay can be computed.
            return;
        }
        Long fetchTimestamp = RecordUtils.getFetchTimestamp(sourceRecord);
        // A fetch timestamp earlier than the message timestamp would yield a negative delay; skip it.
        if (fetchTimestamp != null && fetchTimestamp >= messageTimestamp) {
            this.sourceReaderMetrics.recordFetchDelay(fetchTimestamp - messageTimestamp);
        }
        // NOTE(review): assumes the message timestamp is in epoch milliseconds like `now` — confirm.
        this.sourceReaderMetrics.recordEmitDelay(now - messageTimestamp);
    }
}
