package org.apache.hadoop.mapred;

import java.io.IOException;
import java.util.ArrayList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.IFile;
import org.apache.hadoop.mapred.Merger;
import org.apache.hadoop.mapred.Task;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.ResourceCalculatorPlugin;
import org.apache.hadoop.util.StringUtils;

/* loaded from: input_file:org/apache/hadoop/mapred/BlockMapOutputBuffer.class */
public class BlockMapOutputBuffer<K extends BytesWritable, V extends BytesWritable> implements BlockMapOutputCollector<K, V> {
    /** Logger shared by all instances of this collector. */
    private static final Log LOG = LogFactory.getLog(BlockMapOutputBuffer.class.getName());
    /** Picks the partition index in the two-argument collect(); a stub returning -1 when the job has no reduce tasks. */
    private final Partitioner<K, V> partitioner;
    /** Number of reduce tasks configured for the job; also the length of the per-partition arrays below. */
    private final int partitions;
    /** Job configuration handed to the constructor. */
    private final JobConf job;
    /** Receives a progress() call for every collected record and is passed on to the partitions and the merger. */
    private final Task.TaskReporter reporter;
    /** Configured map output key class; the constructor requires it to be assignable to BytesWritable. */
    private final Class<K> keyClass;
    /** Configured map output value class; the constructor requires it to be assignable to BytesWritable. */
    private final Class<V> valClass;
    /** kvbuffer length scaled by "io.sort.spill.percent"; passed to the MemoryBlockAllocator. */
    private final int softBufferLimit;
    /** Map output compression codec, or null when map output compression is disabled. */
    private CompressionCodec codec;
    /** Backing byte array handed to every ReducePartition. */
    private byte[] kvbuffer;
    /** Size of kvbuffer in bytes (configured megabytes shifted left by 20). */
    private int kvBufferSize;
    /** close() logs a warning when numBigRecordsSpills exceeds this value. */
    private volatile int numBigRecordsWarnThreshold;
    /** Local file system obtained from the job configuration. */
    private final FileSystem localFs;
    /** Raw file system underneath localFs; all spill and output file I/O in this class goes through it. */
    private final FileSystem rfs;
    /** Incremented with the byte count of each collected (or individually spilled) record. */
    private final Counters.Counter mapOutputByteCounter;
    /** Incremented once per record accepted by the three-argument collect(). */
    private final Counters.Counter mapOutputRecordCounter;
    /** Accumulates the per-sort, per-spill and merge statistics reported by this class. */
    private MapSpillSortCounters mapSpillSortCounter;
    /** Owning map task; supplies the task id, output file locations and resource readings. */
    private MapTask task;
    /** One partition buffer per reduce task, indexed by partition number. */
    private ReducePartition<K, V>[] reducePartitions;
    /** Per-partition in-memory segments; entries stay null unless flush() keeps the last spill in memory. */
    private Merger.Segment<K, V>[] inMemorySegments;
    /** Set by flush() when the last spill was kept in memory instead of written to disk. */
    private boolean hasInMemorySpill;
    /** Value of "mapred.map.lastspill.memory" (default true). */
    private boolean lastSpillInMem;
    /** Bytes accounted for the spill indices held in indexCacheList (24 per index entry). */
    private int totalIndexCacheMemory;
    /** 2 MiB; once totalIndexCacheMemory reaches it, further spill indices are written to index files instead of cached. */
    private static final int INDEX_CACHE_MEMORY_LIMIT = 2097152;
    /** Allocator built over the buffer size and soft limit; also supplies the estimated size of a spill. */
    private final MemoryBlockAllocator memoryBlockAllocator;
    /** Number of spill files written so far; incremented by sortAndSpill() and spillSingleRecord(). */
    private volatile int numSpills = 0;
    /** Number of completed spillSingleRecord() calls. */
    private volatile int numBigRecordsSpills = 0;
    /** Spill indices kept in memory, in spill order; mergeParts() appends the ones that were written to disk. */
    private ArrayList<SpillRecord> indexCacheList = new ArrayList<>();

    /**
     * Creates the collector: resolves the partitioner, reads the sort/spill settings from the
     * job configuration, allocates the shared byte buffer and builds one {@link ReducePartition}
     * per reduce task.
     *
     * @param taskUmbilicalProtocol not used by this constructor
     * @param jobConf job configuration the settings are read from
     * @param taskReporter source of the map output counters; also used for progress reporting
     * @param mapTask the map task that owns this collector
     * @throws IOException if the configured sort buffer size is outside 0..2047 MB, or if the map
     *         output key or value class is not assignable to {@link BytesWritable}
     * @throws ClassNotFoundException declared for callers; nothing in this body throws it directly
     */
    // Unchecked: the reflective partitioner, the configured key/value classes and the
    // generic arrays cannot be created with their type arguments.
    @SuppressWarnings("unchecked")
    public BlockMapOutputBuffer(TaskUmbilicalProtocol taskUmbilicalProtocol, JobConf jobConf, Task.TaskReporter taskReporter, MapTask mapTask) throws IOException, ClassNotFoundException {
        this.task = mapTask;
        this.job = jobConf;
        this.reporter = taskReporter;
        this.localFs = FileSystem.getLocal(jobConf);
        this.partitions = jobConf.getNumReduceTasks();
        if (this.partitions > 0) {
            this.partitioner = (Partitioner<K, V>) ReflectionUtils.newInstance(jobConf.getPartitionerClass(), jobConf);
        } else {
            // No reducers: nothing to partition, so every record maps to -1.
            this.partitioner = new Partitioner<K, V>() { // from class: org.apache.hadoop.mapred.BlockMapOutputBuffer.1
                @Override // org.apache.hadoop.mapred.Partitioner
                public int getPartition(K key, V value, int numPartitions) {
                    return -1;
                }

                @Override // org.apache.hadoop.mapred.JobConfigurable
                public void configure(JobConf ignoredConf) {
                }
            };
        }
        this.rfs = ((LocalFileSystem) this.localFs).getRaw();

        float spillPercent = jobConf.getFloat("io.sort.spill.percent", 0.9f);
        // Written as a negated range test so that NaN is rejected as well.
        if (!(spillPercent >= 0.0f && spillPercent <= 1.0f)) {
            LOG.error("Invalid \"io.sort.spill.percent\": " + spillPercent);
            spillPercent = 0.8f;
        }
        this.lastSpillInMem = jobConf.getBoolean("mapred.map.lastspill.memory", true);
        // NOTE(review): the default is borrowed from an unrelated FSNamesystem constant,
        // which looks like a decompiler substitution for a literal - confirm the intended value.
        this.numBigRecordsWarnThreshold = jobConf.getInt("mapred.map.bigrecord.spill.warn.threshold", FSNamesystem.DEFAULT_MAX_CORRUPT_FILEBLOCKS_RETURNED);

        // Local-mode jobs read their buffer size from a separate key; remember which key
        // was used so the messages below name the setting that actually applies.
        final boolean localMode = "local".equals(jobConf.get("mapred.job.tracker", "local"));
        final String sortMbKey = localMode ? "io.sort.mb.localmode" : "io.sort.mb";
        final int sortMb = jobConf.getInt(sortMbKey, 100);
        if ((sortMb & 2047) != sortMb) {
            // Only 0..2047 passes: the size in bytes (sortMb << 20) must fit in an int.
            throw new IOException("Invalid \"" + sortMbKey + "\": " + sortMb);
        }
        LOG.info(sortMbKey + " = " + sortMb);

        // Validate the record classes before allocating the (potentially very large) buffer.
        this.keyClass = (Class<K>) jobConf.getMapOutputKeyClass();
        this.valClass = (Class<V>) jobConf.getMapOutputValueClass();
        if (!BytesWritable.class.isAssignableFrom(this.keyClass) || !BytesWritable.class.isAssignableFrom(this.valClass)) {
            throw new IOException(getClass().getName() + "  only support " + BytesWritable.class.getName() + " as key and value classes, MapOutputKeyClass is " + this.keyClass.getName() + ", MapOutputValueClass is " + this.valClass.getName());
        }

        this.kvBufferSize = sortMb << 20;
        this.kvbuffer = new byte[this.kvBufferSize];
        this.softBufferLimit = (int) (this.kvbuffer.length * spillPercent);

        this.memoryBlockAllocator = new MemoryBlockAllocator(this.kvBufferSize, this.softBufferLimit, jobConf.getNumMapTasks(), this.partitions, this);
        this.mapOutputByteCounter = taskReporter.getCounter((Enum<?>) Task.Counter.MAP_OUTPUT_BYTES);
        this.mapOutputRecordCounter = taskReporter.getCounter((Enum<?>) Task.Counter.MAP_OUTPUT_RECORDS);
        this.mapSpillSortCounter = new MapSpillSortCounters(taskReporter);
        this.reducePartitions = new ReducePartition[this.partitions];
        this.inMemorySegments = new Merger.Segment[this.partitions];
        for (int p = 0; p < this.partitions; p++) {
            this.reducePartitions[p] = new ReducePartition<>(p, this.memoryBlockAllocator, this.kvbuffer, this, this.reporter);
        }
        // codec stays null unless map output compression is enabled.
        if (jobConf.getCompressMapOutput()) {
            this.codec = (CompressionCodec) ReflectionUtils.newInstance(jobConf.getMapOutputCompressorClass(DefaultCodec.class), jobConf);
        }
    }

    /**
     * Looks up the attempt id of the owning map task.
     *
     * @return the id reported by the map task
     */
    private TaskAttemptID getTaskID() {
        final TaskAttemptID attemptId = task.getTaskID();
        return attemptId;
    }

    /**
     * Adds one record to the given partition and updates the map output counters.
     *
     * @param k record key; its runtime class must be exactly the configured map output key class
     * @param v record value; its runtime class must be exactly the configured map output value class
     * @param i index of the target partition, expected in {@code [0, partitions)}
     * @throws IOException on a key or value class mismatch, on a partition index outside the
     *         valid range, or if the partition fails to store the record
     */
    @Override // org.apache.hadoop.mapred.MapTask.MapOutputCollector
    public void collect(K k, V v, int i) throws IOException {
        this.reporter.progress();
        if (k.getClass() != this.keyClass) {
            throw new IOException("Type mismatch in key from map: expected " + this.keyClass.getName() + ", recieved " + k.getClass().getName());
        }
        if (v.getClass() != this.valClass) {
            throw new IOException("Type mismatch in value from map: expected " + this.valClass.getName() + ", recieved " + v.getClass().getName());
        }
        // Reject bad indices explicitly (the no-reducer stub partitioner returns -1) instead
        // of failing with a bare ArrayIndexOutOfBoundsException.
        if (i < 0 || i >= this.partitions) {
            throw new IOException("Illegal partition for " + k + " (" + i + ")");
        }
        final int bytesCollected = this.reducePartitions[i].collect(k, v);
        this.mapOutputRecordCounter.increment(1L);
        this.mapOutputByteCounter.increment(bytesCollected);
    }

    /**
     * Collects a record after asking the partitioner which partition it belongs to.
     *
     * @param k record key
     * @param v record value
     * @throws IOException if the three-argument {@code collect} rejects or fails to store the record
     */
    @Override // org.apache.hadoop.mapred.OutputCollector
    public void collect(K k, V v) throws IOException {
        // Plain arguments: the casts the decompiler emitted here were not valid Java.
        final int partition = this.partitioner.getPartition(k, v, this.partitions);
        collect(k, v, partition);
    }

    /**
     * Runs {@code groupOrSort()} on every reduce partition and records the elapsed wall-clock
     * time together with the task's resource readings taken before and after.
     *
     * @return the resource reading taken after all partitions were processed
     */
    protected ResourceCalculatorPlugin.ProcResourceValues sortReduceParts() {
        final long startedAt = System.currentTimeMillis();
        final ResourceCalculatorPlugin.ProcResourceValues before = task.getCurrentProcResourceValues();

        for (ReducePartition<K, V> partition : reducePartitions) {
            partition.groupOrSort();
        }

        final long elapsedMillis = System.currentTimeMillis() - startedAt;
        final ResourceCalculatorPlugin.ProcResourceValues after = task.getCurrentProcResourceValues();
        mapSpillSortCounter.incCountersPerSort(before, after, elapsedMillis);
        return after;
    }

    /**
     * Sorts every partition, writes all of them to a new spill file and records the spill index,
     * either in the in-memory cache or, once the cache limit is reached, in an index file.
     *
     * <p>The spill stream is closed exactly once on every path. If writing fails and the close
     * fails as well, the close failure is attached to the original exception as suppressed.
     *
     * @throws IOException if creating, writing or closing the spill file or its index file fails
     */
    @Override // org.apache.hadoop.mapred.BlockMapOutputCollector
    public void sortAndSpill() throws IOException {
        final ResourceCalculatorPlugin.ProcResourceValues afterSort = sortReduceParts();
        final long spillStartMillis = System.currentTimeMillis();
        final SpillRecord spillRecord = new SpillRecord(this.partitions);
        final Path spillFile = this.task.mapOutputFile.getSpillFileForWrite(getTaskID(), this.numSpills, this.memoryBlockAllocator.getEstimatedSize());
        long spilledBytes = 0;

        try (FSDataOutputStream out = this.rfs.create(spillFile)) {
            for (int p = 0; p < this.partitions; p++) {
                final IndexRecord partIndex = this.reducePartitions[p].spill(this.job, out, this.keyClass, this.valClass, this.codec, this.task.spilledRecordsCounter);
                spilledBytes += partIndex.partLength;
                spillRecord.putIndex(partIndex, p);
            }
            if (this.totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
                // Index cache is full: persist this spill's index next to the spill file.
                spillRecord.writeToFile(this.task.mapOutputFile.getSpillIndexFileForWrite(getTaskID(), this.numSpills, this.partitions * 24), this.job);
            } else {
                this.indexCacheList.add(spillRecord);
                // 24 bytes are accounted per index entry.
                this.totalIndexCacheMemory += spillRecord.size() * 24;
            }
            LOG.info("Finished spill " + this.numSpills);
            this.numSpills++;
        }

        // Counters are updated only after the spill file has been closed successfully.
        this.mapSpillSortCounter.incCountersPerSpill(afterSort, this.task.getCurrentProcResourceValues(), System.currentTimeMillis() - spillStartMillis, spilledBytes);
    }

    /**
     * Writes a single record straight to its own spill file. A segment is written for every
     * partition so the spill index stays complete; only the segment of partition {@code i}
     * contains the record, all others are empty.
     *
     * <p>The spill stream is closed exactly once on every path, and a writer whose
     * {@code close()} already failed is not closed a second time. Failures raised while cleaning
     * up are attached to the original exception as suppressed instead of replacing it.
     *
     * @param k record key
     * @param v record value
     * @param i index of the partition the record belongs to
     * @throws IOException if creating, writing or closing the spill file or its index file fails
     */
    @Override // org.apache.hadoop.mapred.BlockMapOutputCollector
    public void spillSingleRecord(K k, V v, int i) throws IOException {
        final ResourceCalculatorPlugin.ProcResourceValues before = this.task.getCurrentProcResourceValues();
        final long spillStartMillis = System.currentTimeMillis();
        final SpillRecord spillRecord = new SpillRecord(this.partitions);
        final Path spillFile = this.task.mapOutputFile.getSpillFileForWrite(getTaskID(), this.numSpills, k.getLength() + v.getLength());
        long spilledBytes = 0;

        try (FSDataOutputStream out = this.rfs.create(spillFile)) {
            final IndexRecord indexRecord = new IndexRecord();
            for (int p = 0; p < this.partitions; p++) {
                IFile.Writer<K, V> writer = null;
                boolean closeAttempted = false;
                try {
                    final long segmentStart = out.getPos();
                    writer = new IFile.Writer<K, V>(this.job, out, this.keyClass, this.valClass, this.codec, this.task.spilledRecordsCounter);
                    if (p == i) {
                        final long recordStart = out.getPos();
                        writer.append(k, v);
                        this.mapOutputByteCounter.increment(out.getPos() - recordStart);
                    }
                    closeAttempted = true;
                    writer.close();
                    // The lengths are read after close(), as in the original code.
                    indexRecord.startOffset = segmentStart;
                    indexRecord.rawLength = writer.getRawLength();
                    indexRecord.partLength = writer.getCompressedLength();
                    spilledBytes += writer.getCompressedLength();
                    spillRecord.putIndex(indexRecord, p);
                } catch (IOException e) {
                    if (writer != null && !closeAttempted) {
                        try {
                            writer.close();
                        } catch (IOException closeFailure) {
                            // Keep the root cause; report the cleanup failure alongside it.
                            e.addSuppressed(closeFailure);
                        }
                    }
                    throw e;
                }
            }
            if (this.totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
                // Index cache is full: persist this spill's index next to the spill file.
                spillRecord.writeToFile(this.task.mapOutputFile.getSpillIndexFileForWrite(getTaskID(), this.numSpills, this.partitions * 24), this.job);
            } else {
                this.indexCacheList.add(spillRecord);
                // 24 bytes are accounted per index entry.
                this.totalIndexCacheMemory += spillRecord.size() * 24;
            }
            LOG.info("Finished spill big record " + this.numBigRecordsSpills);
            this.numBigRecordsSpills++;
            this.numSpills++;
        }

        // Counters are updated only after the spill file has been closed successfully.
        this.mapSpillSortCounter.incCountersPerSpill(before, this.task.getCurrentProcResourceValues(), System.currentTimeMillis() - spillStartMillis, spilledBytes);
        this.mapSpillSortCounter.incSpillSingleRecord();
    }

    /**
     * Finishes collection. When at least one spill already exists and the last spill may stay in
     * memory, the partitions are sorted and exposed as in-memory segments; otherwise they are
     * sorted and spilled to disk. All spills are then merged into the final map output, and the
     * merge is timed and reported to the spill/sort counters.
     *
     * @throws IOException if spilling or merging fails
     * @throws ClassNotFoundException propagated from the merge step
     * @throws InterruptedException propagated from the merge step
     */
    @Override // org.apache.hadoop.mapred.MapTask.MapOutputCollector
    public synchronized void flush() throws IOException, ClassNotFoundException, InterruptedException {
        final boolean keepLastSpillInMemory = numSpills > 0 && lastSpillInMem;
        if (keepLastSpillInMemory) {
            sortReduceParts();
            for (int p = 0; p < partitions; p++) {
                inMemorySegments[p] = new Merger.Segment<>(reducePartitions[p].getIReader(), true);
            }
            hasInMemorySpill = true;
        } else {
            sortAndSpill();
        }

        final long mergeStartedAt = System.currentTimeMillis();
        final ResourceCalculatorPlugin.ProcResourceValues beforeMerge = task.getCurrentProcResourceValues();
        mergeParts();
        final ResourceCalculatorPlugin.ProcResourceValues afterMerge = task.getCurrentProcResourceValues();
        mapSpillSortCounter.incMergeCounters(beforeMerge, afterMerge, System.currentTimeMillis() - mergeStartedAt);
    }

    /**
     * Produces the final map output file and its index from all spills.
     *
     * <ul>
     *   <li>Exactly one on-disk spill and no in-memory spill: the spill file is renamed to
     *       {@code file.out} and its index is renamed or written as {@code file.out.index}.</li>
     *   <li>No spills: an empty segment is written for every partition.</li>
     *   <li>Otherwise: for each partition, the on-disk segments and the optional in-memory
     *       segment are merged into the final output, and the spill files are deleted.</li>
     * </ul>
     *
     * <p>The final output stream is closed on every path, including failures during the merge or
     * while writing the index.
     *
     * @throws IOException if reading a spill or writing the final output or index fails
     * @throws InterruptedException declared for callers; nothing in this body throws it directly
     * @throws ClassNotFoundException declared for callers; nothing in this body throws it directly
     */
    private void mergeParts() throws IOException, InterruptedException, ClassNotFoundException {
        // Read the volatile counter once so every array and loop below agrees on the same value.
        final int spillCount = this.numSpills;
        final TaskAttemptID mapId = getTaskID();
        final Path[] spillFiles = new Path[spillCount];

        long finalOutputSize = 0;
        for (int s = 0; s < spillCount; s++) {
            spillFiles[s] = this.task.mapOutputFile.getSpillFile(mapId, s);
            finalOutputSize += this.rfs.getFileStatus(spillFiles[s]).getLen();
        }
        for (Merger.Segment<K, V> segment : this.inMemorySegments) {
            if (segment != null) {
                finalOutputSize += segment.getLength();
            }
        }

        if (spillCount == 1 && !this.hasInMemorySpill) {
            // A single on-disk spill already is the final output: move it into place.
            final Path outputDir = spillFiles[0].getParent();
            this.rfs.rename(spillFiles[0], new Path(outputDir, "file.out"));
            final Path indexTarget = new Path(outputDir, "file.out.index");
            if (this.indexCacheList.isEmpty()) {
                this.rfs.rename(this.task.mapOutputFile.getSpillIndexFile(mapId, 0), indexTarget);
            } else {
                this.indexCacheList.get(0).writeToFile(indexTarget, this.job);
            }
            return;
        }

        // Load the indices that were written to disk because the in-memory cache was full.
        for (int s = this.indexCacheList.size(); s < spillCount; s++) {
            this.indexCacheList.add(new SpillRecord(this.task.mapOutputFile.getSpillIndexFile(mapId, s), this.job));
        }

        final long finalIndexFileSize = this.partitions * 24;
        final Path finalOutputFile = this.task.mapOutputFile.getOutputFileForWrite(mapId, finalOutputSize + (this.partitions * MapTask.APPROX_HEADER_LENGTH));
        final Path finalIndexFile = this.task.mapOutputFile.getOutputIndexFileForWrite(mapId, finalIndexFileSize);

        final IndexRecord indexRecord = new IndexRecord();
        final SpillRecord finalIndex = new SpillRecord(this.partitions);

        try (FSDataOutputStream finalOut = this.rfs.create(finalOutputFile, true, 4096)) {
            if (spillCount == 0) {
                // Nothing was spilled: write an empty segment per partition.
                for (int p = 0; p < this.partitions; p++) {
                    final long segmentStart = finalOut.getPos();
                    final IFile.Writer<K, V> emptyWriter = new IFile.Writer<K, V>(this.job, finalOut, this.keyClass, this.valClass, this.codec, null);
                    emptyWriter.close();
                    indexRecord.startOffset = segmentStart;
                    indexRecord.rawLength = emptyWriter.getRawLength();
                    indexRecord.partLength = emptyWriter.getCompressedLength();
                    finalIndex.putIndex(indexRecord, p);
                }
                finalIndex.writeToFile(finalIndexFile, this.job);
                return;
            }

            // Stateless, so one comparator and one merge factor serve every partition.
            final RawComparator<K> keyComparator = new RawComparator<K>() { // from class: org.apache.hadoop.mapred.BlockMapOutputBuffer.2
                @Override // org.apache.hadoop.io.RawComparator
                public int compare(byte[] bArr, int i5, int i6, byte[] bArr2, int i7, int i8) {
                    // Compares the ranges with their first 4 bytes skipped
                    // (presumably the serialized length prefix - confirm).
                    return LexicographicalComparerHolder.BEST_COMPARER.compareTo(bArr, i5 + 4, i6 - 4, bArr2, i7 + 4, i8 - 4);
                }

                @Override // java.util.Comparator
                public int compare(K k, K k2) {
                    return LexicographicalComparerHolder.BEST_COMPARER.compareTo(k.getBytes(), 0, k.getLength(), k2.getBytes(), 0, k2.getLength());
                }
            };
            final int mergeFactor = this.job.getInt("io.sort.factor", 100);

            for (int p = 0; p < this.partitions; p++) {
                final ArrayList<Merger.Segment<K, V>> segments = new ArrayList<Merger.Segment<K, V>>(spillCount + this.inMemorySegments.length);
                for (int s = 0; s < spillCount; s++) {
                    final IndexRecord index = this.indexCacheList.get(s).getIndex(p);
                    segments.add(new Merger.Segment<K, V>(this.job, this.rfs, spillFiles[s], index.startOffset, index.partLength, this.codec, true));
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("MapId=" + mapId + " Reducer=" + p + "Spill =" + s + "(" + index.startOffset + StringUtils.COMMA_STR + index.rawLength + ", " + index.partLength + ")");
                    }
                }
                if (this.inMemorySegments[p] != null) {
                    // The in-memory spill goes last, after all on-disk segments.
                    segments.add(this.inMemorySegments[p]);
                }

                final RawKeyValueIterator merged = Merger.merge(this.job, this.rfs, this.keyClass, this.valClass, this.codec, segments, mergeFactor, new Path(mapId.toString()), keyComparator, this.reporter, (Counters.Counter) null, this.task.spilledRecordsCounter);

                final long segmentStart = finalOut.getPos();
                final IFile.Writer<K, V> writer = new IFile.Writer<K, V>(this.job, finalOut, this.keyClass, this.valClass, this.codec, this.task.spilledRecordsCounter);
                Merger.writeFile(merged, writer, this.reporter, this.job);
                writer.close();
                indexRecord.startOffset = segmentStart;
                indexRecord.rawLength = writer.getRawLength();
                indexRecord.partLength = writer.getCompressedLength();
                finalIndex.putIndex(indexRecord, p);
            }
            finalIndex.writeToFile(finalIndexFile, this.job);
        }

        // The spill files are removed only after the final output has been closed successfully.
        for (int s = 0; s < spillCount; s++) {
            this.rfs.delete(spillFiles[s], true);
        }
    }

    /**
     * Publishes the final spill/sort counter values and logs a warning when the number of
     * single-record ("big record") spills is above the configured threshold.
     */
    @Override // org.apache.hadoop.mapred.MapTask.MapOutputCollector
    public void close() {
        mapSpillSortCounter.finalCounterUpdate();
        if (numBigRecordsSpills <= numBigRecordsWarnThreshold) {
            return;
        }
        LOG.warn("Spilled a large number of big records: " + numBigRecordsSpills);
    }
}
