package cz.o2.proxima.beam.core.direct.io;

import cz.o2.proxima.beam.core.io.StreamElementCoder;
import cz.o2.proxima.core.repository.RepositoryFactory;
import cz.o2.proxima.core.storage.Partition;
import cz.o2.proxima.core.storage.StreamElement;
import cz.o2.proxima.core.storage.commitlog.Position;
import cz.o2.proxima.direct.core.commitlog.CommitLogObserver;
import cz.o2.proxima.direct.core.commitlog.CommitLogReader;
import cz.o2.proxima.direct.core.commitlog.Offset;
import cz.o2.proxima.internal.com.google.common.base.MoreObjects;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.Generated;
import org.apache.beam.repackaged.kryo.com.esotericsoftware.kryo.serializers.JavaSerializer;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.extensions.kryo.KryoCoder;
import org.apache.beam.sdk.extensions.kryo.KryoRegistrar;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:cz/o2/proxima/beam/core/direct/io/DirectUnboundedSource.class */
/**
 * A Beam {@link UnboundedSource} producing {@link StreamElement}s read through a
 * {@link CommitLogReader}.
 *
 * <p>A source created via {@link #of} is not bound to any partition; calling
 * {@link #split(int, PipelineOptions)} on it produces one source per (possibly further split)
 * partition of the underlying reader. A source that already carries a partition is never split
 * again.
 *
 * <p>The {@link CommitLogReader} itself is held in a transient field and is re-created lazily
 * from its serializable factory when it is missing.
 */
public class DirectUnboundedSource extends UnboundedSource<StreamElement, Checkpoint> {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(DirectUnboundedSource.class);
    private static final long serialVersionUID = 1L;

    /** Factory used to obtain the repository that is handed to {@link #readerFactory}. */
    private final RepositoryFactory factory;

    /** Optional name passed to the created {@link BeamCommitLogReader}. */
    @Nullable
    private final String name;

    /** Serializable factory used to re-create {@link #reader} when it is {@code null}. */
    private final CommitLogReader.Factory<?> readerFactory;

    private final Position position;
    private final boolean eventTime;

    /** Partitions of the reader, filled in lazily on the first call to {@code split}. */
    private final List<Partition> partitions = new ArrayList<>();

    /** Limit passed to the created reader; divided among the sources produced by {@code split}. */
    private final long limit;

    /** Partition this source is bound to, or {@code null} when the source was not split yet. */
    @Nullable
    private final Partition partition;

    /** URI of the commit log reader supplied at construction time. */
    private final URI uri;

    /** Commit log reader; transient, so it is obtained through {@link #reader()}. */
    @Nullable
    private transient CommitLogReader reader;

    /**
     * Checkpoint mark of {@link DirectUnboundedSource}.
     *
     * <p>Only {@code offset}, {@code limit} and {@code uri} are serialized and take part in
     * {@link #equals(Object)} and {@link #hashCode()}. The committers and the reader reference
     * are transient.
     */
    public static class Checkpoint implements UnboundedSource.CheckpointMark, Serializable {
        private static final long serialVersionUID = 1L;

        @Nullable
        private final Offset offset;
        private final long limit;
        private final URI uri;

        /** Committer confirmed on finalization; {@code null} for externalizable offsets. */
        @Nullable
        private final transient CommitLogObserver.OffsetCommitter committer;

        /** Committer nacked on finalization; {@code null} for externalizable offsets. */
        @Nullable
        private final transient CommitLogObserver.OffsetCommitter nackCommitter;

        /** Reader this checkpoint was taken from. */
        private final transient BeamCommitLogReader reader;

        /**
         * Captures the current state of the given reader.
         *
         * @param beamCommitLogReader reader to take the checkpoint from, must not be {@code null}
         * @throws NullPointerException when {@code beamCommitLogReader} is {@code null}
         */
        public Checkpoint(@Nonnull BeamCommitLogReader beamCommitLogReader) {
            // Validate before the first dereference so the failure is explicit.
            this.reader = Objects.requireNonNull(beamCommitLogReader, "beamCommitLogReader");
            this.offset = beamCommitLogReader.getCurrentOffset();
            this.limit = beamCommitLogReader.getLimit();
            this.uri = beamCommitLogReader.getUri();
            // Committers are kept only when the reader's offsets are not externalizable.
            final boolean externalizable = beamCommitLogReader.hasExternalizableOffsets();
            this.committer = externalizable ? null : beamCommitLogReader.getLastReadCommitter();
            this.nackCommitter = externalizable ? null : beamCommitLogReader.getLastWrittenCommitter();
        }

        /**
         * Confirms the read committer and nacks the written committer (when present) and, for a
         * reader without externalizable offsets, clears the reader's incoming queue.
         */
        @Override
        public void finalizeCheckpoint() {
            if (this.committer != null) {
                this.committer.confirm();
            }
            if (this.nackCommitter != null) {
                this.nackCommitter.nack();
            }
            if (this.reader != null && !this.reader.hasExternalizableOffsets()) {
                this.reader.clearIncomingQueue();
            }
        }

        @Override
        public String toString() {
            return MoreObjects.toStringHelper(this)
                    .add("offset", this.offset)
                    .add("limit", this.limit)
                    .add("committer", this.committer)
                    .add("nackCommitter", this.nackCommitter)
                    .toString();
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            Checkpoint other = (Checkpoint) obj;
            return this.limit == other.limit
                    && Objects.equals(this.offset, other.offset)
                    && Objects.equals(this.uri, other.uri);
        }

        @Override
        public int hashCode() {
            return Objects.hash(this.offset, this.limit, this.uri);
        }

        /** @return the offset captured by this checkpoint, possibly {@code null} */
        @Generated
        @Nullable
        public Offset getOffset() {
            return this.offset;
        }

        /** @return the limit captured by this checkpoint */
        @Generated
        public long getLimit() {
            return this.limit;
        }
    }

    /**
     * Creates a source that is not bound to any partition.
     *
     * @param repositoryFactory factory of the repository
     * @param str optional name of the source
     * @param commitLogReader reader to read from
     * @param position position to start reading from
     * @param z event-time flag
     * @param j limit passed to the reader
     * @return the new source
     */
    static DirectUnboundedSource of(RepositoryFactory repositoryFactory, String str, CommitLogReader commitLogReader, Position position, boolean z, long j) {
        return new DirectUnboundedSource(repositoryFactory, str, commitLogReader, position, z, j, null);
    }

    /**
     * @param repositoryFactory factory of the repository, must not be {@code null}
     * @param str optional name of the source
     * @param commitLogReader reader to read from, must not be {@code null}
     * @param position position to start reading from
     * @param z event-time flag
     * @param j limit passed to the reader
     * @param partition partition to bind this source to, or {@code null} for an unsplit source
     * @throws NullPointerException when {@code repositoryFactory} or {@code commitLogReader} is
     *     {@code null}
     */
    DirectUnboundedSource(@Nonnull RepositoryFactory repositoryFactory, @Nullable String str, @Nonnull CommitLogReader commitLogReader, @Nonnull Position position, boolean z, long j, @Nullable Partition partition) {
        this.factory = Objects.requireNonNull(repositoryFactory);
        this.name = str;
        this.readerFactory = Objects.requireNonNull(commitLogReader).asFactory();
        this.position = position;
        this.eventTime = z;
        this.limit = j;
        this.partition = partition;
        this.uri = commitLogReader.getUri();
        this.reader = commitLogReader;
    }

    /**
     * Splits this source into one source per resolved partition.
     *
     * <p>A source already bound to a partition is returned as-is. Otherwise the reader's
     * partitions are loaded (once) and every splittable partition is split into
     * {@code (i - nonSplittable) / splittable} parts when that number is positive. The limit is
     * divided evenly among the sources actually produced.
     *
     * @param i desired number of splits
     * @param pipelineOptions pipeline options (unused)
     * @return the resulting sources
     */
    @Override
    public List<UnboundedSource<StreamElement, Checkpoint>> split(int i, PipelineOptions pipelineOptions) {
        if (this.partition != null) {
            return Collections.singletonList(this);
        }
        if (this.partitions.isEmpty()) {
            this.partitions.addAll(reader().getPartitions());
        }
        final long splittable = this.partitions.stream().filter(Partition::isSplittable).count();
        final long nonSplittable = this.partitions.size() - splittable;
        final int splitsPerPartition =
                splittable > 0 ? Math.max(0, (int) ((i - nonSplittable) / splittable)) : 0;

        final List<Partition> resolved = new ArrayList<>();
        for (Partition candidate : this.partitions) {
            if (candidate.isSplittable() && splitsPerPartition > 0) {
                resolved.addAll(candidate.split(splitsPerPartition));
            } else {
                resolved.add(candidate);
            }
        }

        // Divide by the number of sources really created. The previous estimate
        // (nonSplittable + splittable * splitsPerPartition) is zero when all partitions are
        // splittable but splitsPerPartition is 0, which caused a division by zero.
        final int sourceCount = resolved.size();
        final List<UnboundedSource<StreamElement, Checkpoint>> sources = new ArrayList<>(sourceCount);
        for (Partition target : resolved) {
            sources.add(
                    new DirectUnboundedSource(
                            this.factory,
                            this.name,
                            reader(),
                            this.position,
                            this.eventTime,
                            this.limit / sourceCount,
                            target));
        }
        return sources;
    }

    /** Returns the commit log reader, re-creating it from the factory when it is missing. */
    private CommitLogReader reader() {
        if (this.reader == null) {
            this.reader = (CommitLogReader) this.readerFactory.apply(this.factory.apply());
        }
        return this.reader;
    }

    /**
     * Creates an unbounded reader for this source.
     *
     * @param pipelineOptions pipeline options (unused)
     * @param checkpoint checkpoint to restore offset and limit from, or {@code null} to start
     *     without an offset and with this source's own limit
     * @return the reader
     */
    @Override
    public UnboundedSource.UnboundedReader<StreamElement> createReader(PipelineOptions pipelineOptions, Checkpoint checkpoint) {
        final Offset offset;
        final long readLimit;
        if (checkpoint == null) {
            offset = null;
            readLimit = this.limit;
        } else {
            offset = checkpoint.getOffset();
            readLimit = checkpoint.getLimit();
        }
        final CommitLogReader commitLogReader = reader();
        log.info("Created reader reading from {} with offset {} and limit {}", commitLogReader.getUri(), offset, readLimit);
        return BeamCommitLogReader.unbounded(this, this.name, commitLogReader, this.position, this.eventTime, readLimit, this.partition, offset);
    }

    /**
     * Returns a Kryo based coder that serializes {@link Checkpoint} using Java serialization.
     *
     * <p>The deserialization hook for the registrar lambda is synthesized by the compiler; it
     * must not be written by hand in this class.
     */
    @Override
    public Coder<Checkpoint> getCheckpointMarkCoder() {
        return KryoCoder.of(kryo -> kryo.addDefaultSerializer(Checkpoint.class, new JavaSerializer()));
    }

    @Override
    public Coder<StreamElement> getOutputCoder() {
        return StreamElementCoder.of(this.factory);
    }

    /**
     * @return {@code true} when the event-time flag is set and the reader does not have
     *     externalizable offsets
     */
    @Override
    public boolean requiresDeduping() {
        return !reader().hasExternalizableOffsets() && this.eventTime;
    }

    /** Compares name, position, event-time flag, limit, partition and URI. */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        DirectUnboundedSource other = (DirectUnboundedSource) obj;
        return this.eventTime == other.eventTime
                && this.limit == other.limit
                && Objects.equals(this.name, other.name)
                && this.position == other.position
                && Objects.equals(this.partition, other.partition)
                && Objects.equals(this.uri, other.uri);
    }

    @Override
    public int hashCode() {
        return Objects.hash(this.name, this.position, this.eventTime, this.limit, this.partition, this.uri);
    }
}
