package io.ray.streaming.runtime.transfer;

import com.google.common.base.Preconditions;
import io.ray.api.BaseActorHandle;
import io.ray.streaming.runtime.config.StreamingWorkerConfig;
import io.ray.streaming.runtime.config.types.TransferChannelType;
import io.ray.streaming.runtime.transfer.channel.ChannelId;
import io.ray.streaming.runtime.transfer.channel.ChannelRecoverInfo;
import io.ray.streaming.runtime.transfer.channel.ChannelUtils;
import io.ray.streaming.runtime.transfer.channel.OffsetInfo;
import io.ray.streaming.runtime.transfer.message.BarrierMessage;
import io.ray.streaming.runtime.transfer.message.ChannelMessage;
import io.ray.streaming.runtime.transfer.message.DataMessage;
import io.ray.streaming.runtime.util.Platform;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/ray/streaming/runtime/transfer/DataReader.class */
public class DataReader {
    private static final Logger LOG = LoggerFactory.getLogger(DataReader.class);

    /** Opaque handle returned by {@code createDataReaderNative}; reset to 0 by {@link #close()}. */
    private long nativeReaderPtr;

    /**
     * Direct buffer whose address is handed to {@code getBundleNative}. After the call,
     * {@code getBundle} reads a long at offset 0 and an int at offset 8 from it and uses
     * them as the address and size of {@link #bundleData}.
     */
    private final ByteBuffer getBundleParams = ByteBuffer.allocateDirect(24);

    /** View over the current bundle payload; starts as an empty wrap and is re-pointed by {@code getBundle}. */
    private final ByteBuffer bundleData = Platform.wrapDirectBuffer(0, 0);

    /** Direct buffer whose address is handed to {@code getBundleNative}; parsed by {@link BundleMeta}. */
    private final ByteBuffer bundleMeta = ByteBuffer.allocateDirect(BundleMeta.LENGTH);

    /** Creation status per input channel id, filled once in the constructor. */
    private final Map<String, ChannelRecoverInfo.ChannelCreationStatus> queueCreationStatusMap = new HashMap<>();

    /** Messages decoded from the last fetched bundle that {@link #read(long)} has not returned yet. */
    private Queue<ChannelMessage> buf = new LinkedList<>();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/ray/streaming/runtime/transfer/DataReader$BarrierItem.class */
    /**
     * Barrier decoded from the body of a {@link DataMessage}, paired with the
     * per-queue offsets that were captured for it.
     */
    public class BarrierItem {
        BarrierOffsetInfo barrierOffsetInfo;
        private final long msgId;
        private final BarrierType barrierType;
        private final long globalBarrierId;
        private final ByteBuffer data;

        /**
         * Parses the barrier header from {@code dataMessage.body()} in native byte order.
         * The body buffer is consumed: its position is left at its limit.
         *
         * @param dataMessage message whose body holds an int, a long barrier id and the payload
         * @param barrierOffsetInfo offsets to associate with this barrier
         */
        public BarrierItem(DataMessage dataMessage, BarrierOffsetInfo barrierOffsetInfo) {
            this.barrierOffsetInfo = barrierOffsetInfo;
            this.msgId = dataMessage.getMsgId();

            ByteBuffer payload = dataMessage.body();
            payload.order(ByteOrder.nativeOrder());
            // The leading int is read only to advance the buffer; its value is not used and
            // the type is always reported as GLOBAL_BARRIER.
            payload.getInt();
            this.barrierType = BarrierType.GLOBAL_BARRIER;
            this.globalBarrierId = payload.getLong();

            // Everything after the header becomes the barrier data.
            ByteBuffer remaining = payload.slice();
            remaining.order(ByteOrder.nativeOrder());
            this.data = remaining;
            payload.position(payload.limit());
        }

        /** @return the message id taken from the source {@link DataMessage} */
        public long getBarrierMsgId() {
            return this.msgId;
        }

        /** @return the barrier type (always {@link BarrierType#GLOBAL_BARRIER}) */
        public BarrierType getBarrierType() {
            return this.barrierType;
        }

        /** @return the barrier id read from the message body */
        public long getGlobalBarrierId() {
            return this.globalBarrierId;
        }

        /** @return native-ordered slice of the body that follows the barrier header */
        public ByteBuffer getData() {
            return this.data;
        }

        /** @return the offsets supplied at construction */
        public BarrierOffsetInfo getBarrierOffsetInfo() {
            return this.barrierOffsetInfo;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/ray/streaming/runtime/transfer/DataReader$BarrierOffsetInfo.class */
    /**
     * Per-queue offsets decoded from a buffer laid out as an int entry count followed by
     * that many (queue id, long) pairs.
     */
    public class BarrierOffsetInfo {
        private final int queueSize;
        private final Map<String, OffsetInfo> queueOffsetInfo;

        /**
         * Decodes the offsets, advancing {@code byteBuffer} past everything it reads.
         *
         * @param byteBuffer source buffer positioned at the entry count
         */
        public BarrierOffsetInfo(ByteBuffer byteBuffer) {
            final int entryCount = byteBuffer.getInt();
            final Map<String, OffsetInfo> offsets = new HashMap<>(entryCount);
            int remaining = entryCount;
            while (remaining-- > 0) {
                // The queue id precedes its offset in the buffer, so read it first.
                String queueId = DataReader.this.getQueueIdString(byteBuffer);
                long offset = byteBuffer.getLong();
                offsets.put(queueId, new OffsetInfo(offset));
            }
            this.queueSize = entryCount;
            this.queueOffsetInfo = offsets;
        }

        /** @return the entry count read from the head of the buffer */
        public int getQueueSize() {
            return this.queueSize;
        }

        /** @return the decoded offsets keyed by queue id string */
        public Map<String, OffsetInfo> getQueueOffsetInfo() {
            return this.queueOffsetInfo;
        }
    }

    /* loaded from: input_file:io/ray/streaming/runtime/transfer/DataReader$BarrierType.class */
    /** Kinds of barrier; each constant carries an integer code. */
    public enum BarrierType {
        GLOBAL_BARRIER(0);

        private int code;

        BarrierType(int code) {
            this.code = code;
        }

        /**
         * Decompiler-generated counterpart of {@link #values()}.
         *
         * @return a fresh array holding every constant in declaration order
         */
        public static BarrierType[] valuesCustom() {
            return values().clone();
        }
    }

    /* loaded from: input_file:io/ray/streaming/runtime/transfer/DataReader$BundleMeta.class */
    /**
     * Bundle header decoded from a buffer. The fields are read in this order: int magic
     * number, long bundle timestamp, long last message id, int message count, int bundle
     * type code, int raw bundle size, then the channel id.
     */
    class BundleMeta {
        static final int LENGTH = 52;
        private final int magicNum;
        private final long bundleTs;
        private final long lastMessageId;
        private final int messageListSize;
        private final DataBundleType bundleType;
        private final String channelID;
        private final int rawBundleSize;

        /**
         * Reads the header from the buffer's current position, advancing it.
         *
         * @param byteBuffer buffer holding the header
         */
        BundleMeta(ByteBuffer byteBuffer) {
            this.magicNum = byteBuffer.getInt();
            this.bundleTs = byteBuffer.getLong();
            this.lastMessageId = byteBuffer.getLong();
            this.messageListSize = byteBuffer.getInt();
            this.bundleType = decodeBundleType(byteBuffer.getInt());
            this.rawBundleSize = byteBuffer.getInt();
            this.channelID = DataReader.this.getQueueIdString(byteBuffer);
        }

        /** Maps a type code to its constant; any code other than BUNDLE or BARRIER yields EMPTY. */
        private DataBundleType decodeBundleType(int typeCode) {
            if (typeCode == DataBundleType.BUNDLE.code) {
                return DataBundleType.BUNDLE;
            }
            if (typeCode == DataBundleType.BARRIER.code) {
                return DataBundleType.BARRIER;
            }
            return DataBundleType.EMPTY;
        }

        /** @return the magic number read from the header */
        public int getMagicNum() {
            return this.magicNum;
        }

        /** @return the bundle timestamp read from the header */
        public long getBundleTs() {
            return this.bundleTs;
        }

        /** @return the last message id read from the header */
        public long getLastMessageId() {
            return this.lastMessageId;
        }

        /** @return the message count read from the header */
        public int getMessageListSize() {
            return this.messageListSize;
        }

        /** @return the decoded bundle type, never {@code null} */
        public DataBundleType getBundleType() {
            return this.bundleType;
        }

        /** @return the channel id string decoded from the tail of the header */
        public String getChannelID() {
            return this.channelID;
        }

        /** @return the raw bundle size read from the header */
        public int getRawBundleSize() {
            return this.rawBundleSize;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/ray/streaming/runtime/transfer/DataReader$DataBundleType.class */
    /** Bundle kinds, each tagged with the integer code it is matched against when a header is decoded. */
    public enum DataBundleType {
        EMPTY(1),
        BARRIER(2),
        BUNDLE(3);

        int code;

        DataBundleType(int code) {
            this.code = code;
        }

        /**
         * Decompiler-generated counterpart of {@link #values()}.
         *
         * @return a fresh array holding every constant in declaration order
         */
        public static DataBundleType[] valuesCustom() {
            return values().clone();
        }
    }

    /**
     * Creates the native reader for the given input channels and records the creation
     * status reported for each of them.
     *
     * @param list input channel ids; must be non-empty
     * @param list2 actor handles, one per channel id (same size as {@code list})
     * @param map per-channel offsets; a channel missing from the map starts at message id 0
     * @param streamingWorkerConfig worker configuration supplying the transfer settings
     * @throws IllegalArgumentException if {@code list} is empty or the two lists differ in size
     */
    public DataReader(List<String> list, List<BaseActorHandle> list2, Map<String, OffsetInfo> map, StreamingWorkerConfig streamingWorkerConfig) {
        // All three buffers are switched to native byte order before any use.
        this.getBundleParams.order(ByteOrder.nativeOrder());
        this.bundleData.order(ByteOrder.nativeOrder());
        this.bundleMeta.order(ByteOrder.nativeOrder());
        Preconditions.checkArgument(list.size() > 0);
        Preconditions.checkArgument(list.size() == list2.size());

        ChannelCreationParametersBuilder creationParameters = new ChannelCreationParametersBuilder().buildInputQueueParameters(list, list2);
        // The generator must build the outer byte[][]; a generator returning byte[] does not
        // type-check for a Stream<byte[]>.
        byte[][] channelIdBytes = list.stream().map(ChannelId::idStrToBytes).toArray(byte[][]::new);

        long[] startMsgIds = new long[list.size()];
        for (int i = 0; i < list.size(); i++) {
            String channelId = list.get(i);
            if (map.containsKey(channelId)) {
                startMsgIds[i] = map.get(channelId).getStreamingMsgId();
            } else {
                startMsgIds[i] = 0;
            }
        }

        long readerTimerIntervalMs = streamingWorkerConfig.transferConfig.readerTimerIntervalMs();
        boolean isMemoryChannel = TransferChannelType.MEMORY_CHANNEL == streamingWorkerConfig.transferConfig.channelType();
        // Output list: after the native call it is read back one status code per channel.
        List<Integer> creationStatusCodes = new ArrayList<>();
        this.nativeReaderPtr = createDataReaderNative(creationParameters, channelIdBytes, startMsgIds, readerTimerIntervalMs, creationStatusCodes, ChannelUtils.toNativeConf(streamingWorkerConfig), isMemoryChannel);
        for (int i = 0; i < list.size(); i++) {
            this.queueCreationStatusMap.put(list.get(i), ChannelRecoverInfo.ChannelCreationStatus.fromInt(creationStatusCodes.get(i).intValue()));
        }
        LOG.info("Create DataReader succeed for worker: {}, creation status={}.", streamingWorkerConfig.workerInternalConfig.workerName(), this.queueCreationStatusMap);
    }

    /**
     * Native factory for the reader; the constructor stores the returned value in
     * {@code nativeReaderPtr}. Parameter meanings below are taken from the constructor's
     * call site.
     *
     * @param channelCreationParametersBuilder result of {@code buildInputQueueParameters} for the input channels
     * @param bArr channel ids converted to bytes, one entry per channel
     * @param jArr starting message id per channel (0 when no offset was supplied)
     * @param j value of {@code transferConfig.readerTimerIntervalMs()}
     * @param list passed in empty; afterwards the constructor reads one status code per channel from it
     * @param bArr2 output of {@code ChannelUtils.toNativeConf} for the worker config
     * @param z {@code true} when the configured channel type is {@code MEMORY_CHANNEL}
     * @return handle later passed to the other native methods
     */
    private static native long createDataReaderNative(ChannelCreationParametersBuilder channelCreationParametersBuilder, byte[][] bArr, long[] jArr, long j, List<Integer> list, byte[] bArr2, boolean z);

    /**
     * Returns the next buffered message, fetching and decoding one bundle from the native
     * reader when the buffer is empty.
     *
     * @param j value forwarded to the native bundle fetch
     * @return the next message, or {@code null} when the fetch produced no messages
     */
    public ChannelMessage read(long j) {
        if (!this.buf.isEmpty()) {
            return this.buf.poll();
        }

        getBundle(j);
        if (this.bundleData.hasRemaining()) {
            BundleMeta meta = new BundleMeta(this.bundleMeta);
            String channelId = meta.getChannelID();
            long timestamp = meta.getBundleTs();
            switch (meta.getBundleType()) {
                case BARRIER:
                    this.buf.offer(getBarrier(this.bundleData, channelId, timestamp));
                    break;
                case BUNDLE:
                    int messageCount = meta.getMessageListSize();
                    for (int remaining = messageCount; remaining > 0; remaining--) {
                        this.buf.offer(getDataMessage(this.bundleData, channelId, timestamp));
                    }
                    break;
                default:
                    // EMPTY bundles contribute no messages.
                    break;
            }
        }
        // poll() yields null on an empty queue, which covers the nothing-decoded case.
        return this.buf.poll();
    }

    /**
     * Wraps the per-channel creation statuses recorded by the constructor.
     *
     * @return a new {@link ChannelRecoverInfo} built from the creation status map
     */
    public ChannelRecoverInfo getQueueRecoverInfo() {
        final ChannelRecoverInfo recoverInfo = new ChannelRecoverInfo(this.queueCreationStatusMap);
        return recoverInfo;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Consumes the next 20 bytes of the buffer and converts them to a queue id string.
     *
     * @param byteBuffer buffer positioned at the start of a 20-byte id
     * @return the id as produced by {@code ChannelId.idBytesToStr}
     */
    public String getQueueIdString(ByteBuffer byteBuffer) {
        final byte[] idBytes = new byte[20];
        byteBuffer.get(idBytes, 0, idBytes.length);
        return ChannelId.idBytesToStr(idBytes);
    }

    /**
     * Decodes a barrier from the bundle payload and attaches the offsets currently
     * reported by the native reader.
     *
     * @param byteBuffer bundle payload positioned at the barrier message
     * @param str channel id the bundle belongs to
     * @param j value forwarded unchanged to {@code getDataMessage} (the bundle timestamp when called from {@code read})
     * @return the assembled barrier message
     */
    private BarrierMessage getBarrier(ByteBuffer byteBuffer, String str, long j) {
        // Offsets are fetched before the message is parsed, as in the original ordering.
        byte[] rawOffsets = getOffsetsInfoNative(this.nativeReaderPtr);
        ByteBuffer offsetsBuffer = ByteBuffer.wrap(rawOffsets);
        offsetsBuffer.order(ByteOrder.nativeOrder());
        BarrierOffsetInfo offsets = new BarrierOffsetInfo(offsetsBuffer);

        DataMessage message = getDataMessage(byteBuffer, str, j);
        BarrierItem barrier = new BarrierItem(message, offsets);
        return new BarrierMessage(
                message.getMsgId(),
                message.getTimestamp(),
                message.getChannelId(),
                barrier.getData(),
                barrier.getGlobalBarrierId(),
                barrier.getBarrierOffsetInfo().getQueueOffsetInfo());
    }

    /**
     * Reads one message from the payload: an int body length, a long message id, an int
     * that is skipped, then the body itself. The buffer ends up positioned just past the
     * body with its original limit restored.
     *
     * @param byteBuffer bundle payload positioned at a message header
     * @param str channel id passed through to the {@link DataMessage}
     * @param j value passed through to the {@link DataMessage} (the bundle timestamp when called from {@code read})
     * @return message whose body is a slice covering exactly the body bytes
     */
    private DataMessage getDataMessage(ByteBuffer byteBuffer, String str, long j) {
        final int bodyLength = byteBuffer.getInt();
        final long messageId = byteBuffer.getLong();
        // This int is read only to advance past it.
        byteBuffer.getInt();

        final int bodyStart = byteBuffer.position();
        final int bodyEnd = bodyStart + bodyLength;
        final int savedLimit = byteBuffer.limit();

        // Narrow the limit temporarily so the slice covers only this message's body.
        byteBuffer.limit(bodyEnd);
        final ByteBuffer body = byteBuffer.slice();
        byteBuffer.limit(savedLimit);
        byteBuffer.position(bodyEnd);

        return new DataMessage(body, j, messageId, str);
    }

    /**
     * Asks the native reader for the next bundle, then re-points {@code bundleData} at
     * the address and size that were written into {@code getBundleParams}.
     *
     * @param j value forwarded to {@code getBundleNative}
     */
    private void getBundle(long j) {
        final long paramsAddress = Platform.getAddress(this.getBundleParams);
        final long metaAddress = Platform.getAddress(this.bundleMeta);
        getBundleNative(this.nativeReaderPtr, j, paramsAddress, metaAddress);

        // Reset the header buffer so it can be parsed from the beginning.
        this.bundleMeta.rewind();

        // Absolute reads: a long at offset 0 (address) and an int at offset 8 (size).
        final long dataAddress = this.getBundleParams.getLong(0);
        final int dataSize = this.getBundleParams.getInt(8);
        Platform.wrapDirectBuffer(this.bundleData, dataAddress, dataSize);
    }

    /** Passes the current native handle to {@code stopReaderNative}; the handle is not checked first. */
    public void stop() {
        final long readerHandle = this.nativeReaderPtr;
        stopReaderNative(readerHandle);
    }

    /**
     * Closes the native reader and zeroes the handle. Does nothing when the handle is
     * already 0, so repeated calls are harmless.
     */
    public void close() {
        if (this.nativeReaderPtr != 0) {
            LOG.info("Closing DataReader.");
            closeReaderNative(this.nativeReaderPtr);
            // Zero the handle so later close() calls fall through.
            this.nativeReaderPtr = 0L;
            LOG.info("Finish closing DataReader.");
        }
    }

    /**
     * Native bundle fetch. Parameter meanings are taken from the call in {@code getBundle}.
     *
     * @param j native reader handle
     * @param j2 value passed by the caller of {@code read} (presumably a timeout; confirm against the native side)
     * @param j3 address of the {@code getBundleParams} buffer
     * @param j4 address of the {@code bundleMeta} buffer
     */
    private native void getBundleNative(long j, long j2, long j3, long j4);

    /**
     * Native query for offset information.
     *
     * @param j native reader handle
     * @return bytes that {@code getBarrier} wraps in native byte order and decodes with {@code BarrierOffsetInfo}
     */
    private native byte[] getOffsetsInfoNative(long j);

    /**
     * Native counterpart of {@code stop()}.
     *
     * @param j native reader handle
     */
    private native void stopReaderNative(long j);

    /**
     * Native counterpart of {@code close()}; only called while the handle is non-zero.
     *
     * @param j native reader handle
     */
    private native void closeReaderNative(long j);
}
