/*
 * Decompiled with CFR 0.152.
 */
package cz.o2.proxima.beam.direct.io;

import cz.o2.proxima.beam.core.io.StreamElementCoder;
import cz.o2.proxima.beam.direct.io.BeamCommitLogReader;
import cz.o2.proxima.direct.commitlog.CommitLogReader;
import cz.o2.proxima.direct.commitlog.LogObserver;
import cz.o2.proxima.direct.commitlog.Offset;
import cz.o2.proxima.direct.core.Partition;
import cz.o2.proxima.internal.shaded.com.google.common.base.MoreObjects;
import cz.o2.proxima.repository.RepositoryFactory;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.storage.commitlog.Position;
import java.io.Serializable;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.beam.repackaged.kryo.com.esotericsoftware.kryo.Serializer;
import org.apache.beam.repackaged.kryo.com.esotericsoftware.kryo.serializers.JavaSerializer;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.extensions.kryo.KryoCoder;
import org.apache.beam.sdk.extensions.kryo.KryoRegistrar;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;

/**
 * An {@link UnboundedSource} of {@link StreamElement}s backed by a {@link CommitLogReader}.
 *
 * <p>An instance created through {@link #of} is not bound to any partition; {@link #split}
 * turns it into one source per (sub-)partition. An instance with a non-null partition is never
 * split further.
 */
class DirectUnboundedSource
extends UnboundedSource<StreamElement, Checkpoint> {
    private static final long serialVersionUID = 1L;
    /** Factory handed to {@link StreamElementCoder} to build the output coder. */
    private final RepositoryFactory factory;
    /** Name passed through to the created {@link BeamCommitLogReader}. */
    private final String name;
    /** Commit-log reader the elements are read from. */
    private final CommitLogReader reader;
    /** Position passed through to the created {@link BeamCommitLogReader}. */
    private final Position position;
    /** Event-time flag passed to the created reader; also consulted by {@link #requiresDeduping()}. */
    private final boolean eventTime;
    /** Partitions reported by {@link #reader} at construction time. */
    private final List<Partition> partitions;
    /** Limit for this source; {@link #split} divides it among the sources it creates. */
    private final long limit;
    /** Partition this source is bound to, or {@code null} when the source has not been split. */
    @Nullable
    private final Partition partition;

    /**
     * Creates a source that is not bound to any single partition.
     *
     * @param factory factory used to create the output coder
     * @param name name passed to the created readers
     * @param reader commit-log reader to read from
     * @param position position passed to the created readers
     * @param eventTime event-time flag passed to the created readers
     * @param limit limit for the whole source
     * @return the new source
     */
    static DirectUnboundedSource of(RepositoryFactory factory, String name, CommitLogReader reader, Position position, boolean eventTime, long limit) {
        return new DirectUnboundedSource(factory, name, reader, position, eventTime, limit, null);
    }

    /**
     * Creates a source, optionally bound to a single partition.
     *
     * @param factory factory used to create the output coder
     * @param name name passed to the created readers
     * @param reader commit-log reader to read from; its partitions are captured here
     * @param position position passed to the created readers
     * @param eventTime event-time flag passed to the created readers
     * @param limit limit for this source
     * @param partition partition to bind to, or {@code null} for an unsplit source
     */
    DirectUnboundedSource(RepositoryFactory factory, String name, CommitLogReader reader, Position position, boolean eventTime, long limit, @Nullable Partition partition) {
        this.factory = factory;
        this.name = name;
        this.reader = reader;
        this.position = position;
        this.eventTime = eventTime;
        this.partitions = reader.getPartitions();
        this.limit = limit;
        this.partition = partition;
    }

    /**
     * Splits this source into one source per partition, further splitting splittable partitions
     * when {@code desiredNumSplits} leaves room for it.
     *
     * <p>A source already bound to a partition is returned as the only element. Otherwise the
     * limit of this source is divided evenly (integer division) among the created sources.
     *
     * @param desiredNumSplits number of splits the runner asks for
     * @param options pipeline options (unused)
     * @return the sources resulting from the split
     */
    public List<UnboundedSource<StreamElement, Checkpoint>> split(int desiredNumSplits, PipelineOptions options) {
        if (this.partition != null) {
            return Collections.singletonList(this);
        }
        long splittable = this.partitions.stream().filter(Partition::isSplittable).count();
        long nonSplittable = (long)this.partitions.size() - splittable;
        // Sub-partitions requested from each splittable partition; 0 means "do not split".
        int splitDesired = splittable > 0L ? Math.max(0, (int)(((long)desiredNumSplits - nonSplittable) / splittable)) : 0;
        // Expand the partitions first so the limit can be divided by the number of sources that
        // are really created. Deriving that number from splitDesired is wrong when splitDesired
        // is 0 (unsplit splittable partitions would not be counted, yielding a division by zero
        // when all partitions are splittable) or when split() returns another count than asked.
        List<Partition> targets = this.partitions.stream()
            .<Partition>flatMap(p -> p.isSplittable() && splitDesired > 0 ? p.split(splitDesired).stream() : Stream.of(p))
            .collect(Collectors.toList());
        // With no partitions nothing is created below, so the divisor is irrelevant.
        long limitPerSource = targets.isEmpty() ? this.limit : this.limit / (long)targets.size();
        return targets.stream()
            .map(p -> new DirectUnboundedSource(this.factory, this.name, this.reader, this.position, this.eventTime, limitPerSource, p))
            .collect(Collectors.toList());
    }

    /**
     * Creates a reader for this source.
     *
     * @param po pipeline options (unused)
     * @param cmt checkpoint to resume from, or {@code null}; when present, its offset and limit
     *     are used instead of this source's limit
     * @return the unbounded reader
     */
    public UnboundedSource.UnboundedReader<StreamElement> createReader(PipelineOptions po, Checkpoint cmt) {
        Offset offset = cmt == null ? null : cmt.getOffset();
        long readerLimit = cmt == null ? this.limit : cmt.getLimit();
        return BeamCommitLogReader.unbounded(this, this.name, this.reader, this.position, this.eventTime, readerLimit, this.partition, offset);
    }

    /**
     * Returns a Kryo-based coder that encodes {@link Checkpoint} using Java serialization.
     *
     * @return coder for checkpoint marks
     */
    public Coder<Checkpoint> getCheckpointMarkCoder() {
        return KryoCoder.of((KryoRegistrar[])new KryoRegistrar[]{(KryoRegistrar & Serializable)kryo -> kryo.addDefaultSerializer(Checkpoint.class, (Serializer)new JavaSerializer())});
    }

    /**
     * Returns the coder of emitted elements, created from the repository factory.
     *
     * @return coder for {@link StreamElement}
     */
    public Coder<StreamElement> getOutputCoder() {
        return StreamElementCoder.of(this.factory);
    }

    /**
     * Tells whether the output needs deduplication.
     *
     * @return {@code true} when reading in event time from a reader without externalizable offsets
     */
    public boolean requiresDeduping() {
        return !this.reader.hasExternalizableOffsets() && this.eventTime;
    }

    /**
     * Checkpoint mark capturing the state of a {@link BeamCommitLogReader}.
     *
     * <p>Only the offset and the limit are serialized; the committers and the reader reference are
     * {@code transient} and therefore absent from a deserialized instance.
     */
    static class Checkpoint
    implements UnboundedSource.CheckpointMark,
    Serializable {
        private static final long serialVersionUID = 1L;
        /** Current offset of the reader at checkpoint time. */
        @Nullable
        private final Offset offset;
        /** Limit reported by the reader at checkpoint time. */
        private final long limit;
        /** Committer confirmed on finalization; {@code null} for externalizable offsets. */
        @Nullable
        private final transient LogObserver.OffsetCommitter committer;
        /** Committer nack-ed on finalization; {@code null} for externalizable offsets. */
        @Nullable
        private final transient LogObserver.OffsetCommitter nackCommitter;
        /** Reader this checkpoint was taken from. */
        @Nullable
        private final transient BeamCommitLogReader reader;

        /**
         * Captures the current state of the given reader.
         *
         * @param reader reader to snapshot; committers are retained only when the reader does
         *     not have externalizable offsets
         */
        Checkpoint(BeamCommitLogReader reader) {
            this.offset = reader.getCurrentOffset();
            this.limit = reader.getLimit();
            this.committer = reader.hasExternalizableOffsets() ? null : reader.getLastReadCommitter();
            this.nackCommitter = reader.hasExternalizableOffsets() ? null : reader.getLastWrittenCommitter();
            this.reader = reader;
        }

        /**
         * Confirms the last-read committer, nacks the last-written committer and, for a reader
         * without externalizable offsets, clears its incoming queue. Each step is skipped when
         * the corresponding reference is {@code null}.
         */
        public void finalizeCheckpoint() {
            if (this.committer != null) {
                this.committer.confirm();
            }
            if (this.nackCommitter != null) {
                this.nackCommitter.nack();
            }
            if (this.reader != null && !this.reader.hasExternalizableOffsets()) {
                this.reader.clearIncomingQueue();
            }
        }

        /** Returns a string with the offset, limit and both committers. */
        public String toString() {
            return MoreObjects.toStringHelper((Object)this).add("offset", (Object)this.offset).add("limit", this.limit).add("committer", (Object)this.committer).add("nackCommitter", (Object)this.nackCommitter).toString();
        }

        /**
         * Returns the captured offset.
         *
         * @return the offset, possibly {@code null}
         */
        @Nullable
        public Offset getOffset() {
            return this.offset;
        }

        /**
         * Returns the captured limit.
         *
         * @return the limit
         */
        public long getLimit() {
            return this.limit;
        }
    }
}

