package org.apache.spark.shuffle.celeborn;

import org.apache.celeborn.client.ShuffleClient;
import org.apache.celeborn.client.read.MetricsCallback;
import org.apache.celeborn.client.read.RssInputStream;
import org.apache.celeborn.common.CelebornConf;
import org.apache.spark.Aggregator;
import org.apache.spark.InterruptibleIterator;
import org.apache.spark.ShuffleDependency;
import org.apache.spark.TaskContext;
import org.apache.spark.internal.Logging;
import org.apache.spark.serializer.SerializerInstance;
import org.apache.spark.shuffle.ShuffleReadMetricsReporter;
import org.apache.spark.shuffle.ShuffleReader;
import org.apache.spark.sql.execution.columnar.RssBatchBuilder$;
import org.apache.spark.sql.execution.columnar.RssColumnarBatchSerializer;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.util.CompletionIterator$;
import org.apache.spark.util.collection.ExternalSorter;
import org.apache.spark.util.collection.ExternalSorter$;
import org.slf4j.Logger;
import scala.Function0;
import scala.MatchError;
import scala.None$;
import scala.Predef$;
import scala.Product2;
import scala.Some;
import scala.collection.Iterator;
import scala.math.Ordering;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;
import scala.runtime.RichInt$;

/* compiled from: RssShuffleReader.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005Uc\u0001\u0002\u000b\u0016\u0001\u0001B\u0001\u0002\u0011\u0001\u0003\u0002\u0003\u0006I!\u0011\u0005\t\u0013\u0002\u0011\t\u0011)A\u0005\u0015\"AQ\n\u0001B\u0001B\u0003%!\n\u0003\u0005O\u0001\t\u0005\t\u0015!\u0003K\u0011!y\u0005A!A!\u0002\u0013Q\u0005\u0002\u0003)\u0001\u0005\u0003\u0005\u000b\u0011B)\t\u0011U\u0003!\u0011!Q\u0001\nYC\u0001\"\u0018\u0001\u0003\u0002\u0003\u0006IA\u0018\u0005\u0006C\u0002!\tA\u0019\u0005\ba\u0002\u0011\r\u0011\"\u0003r\u0011\u0019A\b\u0001)A\u0005e\"9\u0011\u0010\u0001b\u0001\n\u0013Q\bbBA\u0002\u0001\u0001\u0006Ia\u001f\u0005\b\u0003\u000b\u0001A\u0011IA\u0004\u000f%\t9#FA\u0001\u0012\u0003\tIC\u0002\u0005\u0015+\u0005\u0005\t\u0012AA\u0016\u0011\u0019\t\u0007\u0003\"\u0001\u0002.!I\u0011q\u0006\t\u0012\u0002\u0013\u0005\u0011\u0011\u0007\u0005\n\u0003\u001b\u0002\u0012\u0013!C\u0001\u0003\u001f\u0012\u0001CU:t'\",hM\u001a7f%\u0016\fG-\u001a:\u000b\u0005Y9\u0012\u0001C2fY\u0016\u0014wN\u001d8\u000b\u0005aI\u0012aB:ik\u001a4G.\u001a\u0006\u00035m\tQa\u001d9be.T!\u0001H\u000f\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u0005q\u0012aA8sO\u000e\u0001QcA\u0011/qM!\u0001A\t\u0015;!\t\u0019c%D\u0001%\u0015\u0005)\u0013!B:dC2\f\u0017BA\u0014%\u0005\u0019\te.\u001f*fMB!\u0011F\u000b\u00178\u001b\u00059\u0012BA\u0016\u0018\u00055\u0019\u0006.\u001e4gY\u0016\u0014V-\u00193feB\u0011QF\f\u0007\u0001\t\u0015y\u0003A1\u00011\u0005\u0005Y\u0015CA\u00195!\t\u0019#'\u0003\u00024I\t9aj\u001c;iS:<\u0007CA\u00126\u0013\t1DEA\u0002B]f\u0004\"!\f\u001d\u0005\u000be\u0002!\u0019\u0001\u0019\u0003\u0003\r\u0003\"a\u000f \u000e\u0003qR!!P\r\u0002\u0011%tG/\u001a:oC2L!a\u0010\u001f\u0003\u000f1{wmZ5oO\u00061\u0001.\u00198eY\u0016\u0004$AQ$\u0011\u000b\r#EFR\u001c\u000e\u0003UI!!R\u000b\u0003!I\u001b8o\u00155vM\u001adW\rS1oI2,\u0007CA\u0017H\t%A\u0015!!A\u0001\u0002\u000b\u0005\u0001GA\u0002`IE\nab\u001d;beR\u0004\u0016M\u001d;ji&|g\u000e\u0005\u0002$\u0017&\u0011A\n\n\u0002\u0004\u0013:$\u0018\u0001D3oIB\u000b'\u000f^5uS>t\u0017!D:uCJ$X*\u00199J]\u0012,\u00070A\u0006f]\u0012l\u0015\r]%oI\u0016D\u0018aB2p]R,\u0007\u0010\u001e\t\u0003%Nk\u0011!G\u0005\u0003)f\u00111\u0002V1tW\u000e{g\u000e^3yi\u0006!1m\u001c8g!\t96,D\u0001Y\u0015\tI&,\u0001\u0004d_6lwN\u001c\u0006\u0003-mI!\u0001\u0018-\u0003\u0019\r+G.\u001a2pe:\u001cuN\u001c4\u0002\u000f5,GO]5dgB\u0011\u0011fX\u0005\u0003A^\u0011!d\u00155vM\u001adWMU3bI6+GO]5dgJ+\u0007o\u001c:uKJ\fa\u0001P5oSRtD#C2eS*\\G.\u001c8p!\u0011\u0019\u0005\u0001L\u001c\t\u000b\u0001K\u0001\u0019A31\u0005\u0019D\u0007#B\"EY\u001d<\u0004CA\u0017i\t%AE-!A\u0001\u0002\u000b\u0005\u0001\u0007C\u0003J\u0013\u0001\u0007!\nC\u0003N\u0013\u0001\u0007!\nC\u0004O\u0013A\u0005\t\u0019\u0001&\t\u000f=K\u0001\u0013!a\u0001\u0015\")\u0001+\u0003a\u0001#\")Q+\u0003a\u0001-\")Q,\u0003a\u0001=\u0006\u0019A-\u001a9\u0016\u0003I\u0004$a]<\u0011\u000bI#HF^\u001c\n\u0005UL\"!E*ik\u001a4G.\u001a#fa\u0016tG-\u001a8dsB\u0011Qf\u001e\u0003\n\u0011\u0006\t\t\u0011!A\u0003\u0002A\nA\u0001Z3qA\u0005\u0001\"o]:TQV4g\r\\3DY&,g\u000e^\u000b\u0002wB\u0011Ap`\u0007\u0002{*\u0011aPW\u0001\u0007G2LWM\u001c;\n\u0007\u0005\u0005QPA\u0007TQV4g\r\\3DY&,g\u000e^\u0001\u0012eN\u001c8\u000b[;gM2,7\t\\5f]R\u0004\u0013\u0001\u0002:fC\u0012$\"!!\u0003\u0011\r\u0005-\u00111DA\u0011\u001d\u0011\ti!a\u0006\u000f\t\u0005=\u0011QC\u0007\u0003\u0003#Q1!a\u0005 \u0003\u0019a$o\\8u}%\tQ%C\u0002\u0002\u001a\u0011\nq\u0001]1dW\u0006<W-\u0003\u0003\u0002\u001e\u0005}!\u0001C%uKJ\fGo\u001c:\u000b\u0007\u0005eA\u0005E\u0003$\u0003Gas'C\u0002\u0002&\u0011\u0012\u0001\u0002\u0015:pIV\u001cGOM\u0001\u0011%N\u001c8\u000b[;gM2,'+Z1eKJ\u0004\"a\u0011\t\u0014\u0005A\u0011CCAA\u0015\u0003m!C.Z:tS:LG\u000fJ4sK\u0006$XM\u001d\u0013eK\u001a\fW\u000f\u001c;%iU1\u00111GA%\u0003\u0017*\"!!\u000e+\u0007)\u000b9d\u000b\u0002\u0002:A!\u00111HA#\u001b\t\tiD\u0003\u0003\u0002@\u0005\u0005\u0013!C;oG\",7m[3e\u0015\r\t\u0019\u0005J\u0001\u000bC:tw\u000e^1uS>t\u0017\u0002BA$\u0003{\u0011\u0011#\u001e8dQ\u0016\u001c7.\u001a3WCJL\u0017M\\2f\t\u0015y#C1\u00011\t\u0015I$C1\u00011\u0003m!C.Z:tS:LG\u000fJ4sK\u0006$XM\u001d\u0013eK\u001a\fW\u000f\u001c;%kU1\u00111GA)\u0003'\"QaL\nC\u0002A\"Q!O\nC\u0002A\u0002")
/* loaded from: input_file:org/apache/spark/shuffle/celeborn/RssShuffleReader.class */
public class RssShuffleReader<K, C> implements ShuffleReader<K, C>, Logging {
    private final RssShuffleHandle<K, ?, C> handle;
    private final int startPartition;
    private final int endPartition;
    private final int startMapIndex;
    private final int endMapIndex;
    private final TaskContext context;
    private final CelebornConf conf;
    public final ShuffleReadMetricsReporter org$apache$spark$shuffle$celeborn$RssShuffleReader$$metrics;
    private final ShuffleDependency<K, ?, C> dep;
    private final ShuffleClient rssShuffleClient;
    private transient Logger org$apache$spark$internal$Logging$$log_;

    public String logName() {
        return Logging.logName$(this);
    }

    public Logger log() {
        return Logging.log$(this);
    }

    public void logInfo(Function0<String> function0) {
        Logging.logInfo$(this, function0);
    }

    public void logDebug(Function0<String> function0) {
        Logging.logDebug$(this, function0);
    }

    public void logTrace(Function0<String> function0) {
        Logging.logTrace$(this, function0);
    }

    public void logWarning(Function0<String> function0) {
        Logging.logWarning$(this, function0);
    }

    public void logError(Function0<String> function0) {
        Logging.logError$(this, function0);
    }

    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.logInfo$(this, function0, th);
    }

    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.logDebug$(this, function0, th);
    }

    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.logTrace$(this, function0, th);
    }

    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.logWarning$(this, function0, th);
    }

    public void logError(Function0<String> function0, Throwable th) {
        Logging.logError$(this, function0, th);
    }

    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    public void initializeLogIfNecessary(boolean z) {
        Logging.initializeLogIfNecessary$(this, z);
    }

    public boolean initializeLogIfNecessary(boolean z, boolean z2) {
        return Logging.initializeLogIfNecessary$(this, z, z2);
    }

    public boolean initializeLogIfNecessary$default$2() {
        return Logging.initializeLogIfNecessary$default$2$(this);
    }

    public void initializeForcefully(boolean z, boolean z2) {
        Logging.initializeForcefully$(this, z, z2);
    }

    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    public void org$apache$spark$internal$Logging$$log__$eq(Logger logger) {
        this.org$apache$spark$internal$Logging$$log_ = logger;
    }

    private ShuffleDependency<K, ?, C> dep() {
        return this.dep;
    }

    private ShuffleClient rssShuffleClient() {
        return this.rssShuffleClient;
    }

    public Iterator<Product2<K, C>> read() {
        Iterator iterator;
        Ordering ordering;
        ObjectRef create = ObjectRef.create(dep().serializer().newInstance());
        if (this.conf.columnarShuffleEnabled()) {
            StructType schema = SparkUtils.getSchema(dep());
            if (RssBatchBuilder$.MODULE$.supportsColumnarType(schema)) {
                create.elem = new RssColumnarBatchSerializer(schema, this.conf.columnarShuffleBatchSize(), this.conf.columnarShuffleDictionaryEnabled(), this.conf.columnarShuffleOffHeapEnabled(), SparkUtils.getDataSize(dep().serializer())).newInstance();
            }
        }
        MetricsCallback metricsCallback = new MetricsCallback(this) { // from class: org.apache.spark.shuffle.celeborn.RssShuffleReader$$anon$1
            private final /* synthetic */ RssShuffleReader $outer;

            @Override // org.apache.celeborn.client.read.MetricsCallback
            public void incBytesRead(long j) {
                this.$outer.org$apache$spark$shuffle$celeborn$RssShuffleReader$$metrics.incRemoteBytesRead(j);
                this.$outer.org$apache$spark$shuffle$celeborn$RssShuffleReader$$metrics.incRemoteBlocksFetched(1L);
            }

            @Override // org.apache.celeborn.client.read.MetricsCallback
            public void incReadTime(long j) {
                this.$outer.org$apache$spark$shuffle$celeborn$RssShuffleReader$$metrics.incFetchWaitTime(j);
            }

            {
                if (this == null) {
                    throw null;
                }
                this.$outer = this;
            }
        };
        Iterator interruptibleIterator = new InterruptibleIterator(this.context, CompletionIterator$.MODULE$.apply(RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(this.startPartition), this.endPartition).iterator().map(obj -> {
            return $anonfun$read$1(this, metricsCallback, BoxesRunTime.unboxToInt(obj));
        }).flatMap(rssInputStream -> {
            return ((SerializerInstance) create.elem).deserializeStream(rssInputStream).asKeyValueIterator();
        }).map(tuple2 -> {
            this.org$apache$spark$shuffle$celeborn$RssShuffleReader$$metrics.incRecordsRead(1L);
            return tuple2;
        }), () -> {
            this.context.taskMetrics().mergeShuffleReadMetrics();
        }));
        Iterator combineCombinersByKey = dep().aggregator().isDefined() ? dep().mapSideCombine() ? ((Aggregator) dep().aggregator().get()).combineCombinersByKey(interruptibleIterator, this.context) : ((Aggregator) dep().aggregator().get()).combineValuesByKey(interruptibleIterator, this.context) : interruptibleIterator;
        Some keyOrdering = dep().keyOrdering();
        if ((keyOrdering instanceof Some) && (ordering = (Ordering) keyOrdering.value()) != null) {
            ExternalSorter externalSorter = new ExternalSorter(this.context, ExternalSorter$.MODULE$.$lessinit$greater$default$2(), ExternalSorter$.MODULE$.$lessinit$greater$default$3(), new Some(ordering), dep().serializer());
            externalSorter.insertAll(combineCombinersByKey);
            this.context.taskMetrics().incMemoryBytesSpilled(externalSorter.memoryBytesSpilled());
            this.context.taskMetrics().incDiskBytesSpilled(externalSorter.diskBytesSpilled());
            this.context.taskMetrics().incPeakExecutionMemory(externalSorter.peakMemoryUsedBytes());
            this.context.addTaskCompletionListener(taskContext -> {
                externalSorter.stop();
                return BoxedUnit.UNIT;
            });
            iterator = CompletionIterator$.MODULE$.apply(externalSorter.iterator(), () -> {
                externalSorter.stop();
            });
        } else {
            if (!None$.MODULE$.equals(keyOrdering)) {
                throw new MatchError(keyOrdering);
            }
            iterator = combineCombinersByKey;
        }
        Iterator iterator2 = iterator;
        return iterator2 instanceof InterruptibleIterator ? iterator2 : new InterruptibleIterator(this.context, iterator2);
    }

    public static final /* synthetic */ RssInputStream $anonfun$read$1(RssShuffleReader rssShuffleReader, MetricsCallback metricsCallback, int i) {
        if (rssShuffleReader.handle.numMappers() <= 0) {
            return RssInputStream.empty();
        }
        long currentTimeMillis = System.currentTimeMillis();
        RssInputStream readPartition = rssShuffleReader.rssShuffleClient().readPartition(rssShuffleReader.handle.newAppId(), rssShuffleReader.handle.shuffleId(), i, rssShuffleReader.context.attemptNumber(), rssShuffleReader.startMapIndex, rssShuffleReader.endMapIndex);
        metricsCallback.incReadTime(System.currentTimeMillis() - currentTimeMillis);
        readPartition.setCallback(metricsCallback);
        rssShuffleReader.context.addTaskCompletionListener(taskContext -> {
            readPartition.close();
            return BoxedUnit.UNIT;
        });
        return readPartition;
    }

    public RssShuffleReader(RssShuffleHandle<K, ?, C> rssShuffleHandle, int i, int i2, int i3, int i4, TaskContext taskContext, CelebornConf celebornConf, ShuffleReadMetricsReporter shuffleReadMetricsReporter) {
        this.handle = rssShuffleHandle;
        this.startPartition = i;
        this.endPartition = i2;
        this.startMapIndex = i3;
        this.endMapIndex = i4;
        this.context = taskContext;
        this.conf = celebornConf;
        this.org$apache$spark$shuffle$celeborn$RssShuffleReader$$metrics = shuffleReadMetricsReporter;
        Logging.$init$(this);
        this.dep = rssShuffleHandle.dependency();
        this.rssShuffleClient = ShuffleClient.get(rssShuffleHandle.rssMetaServiceHost(), rssShuffleHandle.rssMetaServicePort(), celebornConf, rssShuffleHandle.userIdentifier());
    }
}
