package io.pravega.connectors.flink;

import java.util.function.Supplier;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sources.BatchTableSource;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:io/pravega/connectors/flink/FlinkPravegaTableSource.class */
/**
 * Base class for Pravega-backed table sources that produce {@link Row} records and can be
 * consumed either as a {@link DataStream} (streaming) or as a {@link DataSet} (batch).
 *
 * <p>The concrete reader and input format are not created eagerly; the class holds two
 * {@link Supplier}s and asks them for a fresh instance each time
 * {@link #getDataStream(StreamExecutionEnvironment)} or {@link #getDataSet(ExecutionEnvironment)}
 * is called.
 */
public abstract class FlinkPravegaTableSource implements StreamTableSource<Row>, BatchTableSource<Row> {
    /** Produces the reader that is registered as the streaming source. */
    private final Supplier<FlinkPravegaReader<Row>> sourceFunctionFactory;

    /** Produces the input format that is used for batch reads. */
    private final Supplier<FlinkPravegaInputFormat<Row>> inputFormatFactory;

    /** Schema reported by {@link #getTableSchema()}. */
    private final TableSchema schema;

    /** Type information reported by {@link #getReturnType()}. */
    private final TypeInformation<Row> returnType;

    /**
     * Builder base for {@link FlinkPravegaTableSource} implementations. Adds a table schema on top
     * of the settings inherited from {@link AbstractStreamingReaderBuilder}.
     *
     * @param <T> the table source type handed to {@link #configureTableSource}
     * @param <B> the concrete builder type returned by the fluent methods
     */
    public abstract static class BuilderBase<T extends FlinkPravegaTableSource, B extends AbstractStreamingReaderBuilder>
            extends AbstractStreamingReaderBuilder<Row, B> {

        /** Schema supplied through {@link #withSchema}; {@code null} until then. */
        private TableSchema schema;

        /**
         * Sets the table schema. The schema may be set only once per builder.
         *
         * @param tableSchema the schema to use; must not be {@code null}
         * @return the builder, as returned by {@code builder()}
         * @throws NullPointerException if {@code tableSchema} is {@code null}
         * @throws IllegalArgumentException if a schema has already been set
         */
        public B withSchema(TableSchema tableSchema) {
            Preconditions.checkNotNull(tableSchema, "Schema must not be null.");
            Preconditions.checkArgument(this.schema == null, "Schema has already been set.");
            this.schema = tableSchema;
            return (B) builder();
        }

        /**
         * Returns the schema previously supplied through {@link #withSchema}.
         *
         * <p>Note: the decompiler reports that this member was originally {@code protected}.
         *
         * @return the configured schema, never {@code null}
         * @throws IllegalStateException if no schema has been set yet
         */
        public TableSchema getTableSchema() {
            Preconditions.checkState(this.schema != null, "Schema hasn't been set.");
            return this.schema;
        }

        /**
         * Extension hook that receives a table source; this base implementation does nothing.
         *
         * <p>Note: the decompiler reports that this member was originally {@code protected}.
         *
         * @param t the table source to configure
         */
        public void configureTableSource(T t) {
        }

        /**
         * Creates a new input format from the builder's client configuration, resolved streams
         * and deserialization schema.
         *
         * <p>Note: the decompiler reports that this member was originally package-private.
         *
         * @return a newly constructed input format
         */
        public FlinkPravegaInputFormat<Row> buildInputFormat() {
            return new FlinkPravegaInputFormat<>(
                    getPravegaConfig().getClientConfig(),
                    resolveStreams(),
                    getDeserializationSchema());
        }

        // The six methods below were emitted by the decompiler from compiler-generated bridge
        // methods and were declared with the raw return type AbstractStreamingReaderBuilder.
        // They are re-declared with return type B so that they stay valid overrides and fluent
        // call chains keep the concrete builder type; the erasure of B is still
        // AbstractStreamingReaderBuilder, so the compiled method descriptors are unchanged.
        // NOTE(review): assumes the superclass declares these methods with return type B -
        // confirm against AbstractStreamingReaderBuilder.

        /** Delegates to the inherited {@code withEventReadTimeout}. */
        @Override
        public B withEventReadTimeout(Time time) {
            return super.withEventReadTimeout(time);
        }

        /** Delegates to the inherited {@code withCheckpointInitiateTimeout}. */
        @Override
        public B withCheckpointInitiateTimeout(Time time) {
            return super.withCheckpointInitiateTimeout(time);
        }

        /** Delegates to the inherited {@code withReaderGroupRefreshTime}. */
        @Override
        public B withReaderGroupRefreshTime(Time time) {
            return super.withReaderGroupRefreshTime(time);
        }

        /** Delegates to the inherited {@code withReaderGroupName}. */
        @Override
        public B withReaderGroupName(String str) {
            return super.withReaderGroupName(str);
        }

        /** Delegates to the inherited {@code withReaderGroupScope}. */
        @Override
        public B withReaderGroupScope(String str) {
            return super.withReaderGroupScope(str);
        }

        /** Delegates to the inherited {@code uid}. */
        @Override
        public B uid(String str) {
            return super.uid(str);
        }
    }

    /**
     * Creates a table source from the given factories and type metadata.
     *
     * <p>Note: the decompiler reports that this constructor was originally {@code protected}.
     *
     * @param supplier factory for the streaming reader; must not be {@code null}
     * @param supplier2 factory for the batch input format; must not be {@code null}
     * @param tableSchema the table schema; must not be {@code null}
     * @param typeInformation the type information of the produced rows; must not be {@code null}
     * @throws NullPointerException if any argument is {@code null}
     */
    public FlinkPravegaTableSource(
            Supplier<FlinkPravegaReader<Row>> supplier,
            Supplier<FlinkPravegaInputFormat<Row>> supplier2,
            TableSchema tableSchema,
            TypeInformation<Row> typeInformation) {
        // checkNotNull is generic and returns its argument, so no casts are needed.
        this.sourceFunctionFactory = Preconditions.checkNotNull(supplier, "sourceFunctionFactory");
        this.inputFormatFactory = Preconditions.checkNotNull(supplier2, "inputFormatFactory");
        this.schema = Preconditions.checkNotNull(tableSchema, "schema");
        this.returnType = Preconditions.checkNotNull(typeInformation, "returnType");
    }

    /**
     * Obtains a reader from the source function factory, calls {@code initialize()} on it and
     * registers it as a source on the given environment.
     *
     * @param streamExecutionEnvironment the environment the source is added to
     * @return the data stream backed by the reader
     */
    public DataStream<Row> getDataStream(StreamExecutionEnvironment streamExecutionEnvironment) {
        FlinkPravegaReader<Row> reader = this.sourceFunctionFactory.get();
        // The reader is initialized before it is handed to the environment.
        reader.initialize();
        return streamExecutionEnvironment.addSource(reader);
    }

    /**
     * Obtains an input format from the input format factory and creates a batch input from it.
     *
     * @param executionEnvironment the environment the input is created on
     * @return the data set backed by the input format
     */
    public DataSet<Row> getDataSet(ExecutionEnvironment executionEnvironment) {
        FlinkPravegaInputFormat<Row> inputFormat = this.inputFormatFactory.get();
        return executionEnvironment.createInput(inputFormat);
    }

    /**
     * @return the type information supplied at construction time
     */
    public TypeInformation<Row> getReturnType() {
        return this.returnType;
    }

    /**
     * @return the table schema supplied at construction time
     */
    public TableSchema getTableSchema() {
        return this.schema;
    }
}
