package com.nvidia.spark.rapids.shuffle.ucx;

import com.nvidia.spark.rapids.shuffle.TransportUtils$;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import org.apache.spark.internal.Logging;
import org.slf4j.Logger;
import scala.Function0;
import scala.Predef$;
import scala.Tuple3;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.mutable.ArrayBuffer;
import scala.math.Numeric$IntIsIntegral$;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;
import scala.runtime.RichInt$;

/* compiled from: UCXConnection.scala */
/* loaded from: input_file:com/nvidia/spark/rapids/shuffle/ucx/UCXConnection$.class */
/**
 * Singleton helper (decompiled from a Scala {@code object}) that serializes and parses the
 * handshake header exchanged over a plain {@link InputStream}/{@link OutputStream} pair.
 *
 * <p>Wire layout, as written by {@link #writeHandshakeHeader} and consumed by
 * {@link #readHandshakeHeader}. All integers are 4 bytes, in the default (big-endian)
 * {@link ByteBuffer} byte order:
 * <pre>
 *   int    worker address length (N)
 *   byte[] worker address (N bytes)
 *   int    caller-supplied identifier, passed through unchanged
 *   int    rkey count (K)
 *   K x { int rkey length (M); byte[] rkey (M bytes) }
 * </pre>
 *
 * <p>The class also implements Spark's {@code Logging} trait; the logging methods below are
 * forwarders to the static helpers on {@code Logging}.
 */
public final class UCXConnection$ implements Logging {
    /** Upper bound, in bytes, accepted for a worker address read from a stream. */
    private static final int MAX_WORKER_ADDRESS_SIZE = 1048576;

    /**
     * Upper bound on the initial capacity reserved for the rkey list. The list still grows
     * beyond this on demand; the cap only stops a count field read from the stream from
     * dictating the size of an up-front allocation.
     */
    private static final int MAX_RKEY_PREALLOCATION = 1024;

    /** The singleton instance; assigned by the private constructor during class initialization. */
    public static UCXConnection$ MODULE$;

    /** Backing field for the logger accessors required by {@code Logging}. */
    private transient Logger org$apache$spark$internal$Logging$$log_;

    static {
        // The constructor publishes the instance through MODULE$.
        new UCXConnection$();
    }

    // ---------------------------------------------------------------------------------------
    // Logging forwarders: each method delegates to the matching static helper on Logging,
    // passing this instance as the receiver.
    // ---------------------------------------------------------------------------------------

    public String logName() {
        return Logging.logName$(this);
    }

    public Logger log() {
        return Logging.log$(this);
    }

    public void logInfo(Function0<String> function0) {
        Logging.logInfo$(this, function0);
    }

    public void logDebug(Function0<String> function0) {
        Logging.logDebug$(this, function0);
    }

    public void logTrace(Function0<String> function0) {
        Logging.logTrace$(this, function0);
    }

    public void logWarning(Function0<String> function0) {
        Logging.logWarning$(this, function0);
    }

    public void logError(Function0<String> function0) {
        Logging.logError$(this, function0);
    }

    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.logInfo$(this, function0, th);
    }

    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.logDebug$(this, function0, th);
    }

    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.logTrace$(this, function0, th);
    }

    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.logWarning$(this, function0, th);
    }

    public void logError(Function0<String> function0, Throwable th) {
        Logging.logError$(this, function0, th);
    }

    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    public void initializeLogIfNecessary(boolean z) {
        Logging.initializeLogIfNecessary$(this, z);
    }

    public boolean initializeLogIfNecessary(boolean z, boolean z2) {
        return Logging.initializeLogIfNecessary$(this, z, z2);
    }

    public boolean initializeLogIfNecessary$default$2() {
        return Logging.initializeLogIfNecessary$default$2$(this);
    }

    public void initializeForcefully(boolean z, boolean z2) {
        Logging.initializeForcefully$(this, z, z2);
    }

    /** Getter for the logger field used by {@code Logging}. */
    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    /** Setter for the logger field used by {@code Logging}. */
    public void org$apache$spark$internal$Logging$$log__$eq(Logger logger) {
        this.org$apache$spark$internal$Logging$$log_ = logger;
    }

    // ---------------------------------------------------------------------------------------
    // Handshake (de)serialization
    // ---------------------------------------------------------------------------------------

    /**
     * Reads exactly {@code i} bytes from {@code inputStream}, looping over short reads.
     *
     * @param z           when {@code true} the bytes are copied into a freshly allocated direct
     *                    buffer; when {@code false} a heap buffer wrapping the read array is
     *                    returned
     * @param inputStream stream to read from
     * @param i           number of bytes to read; must be non-negative (a negative value fails
     *                    on the array allocation)
     * @return a buffer holding the {@code i} bytes read
     * @throws IllegalStateException if the stream ends before {@code i} bytes were read
     */
    private ByteBuffer readBytesFromStream(boolean z, InputStream inputStream, int i) {
        byte[] bytes = new byte[i];
        int offset = 0;
        int lastRead = 0;
        // read() returns -1 at end of stream, which terminates the loop early.
        while (lastRead >= 0 && offset < i) {
            final int soFar = offset;
            logTrace(() -> {
                return new StringBuilder(23).append("Reading ").append(i).append(". Currently at ").append(soFar).toString();
            });
            lastRead = inputStream.read(bytes, offset, i - offset);
            if (lastRead > 0) {
                offset += lastRead;
            }
        }
        if (offset < i) {
            throw new IllegalStateException("Read less bytes than expected!");
        }
        ByteBuffer heapBuffer = ByteBuffer.wrap(bytes);
        if (!z) {
            return heapBuffer;
        }
        ByteBuffer directBuffer = ByteBuffer.allocateDirect(i);
        TransportUtils$.MODULE$.copyBuffer(heapBuffer, directBuffer, i);
        // Reset the position so the returned buffer is read from its start
        // (presumably copyBuffer advances it -- confirm against TransportUtils).
        directBuffer.rewind();
        return directBuffer;
    }

    /**
     * Reads a 4-byte length/count field and rejects negative values.
     *
     * <p>The value comes straight off the stream, so it is validated before being used as an
     * allocation size or loop bound.
     *
     * @param inputStream stream to read from
     * @param what        description of the field, used in the error message
     * @return the non-negative value read
     * @throws IllegalArgumentException (via {@code require}) if the value is negative
     */
    private int readNonNegativeInt(InputStream inputStream, String what) {
        int value = readBytesFromStream(false, inputStream, 4).getInt();
        Predef$.MODULE$.require(value >= 0, () -> {
            return new StringBuilder().append("Received a negative ").append(what).append(" (").append(value).append("), dropping.").toString();
        });
        return value;
    }

    /**
     * Parses a handshake header from {@code inputStream} (see the class comment for the layout).
     *
     * @param inputStream stream positioned at the start of a handshake header
     * @return a tuple of the worker address, the int identifier that followed it (boxed), and
     *         the rkeys; the address and every rkey are backed by direct buffers
     * @throws IllegalArgumentException (via {@code require}) if the worker address length is
     *         negative or exceeds 1 MiB, or if the rkey count or any rkey length is negative
     * @throws IllegalStateException if the stream ends before the full header was read
     */
    public Tuple3<WorkerAddress, Object, Rkeys> readHandshakeHeader(InputStream inputStream) {
        int maxAddressSize = MAX_WORKER_ADDRESS_SIZE;
        int addressSize = readNonNegativeInt(inputStream, "WorkerAddress size");
        Predef$.MODULE$.require(addressSize <= maxAddressSize, () -> {
            return new StringBuilder(53).append("Received an abnormally large (>").append(maxAddressSize).append(" Bytes) WorkerAddress ").append(new StringBuilder(19).append("(").append(addressSize).append(" Bytes), dropping.").toString()).toString();
        });
        ByteBuffer addressBytes = readBytesFromStream(true, inputStream, addressSize);

        int identifier = readBytesFromStream(false, inputStream, 4).getInt();

        int rkeyCount = readNonNegativeInt(inputStream, "Rkey count");
        // Do not let the count field alone size the allocation; the buffer grows as rkeys
        // are actually read.
        ArrayBuffer rkeys = new ArrayBuffer(Math.min(rkeyCount, MAX_RKEY_PREALLOCATION));
        RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), rkeyCount).foreach$mVc$sp(idx -> {
            int rkeySize = MODULE$.readNonNegativeInt(inputStream, "Rkey size");
            rkeys.append(Predef$.MODULE$.wrapRefArray(new ByteBuffer[]{MODULE$.readBytesFromStream(true, inputStream, rkeySize)}));
        });
        return new Tuple3<>(new WorkerAddress(addressBytes), BoxesRunTime.boxToInteger(identifier), new Rkeys(rkeys));
    }

    /**
     * Serializes a handshake header (see the class comment for the layout), writes it to
     * {@code outputStream} in a single call and flushes the stream.
     *
     * <p>Only the bytes between each buffer's position and limit are written, and each length
     * prefix is that same {@code remaining()} count, so the prefixes always match the payload.
     * The caller's buffers are read through {@code duplicate()} views: their position and
     * limit are left untouched, so the same buffers can be passed again.
     *
     * @param outputStream stream to write the header to
     * @param byteBuffer   worker address bytes
     * @param i            int identifier written after the worker address
     * @param seq          rkey buffers, written in iteration order
     */
    public void writeHandshakeHeader(OutputStream outputStream, ByteBuffer byteBuffer, int i, Seq<ByteBuffer> seq) {
        ByteBuffer address = byteBuffer.duplicate();
        int rkeyPayloadBytes = BoxesRunTime.unboxToInt(((TraversableOnce) seq.map(rkey -> {
            return BoxesRunTime.boxToInteger(rkey.remaining());
        }, Seq$.MODULE$.canBuildFrom())).sum(Numeric$IntIsIntegral$.MODULE$));
        // address length + address + identifier + rkey count + per-rkey length prefixes + rkeys
        int headerSize = 4 + address.remaining() + 4 + 4 + (4 * seq.size()) + rkeyPayloadBytes;
        ByteBuffer header = ByteBuffer.allocate(headerSize);

        header.putInt(address.remaining());
        header.put(address);
        header.putInt(i);
        header.putInt(seq.size());
        seq.foreach(rkey -> {
            ByteBuffer view = rkey.duplicate();
            header.putInt(view.remaining());
            return header.put(view);
        });

        header.flip();
        outputStream.write(header.array(), 0, header.limit());
        outputStream.flush();
    }

    /** Publishes the singleton and runs the {@code Logging} trait initializer. */
    private UCXConnection$() {
        MODULE$ = this;
        Logging.$init$(this);
    }
}
