package io.pravega.connectors.flink;

import io.pravega.client.BatchClientFactory;
import io.pravega.client.ClientConfig;
import io.pravega.client.batch.SegmentIterator;
import io.pravega.client.batch.SegmentRange;
import io.pravega.connectors.flink.serialization.WrappingSerializer;
import io.pravega.connectors.flink.util.FlinkPravegaUtils;
import io.pravega.connectors.flink.util.StreamWithBoundaries;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/connectors/flink/FlinkPravegaInputFormat.class */
/**
 * A Flink {@link RichInputFormat} that reads the events of one or more Pravega streams
 * through the Pravega batch client.
 *
 * <p>Each {@link SegmentRange} returned by the batch client for a configured stream becomes
 * one {@link PravegaInputSplit}. A split is read with a {@link SegmentIterator} that is opened
 * in {@link #open(PravegaInputSplit)} and released in {@link #close()}.
 *
 * @param <T> the type of the records produced by this input format
 */
public class FlinkPravegaInputFormat<T> extends RichInputFormat<T, PravegaInputSplit> {
    private static final Logger log = LoggerFactory.getLogger(FlinkPravegaInputFormat.class);
    private static final long serialVersionUID = 1;

    // Scope handed to the batch client factory; the literal suggests the value is a
    // placeholder only (NOTE(review): confirm against the Pravega batch client).
    private static final String DEFAULT_CLIENT_SCOPE_NAME = "__NOT_USED";

    private final ClientConfig clientConfig;
    private final String clientScope = DEFAULT_CLIENT_SCOPE_NAME;
    private final List<StreamWithBoundaries> streams;
    private final DeserializationSchema<T> deserializationSchema;

    // Runtime-only state: created in openInputFormat()/open(), never serialized.
    private transient BatchClientFactory batchClientFactory;
    private transient SegmentIterator<T> segmentIterator;

    /**
     * Builder for {@link FlinkPravegaInputFormat} instances.
     *
     * @param <T> the type of the records produced by the built input format
     */
    public static class Builder<T> extends AbstractReaderBuilder<Builder<T>> {
        private DeserializationSchema<T> deserializationSchema;

        /**
         * Returns this builder, typed as the concrete builder class.
         *
         * @return this builder
         */
        @Override // io.pravega.connectors.flink.AbstractReaderBuilder
        public Builder<T> builder() {
            return this;
        }

        /**
         * Sets the schema used to deserialize events read from Pravega.
         *
         * @param deserializationSchema the deserialization schema to use
         * @return this builder
         */
        public Builder<T> withDeserializationSchema(DeserializationSchema<T> deserializationSchema) {
            this.deserializationSchema = deserializationSchema;
            return builder();
        }

        /**
         * Returns the configured deserialization schema.
         *
         * @return the deserialization schema, never {@code null}
         * @throws IllegalStateException if no deserialization schema has been set
         */
        protected DeserializationSchema<T> getDeserializationSchema() {
            Preconditions.checkState(this.deserializationSchema != null, "Deserialization schema must not be null.");
            return this.deserializationSchema;
        }

        /**
         * Creates the input format from the builder's current settings.
         *
         * @return a new {@link FlinkPravegaInputFormat}
         * @throws IllegalStateException if no deserialization schema has been set
         */
        public FlinkPravegaInputFormat<T> build() {
            return new FlinkPravegaInputFormat<>(
                    getPravegaConfig().getClientConfig(),
                    resolveStreams(),
                    getDeserializationSchema());
        }
    }

    /**
     * Creates a new input format.
     *
     * @param clientConfig the Pravega client configuration
     * @param list the streams (with their boundaries) to read
     * @param deserializationSchema the schema used to deserialize the events
     * @throws NullPointerException if any argument is {@code null}
     */
    public FlinkPravegaInputFormat(ClientConfig clientConfig, List<StreamWithBoundaries> list, DeserializationSchema<T> deserializationSchema) {
        this.clientConfig = Preconditions.checkNotNull(clientConfig, "clientConfig");
        this.streams = Preconditions.checkNotNull(list, "streams");
        this.deserializationSchema = Preconditions.checkNotNull(deserializationSchema, "deserializationSchema");
    }

    /**
     * Creates the batch client factory that {@link #open(PravegaInputSplit)} reads from.
     *
     * @throws IOException if the superclass hook fails
     */
    @Override
    public void openInputFormat() throws IOException {
        super.openInputFormat();
        this.batchClientFactory = getBatchClientFactory(this.clientScope, this.clientConfig);
    }

    /**
     * Creates a batch client factory; overridable so tests can substitute their own.
     *
     * @param str the scope to create the factory for
     * @param clientConfig the Pravega client configuration
     * @return a new batch client factory
     */
    @VisibleForTesting
    protected BatchClientFactory getBatchClientFactory(String str, ClientConfig clientConfig) {
        return BatchClientFactory.withScope(str, clientConfig);
    }

    /**
     * Closes the batch client factory created by {@link #openInputFormat()}.
     *
     * <p>Does nothing when no factory is open, so calling this after a failed or skipped
     * {@link #openInputFormat()} does not raise a {@link NullPointerException} that would
     * hide the original failure. Calling it repeatedly is safe.
     *
     * @throws IOException declared by the overridden method
     */
    @Override
    public void closeInputFormat() throws IOException {
        final BatchClientFactory factory = this.batchClientFactory;
        // Drop the reference first so a failing close() is not retried on the same factory.
        this.batchClientFactory = null;
        if (factory != null) {
            factory.close();
        }
    }

    /** No configuration is read; all settings are supplied through the constructor. */
    @Override
    public void configure(Configuration configuration) {
    }

    /**
     * Returns the given statistics unchanged; this format computes none of its own.
     *
     * @param baseStatistics previously cached statistics, possibly {@code null}
     * @return {@code baseStatistics} as passed in
     */
    @Override
    public BaseStatistics getStatistics(BaseStatistics baseStatistics) throws IOException {
        return baseStatistics;
    }

    /**
     * Creates one input split per segment range of every configured stream.
     *
     * <p>A dedicated batch client factory is created for the lookup and closed again before
     * the method returns, including when the lookup fails. Splits are numbered consecutively
     * from zero in the order the segment ranges are returned.
     *
     * @param i the minimum number of splits requested; not used by this implementation
     * @return the input splits, one per segment range
     * @throws IOException declared by the overridden method
     */
    @Override
    public PravegaInputSplit[] createInputSplits(int i) throws IOException {
        final List<PravegaInputSplit> splits = new ArrayList<>();
        try (BatchClientFactory factory = getBatchClientFactory(this.clientScope, this.clientConfig)) {
            for (StreamWithBoundaries stream : this.streams) {
                Iterator<SegmentRange> segments =
                        factory.getSegments(stream.getStream(), stream.getFrom(), stream.getTo()).getIterator();
                while (segments.hasNext()) {
                    // The current list size doubles as the next split number.
                    splits.add(new PravegaInputSplit(splits.size(), segments.next()));
                }
            }
        }
        log.info("Prepared {} input splits", splits.size());
        return splits.toArray(new PravegaInputSplit[splits.size()]);
    }

    /**
     * Alias of {@link #createInputSplits(int)} under the name this method previously carried.
     *
     * @param i the minimum number of splits requested
     * @return the input splits, one per segment range
     * @throws IOException declared by {@link #createInputSplits(int)}
     * @deprecated use {@link #createInputSplits(int)}
     */
    @Deprecated
    public PravegaInputSplit[] m115createInputSplits(int i) throws IOException {
        return createInputSplits(i);
    }

    /**
     * Returns an assigner that hands out the given splits.
     *
     * @param pravegaInputSplitArr the splits to assign
     * @return a {@link DefaultInputSplitAssigner} over the splits
     */
    @Override
    public InputSplitAssigner getInputSplitAssigner(PravegaInputSplit[] pravegaInputSplitArr) {
        return new DefaultInputSplitAssigner(pravegaInputSplitArr);
    }

    /**
     * Opens an iterator over the segment range of the given split.
     *
     * <p>When the deserialization schema is a {@link WrappingSerializer}, the serializer it
     * wraps is passed to the batch client directly; otherwise the schema is adapted through
     * {@link FlinkPravegaUtils.FlinkDeserializer}.
     *
     * @param pravegaInputSplit the split to read
     * @throws IOException declared by the overridden method
     */
    @Override
    @SuppressWarnings("unchecked") // the instanceof test guards the cast to WrappingSerializer<T>
    public void open(PravegaInputSplit pravegaInputSplit) throws IOException {
        this.segmentIterator = this.batchClientFactory.readSegment(
                pravegaInputSplit.getSegmentRange(),
                this.deserializationSchema instanceof WrappingSerializer
                        ? ((WrappingSerializer<T>) this.deserializationSchema).getWrappedSerializer()
                        : new FlinkPravegaUtils.FlinkDeserializer<>(this.deserializationSchema));
    }

    /**
     * Reports whether the current split has been read completely.
     *
     * @return {@code true} if the segment iterator has no further events
     */
    @Override
    public boolean reachedEnd() throws IOException {
        return !this.segmentIterator.hasNext();
    }

    /**
     * Returns the next event of the current split.
     *
     * @param t a reusable record instance; ignored by this implementation
     * @return the next event produced by the segment iterator
     */
    @Override
    public T nextRecord(T t) throws IOException {
        return this.segmentIterator.next();
    }

    /**
     * Closes the iterator of the current split.
     *
     * <p>Does nothing when no iterator is open, so calling this after a failed
     * {@link #open(PravegaInputSplit)} or calling it twice is safe.
     *
     * @throws IOException declared by the overridden method
     */
    @Override
    public void close() throws IOException {
        final SegmentIterator<T> iterator = this.segmentIterator;
        // Drop the reference first so a failing close() is not retried on the same iterator.
        this.segmentIterator = null;
        if (iterator != null) {
            iterator.close();
        }
    }

    /**
     * Creates a new builder for a {@link FlinkPravegaInputFormat}.
     *
     * @param <T> the type of the records produced by the built input format
     * @return a new builder
     */
    public static <T> Builder<T> builder() {
        return new Builder<>();
    }
}
