package cn.tenmg.flink.jobs.service;

import cn.tenmg.flink.jobs.StreamService;
import cn.tenmg.flink.jobs.model.KafkaDBMessage;
import cn.tenmg.flink.jobs.model.Params;
import cn.tenmg.flink.jobs.serialization.KafkaDBMessageDeserializationSchema;
import java.util.Arrays;
import java.util.Properties;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

/* loaded from: input_file:cn/tenmg/flink/jobs/service/KafkaDBMessageService.class */
public abstract class KafkaDBMessageService implements StreamService {
    private static final long serialVersionUID = 967105095937872674L;

    protected abstract String getSubscribe();

    protected abstract String getGroupIdPrefix();

    protected abstract String getStartingOffset();

    protected abstract Properties getKafkaProperties();

    protected abstract FilterFunction<KafkaDBMessage> getFilter();

    protected abstract DataStream<KafkaDBMessage> getBatchDataStream(StreamExecutionEnvironment streamExecutionEnvironment, Params params);

    protected abstract KafkaDBMessageDeserializationSchema getKafkaDBMessageDeserializationSchema();

    protected abstract void run(StreamExecutionEnvironment streamExecutionEnvironment, Params params, DataStream<KafkaDBMessage> dataStream) throws Exception;

    @Override // cn.tenmg.flink.jobs.StreamService
    public void run(StreamExecutionEnvironment streamExecutionEnvironment, Params params) throws Exception {
        DataStream<KafkaDBMessage> addSource;
        if (RuntimeExecutionMode.BATCH.equals(params.getRuntimeMode())) {
            addSource = getBatchDataStream(streamExecutionEnvironment, params);
        } else {
            Properties kafkaProperties = getKafkaProperties();
            String groupIdPrefix = getGroupIdPrefix();
            kafkaProperties.setProperty("group.id", groupIdPrefix == null ? "flink-jobs" : groupIdPrefix + "_" + params.getServiceName());
            FlinkKafkaConsumer flinkKafkaConsumer = new FlinkKafkaConsumer(Arrays.asList(getSubscribe().split(",")), getKafkaDBMessageDeserializationSchema(), kafkaProperties);
            String startingOffset = getStartingOffset();
            if (startingOffset == null) {
                flinkKafkaConsumer.setStartFromLatest();
            } else if ("earliest".equals(startingOffset)) {
                flinkKafkaConsumer.setStartFromEarliest();
            } else if ("groupOffsets".equals(startingOffset)) {
                flinkKafkaConsumer.setStartFromGroupOffsets();
            } else {
                flinkKafkaConsumer.setStartFromLatest();
            }
            flinkKafkaConsumer.setCommitOffsetsOnCheckpoints(true);
            FilterFunction<KafkaDBMessage> filter = getFilter();
            addSource = filter == null ? streamExecutionEnvironment.addSource(flinkKafkaConsumer) : streamExecutionEnvironment.addSource(flinkKafkaConsumer).filter(filter);
        }
        run(streamExecutionEnvironment, params, addSource);
    }
}
