package io.ray.streaming.runtime.core.collector;

import io.ray.api.BaseActorHandle;
import io.ray.api.PyActorHandle;
import io.ray.streaming.api.Language;
import io.ray.streaming.api.collector.Collector;
import io.ray.streaming.api.partition.Partition;
import io.ray.streaming.message.Record;
import io.ray.streaming.runtime.serialization.CrossLangSerializer;
import io.ray.streaming.runtime.serialization.JavaSerializer;
import io.ray.streaming.runtime.serialization.Serializer;
import io.ray.streaming.runtime.transfer.ChannelId;
import io.ray.streaming.runtime.transfer.DataWriter;
import java.nio.ByteBuffer;
import java.util.Collection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/ray/streaming/runtime/core/collector/OutputCollector.class */
public class OutputCollector implements Collector<Record> {
    private static final Logger LOGGER = LoggerFactory.getLogger(OutputCollector.class);
    private final DataWriter writer;
    private final ChannelId[] outputQueues;
    private final Collection<BaseActorHandle> targetActors;
    private final Language[] targetLanguages;
    private final Partition partition;
    private final Serializer javaSerializer = new JavaSerializer();
    private final Serializer crossLangSerializer = new CrossLangSerializer();

    public OutputCollector(DataWriter dataWriter, Collection<String> collection, Collection<BaseActorHandle> collection2, Partition partition) {
        this.writer = dataWriter;
        this.outputQueues = (ChannelId[]) collection.stream().map(ChannelId::from).toArray(i -> {
            return new ChannelId[i];
        });
        this.targetActors = collection2;
        this.targetLanguages = (Language[]) collection2.stream().map(baseActorHandle -> {
            return baseActorHandle instanceof PyActorHandle ? Language.PYTHON : Language.JAVA;
        }).toArray(i2 -> {
            return new Language[i2];
        });
        this.partition = partition;
        LOGGER.debug("OutputCollector constructed, outputChannelIds:{}, partition:{}.", collection, this.partition);
    }

    public void collect(Record record) {
        ByteBuffer byteBuffer = null;
        ByteBuffer byteBuffer2 = null;
        for (int i : this.partition.partition(record, this.outputQueues.length)) {
            if (this.targetLanguages[i] == Language.JAVA) {
                if (byteBuffer == null) {
                    byte[] serialize = this.javaSerializer.serialize(record);
                    byteBuffer = ByteBuffer.allocate(1 + serialize.length);
                    byteBuffer.put((byte) 1);
                    byteBuffer.put(serialize);
                    byteBuffer.flip();
                }
                this.writer.write(this.outputQueues[i], byteBuffer.duplicate());
            } else {
                if (byteBuffer2 == null) {
                    byte[] serialize2 = this.crossLangSerializer.serialize(record);
                    byteBuffer2 = ByteBuffer.allocate(1 + serialize2.length);
                    byteBuffer2.put((byte) 0);
                    byteBuffer2.put(serialize2);
                    byteBuffer2.flip();
                }
                this.writer.write(this.outputQueues[i], byteBuffer2.duplicate());
            }
        }
    }
}
