package cz.o2.proxima.beam.direct.io;

import cz.o2.proxima.direct.commitlog.CommitLogReader;
import cz.o2.proxima.repository.RepositoryFactory;
import cz.o2.proxima.storage.Partition;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.storage.commitlog.Position;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import lombok.Generated;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cz/o2/proxima/beam/direct/io/DirectBoundedSource.class */
/**
 * Bounded source of {@link StreamElement}s backed by a {@link CommitLogReader}.
 *
 * <p>An instance is either unbound ({@code partition == null}), in which case {@link #split}
 * produces one source per partition reported by the reader, or bound to a single
 * {@link Partition}, in which case it is not split any further.
 *
 * <p>The {@link CommitLogReader} itself is held in a {@code transient} field and is re-created
 * on demand from {@link #readerFactory} whenever that field is {@code null} (presumably after the
 * source has been deserialized - confirm against the serialization contract of the superclass).
 */
class DirectBoundedSource extends AbstractDirectBoundedSource {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(DirectBoundedSource.class);
    private static final long serialVersionUID = 1;

    /** Name handed over to {@code BeamCommitLogReader.bounded} when a reader is created. */
    private final String name;

    /** Factory obtained from the original reader; used to re-create {@link #reader} lazily. */
    private final CommitLogReader.Factory<?> readerFactory;

    /** Position in the commit log to start reading from. */
    private final Position position;

    /** Limit passed to the created reader; divided among partitions on {@link #split}. */
    private final long limit;

    /** Partition this source is bound to, or {@code null} when the source is not yet split. */
    @Nullable
    private final Partition partition;

    /** Lazily (re-)initialized reader, see {@link #reader()}. */
    private transient CommitLogReader reader;

    /**
     * Creates an unbound source (no partition assigned yet).
     *
     * <p>NOTE(review): the decompiler reported that this method was originally package-private;
     * it is kept {@code public} here so that existing callers keep compiling.
     *
     * @param repositoryFactory factory passed to the superclass
     * @param str name of the source
     * @param commitLogReader reader to read from, must not be {@code null}
     * @param position position to start reading from
     * @param j limit passed to the created reader
     * @return new source that is not bound to any partition
     * @throws NullPointerException if {@code commitLogReader} is {@code null}
     */
    public static DirectBoundedSource of(RepositoryFactory repositoryFactory, String str, CommitLogReader commitLogReader, Position position, long j) {
        return new DirectBoundedSource(repositoryFactory, str, commitLogReader, position, j, null);
    }

    /**
     * Creates a source optionally bound to a single partition.
     *
     * @param repositoryFactory factory passed to the superclass
     * @param str name of the source
     * @param commitLogReader reader to read from, must not be {@code null}
     * @param position position to start reading from
     * @param j limit passed to the created reader
     * @param partition partition to bind to, or {@code null} for an unbound source
     * @throws NullPointerException if {@code commitLogReader} is {@code null}
     */
    DirectBoundedSource(RepositoryFactory repositoryFactory, String str, CommitLogReader commitLogReader, Position position, long j, @Nullable Partition partition) {
        super(repositoryFactory);
        this.name = str;
        // Fail fast on a null reader before anything is stored.
        this.readerFactory = Objects.requireNonNull(commitLogReader).asFactory();
        this.position = position;
        this.limit = j;
        this.partition = partition;
        // Reuse the supplied reader directly; the factory is only needed once this field is null.
        this.reader = commitLogReader;
    }

    /**
     * Splits this source into one source per partition of the underlying reader.
     *
     * <p>A source that is already bound to a partition is returned as the only element. Otherwise
     * each resulting source receives {@code limit / numberOfPartitions} as its limit; this is an
     * integer division, so any remainder is dropped.
     *
     * @param j desired bundle size; not used by this implementation
     * @param pipelineOptions pipeline options; not used by this implementation
     * @return list of sources, each bound to exactly one partition
     */
    public List<BoundedSource<StreamElement>> split(long j, PipelineOptions pipelineOptions) {
        if (this.partition != null) {
            return Collections.singletonList(this);
        }
        List<Partition> partitions = reader().getPartitions();
        // With zero partitions the stream is empty, so the division below is never evaluated.
        int size = partitions.size();
        List<BoundedSource<StreamElement>> list = partitions.stream()
                .<BoundedSource<StreamElement>>map(p ->
                        new DirectBoundedSource(this.factory, this.name, reader(), this.position, this.limit / size, p))
                .collect(Collectors.toList());
        log.debug("Split source {} into {}", this, list);
        return list;
    }

    /**
     * Creates a bounded reader for this source's position, limit and partition.
     *
     * @param pipelineOptions pipeline options; not used by this implementation
     * @return reader produced by {@code BeamCommitLogReader.bounded}
     */
    public BoundedSource.BoundedReader<StreamElement> createReader(PipelineOptions pipelineOptions) {
        log.debug("Creating reader reading from position {} on partition {}", this.position, this.partition);
        return BeamCommitLogReader.bounded(this, this.name, reader(), this.position, this.limit, this.partition);
    }

    /**
     * Returns the commit log reader, creating it from {@link #readerFactory} when the transient
     * field is {@code null}.
     *
     * @return the (possibly freshly created) reader
     */
    private CommitLogReader reader() {
        if (this.reader == null) {
            this.reader = (CommitLogReader) this.readerFactory.apply(this.factory.apply());
        }
        return this.reader;
    }
}
