package io.pravega.connectors.flink;

import java.util.Arrays;
import java.util.function.Function;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.operators.DataSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sinks.AppendStreamTableSink;
import org.apache.flink.table.sinks.BatchTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.table.utils.TableConnectorUtils;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;

/**
 * Table sink for {@link Row} records that can be used both as an append-only stream sink and as a
 * batch sink.
 *
 * <p>The sink does not hold a writer or output format itself. Instead it keeps two factory
 * functions which are applied to the current {@link TableSchema} when a stream
 * ({@link #consumeDataStream(DataStream)}) or a data set ({@link #consumeDataSet(DataSet)}) is
 * attached.
 *
 * <p>{@link #configure(String[], TypeInformation[])} never modifies the receiver; it returns a
 * copy that carries the newly built schema.
 *
 * @deprecated This class is marked deprecated; no replacement is visible in this file.
 */
@Deprecated
/* loaded from: input_file:io/pravega/connectors/flink/FlinkPravegaTableSink.class */
public class FlinkPravegaTableSink implements AppendStreamTableSink<Row>, BatchTableSink<Row> {
    /** Creates the streaming writer for a given table schema. Never {@code null}. */
    protected final Function<TableSchema, FlinkPravegaWriter<Row>> writerFactory;

    /** Creates the batch output format for a given table schema. Never {@code null}. */
    protected final Function<TableSchema, FlinkPravegaOutputFormat<Row>> outputFormatFactory;

    /** Schema of the rows consumed by this sink; replaced on the copy made by {@code configure}. */
    protected TableSchema schema;

    /**
     * Event router that takes the routing key from one string-typed field of each {@link Row}.
     */
    /* loaded from: input_file:io/pravega/connectors/flink/FlinkPravegaTableSink$RowBasedRouter.class */
    public static class RowBasedRouter implements PravegaEventRouter<Row> {
        /** Position of the key field within the row. */
        private final int keyIndex;

        /**
         * Resolves the position of the key field and validates its type.
         *
         * @param keyFieldName name of the field whose value is used as routing key
         * @param fieldNames names of all row fields, in row order
         * @param fieldTypes types of all row fields, parallel to {@code fieldNames}
         * @throws IllegalArgumentException if the two arrays differ in length, if
         *     {@code keyFieldName} is not contained in {@code fieldNames}, or if the key field's
         *     type is not equal to {@code DataTypes.STRING()}
         */
        public RowBasedRouter(String keyFieldName, String[] fieldNames, DataType[] fieldTypes) {
            Preconditions.checkArgument(fieldNames.length == fieldTypes.length, "Number of provided field names and types does not match.");
            int index = Arrays.asList(fieldNames).indexOf(keyFieldName);
            Preconditions.checkArgument(index >= 0, "Key field '" + keyFieldName + "' not found");
            Preconditions.checkArgument(DataTypes.STRING().equals(fieldTypes[index]), "Key field must be of type 'STRING'");
            this.keyIndex = index;
        }

        /**
         * Returns the value of the key field of the given row.
         *
         * @param row the row to route
         * @return the key field's value, cast to {@code String}
         */
        @Override // io.pravega.connectors.flink.PravegaEventRouter
        public String getRoutingKey(Row row) {
            return (String) row.getField(this.keyIndex);
        }

        /** Returns the position of the key field within the row. */
        int getKeyIndex() {
            return this.keyIndex;
        }
    }

    /**
     * Creates a sink from the two factories and an initial schema.
     *
     * @param writerFactory creates the streaming writer for a schema; must not be {@code null}
     * @param outputFormatFactory creates the batch output format for a schema; must not be
     *     {@code null}
     * @param tableSchema initial schema, passed through
     *     {@code TableSchemaUtils.checkNoGeneratedColumns}
     * @throws NullPointerException if either factory is {@code null}
     */
    /* JADX INFO: Access modifiers changed from: protected */
    public FlinkPravegaTableSink(Function<TableSchema, FlinkPravegaWriter<Row>> writerFactory, Function<TableSchema, FlinkPravegaOutputFormat<Row>> outputFormatFactory, TableSchema tableSchema) {
        // checkNotNull is generic and returns its argument with the static type intact,
        // so no (raw) cast is needed here.
        this.writerFactory = Preconditions.checkNotNull(writerFactory, "writerFactory");
        this.outputFormatFactory = Preconditions.checkNotNull(outputFormatFactory, "outputFormatFactory");
        this.schema = TableSchemaUtils.checkNoGeneratedColumns(tableSchema);
    }

    /** Creates a new sink sharing this sink's factories and current schema. */
    private FlinkPravegaTableSink createCopy() {
        return new FlinkPravegaTableSink(this.writerFactory, this.outputFormatFactory, this.schema);
    }

    /**
     * Attaches a writer created by {@link #writerFactory} to the given stream.
     *
     * <p>The sink operator uses the parallelism of {@code dataStream} and is named via
     * {@code TableConnectorUtils.generateRuntimeName} from this class and its field names.
     *
     * @param dataStream the stream of rows to write
     * @return the resulting stream sink
     * @throws IllegalStateException if no schema is set
     */
    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
        Preconditions.checkState(this.schema != null, "Table sink is not configured");
        FlinkPravegaWriter<Row> writer = this.writerFactory.apply(this.schema);
        return dataStream
                .addSink(writer)
                .setParallelism(dataStream.getParallelism())
                .name(TableConnectorUtils.generateRuntimeName(getClass(), getFieldNames()));
    }

    /**
     * Attaches an output format created by {@link #outputFormatFactory} to the given data set.
     *
     * @param dataSet the data set of rows to write
     * @return the resulting data sink
     * @throws IllegalStateException if no schema is set
     */
    @Override
    public DataSink<?> consumeDataSet(DataSet<Row> dataSet) {
        Preconditions.checkState(this.schema != null, "Table sink is not configured");
        FlinkPravegaOutputFormat<Row> outputFormat = this.outputFormatFactory.apply(this.schema);
        return dataSet.output(outputFormat);
    }

    /** Returns the row data type derived from the current schema. */
    @Override
    public DataType getConsumedDataType() {
        return this.schema.toRowDataType();
    }

    /** Returns the current schema of this sink. */
    @Override
    public TableSchema getTableSchema() {
        return this.schema;
    }

    /**
     * Returns a copy of this sink whose schema is built from the given field names and types.
     * This instance is left unchanged.
     *
     * @param fieldNames names of the fields; must not be {@code null}
     * @param fieldTypes legacy type information of the fields, parallel to {@code fieldNames};
     *     must not be {@code null}
     * @return a new sink carrying the new schema
     * @throws NullPointerException if either argument is {@code null}
     * @throws IllegalArgumentException if the two arrays differ in length
     */
    @Override
    public FlinkPravegaTableSink configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        Preconditions.checkNotNull(fieldNames, "fieldNames");
        Preconditions.checkNotNull(fieldTypes, "fieldTypes");
        Preconditions.checkArgument(fieldNames.length == fieldTypes.length, "Number of provided field names and types does not match.");
        DataType[] dataTypes = Arrays.stream(fieldTypes)
                .map(TypeConversions::fromLegacyInfoToDataType)
                .toArray(DataType[]::new);
        FlinkPravegaTableSink copy = createCopy();
        copy.schema = TableSchema.builder().fields(fieldNames, dataTypes).build();
        return copy;
    }

    // Decompiler rendering of the compiler-generated bridge for configure(...) (note the
    // bridge/synthetic markers and the invented name). Kept with its signature unchanged so the
    // class's public surface does not shrink; it only delegates to configure(...).
    /* renamed from: configure, reason: collision with other method in class */
    public /* bridge */ /* synthetic */ TableSink m124configure(String[] strArr, TypeInformation[] typeInformationArr) {
        return configure(strArr, (TypeInformation<?>[]) typeInformationArr);
    }
}
