package cn.tenmg.cdc.log.connectors.mysql;

import cn.tenmg.cdc.log.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceRecord;
import io.debezium.config.CommonConnectorConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cn/tenmg/cdc/log/connectors/mysql/SeekBinlogToTimestampFilter.class */
public class SeekBinlogToTimestampFilter<T> implements DebeziumDeserializationSchema<T> {
    private static final long serialVersionUID = -4450118969976653497L;
    protected static final Logger LOG = LoggerFactory.getLogger(SeekBinlogToTimestampFilter.class);
    private final long startupTimestampMillis;
    private final DebeziumDeserializationSchema<T> serializer;
    private transient boolean find = false;
    private transient long filtered = 0;

    public SeekBinlogToTimestampFilter(long j, DebeziumDeserializationSchema<T> debeziumDeserializationSchema) {
        this.startupTimestampMillis = j;
        this.serializer = debeziumDeserializationSchema;
    }

    @Override // cn.tenmg.cdc.log.debezium.DebeziumDeserializationSchema
    public void deserialize(SourceRecord sourceRecord, Collector<T> collector) throws Exception {
        if (this.find) {
            this.serializer.deserialize(sourceRecord, collector);
            return;
        }
        if (this.filtered == 0) {
            LOG.info("Begin to seek binlog to the specific timestamp {}.", Long.valueOf(this.startupTimestampMillis));
        }
        Long int64 = ((Struct) sourceRecord.value()).getStruct("source").getInt64("ts_ms");
        if (int64 != null && int64.longValue() >= this.startupTimestampMillis) {
            this.serializer.deserialize(sourceRecord, collector);
            this.find = true;
            LOG.info("Successfully seek to the specific timestamp {} with filtered {} change events.", Long.valueOf(this.startupTimestampMillis), Long.valueOf(this.filtered));
        } else {
            this.filtered++;
            if (this.filtered % CommonConnectorConfig.DEFAULT_RETRIABLE_RESTART_WAIT == 0) {
                LOG.info("Seeking binlog to specific timestamp with filtered {} change events.", Long.valueOf(this.filtered));
            }
        }
    }

    public TypeInformation<T> getProducedType() {
        return this.serializer.getProducedType();
    }
}
