/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapreduce.task.reduce;

import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BoundedByteArrayOutputStream;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MROutputFiles;
import org.apache.hadoop.mapred.MapOutputFile;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.task.reduce.ExceptionReporter;
import org.apache.hadoop.mapreduce.task.reduce.InMemoryMapOutput;
import org.apache.hadoop.mapreduce.task.reduce.MapOutput;
import org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl;
import org.apache.hadoop.mapreduce.task.reduce.MergeThread;
import org.apache.hadoop.test.Whitebox;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ObjectAssert;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

public class TestMergeManager {
    @Test(timeout=10000L)
    public void testMemoryMerge() throws Exception {
        int TOTAL_MEM_BYTES = 10000;
        int OUTPUT_SIZE = 7950;
        JobConf conf = new JobConf();
        conf.setFloat("mapreduce.reduce.shuffle.input.buffer.percent", 1.0f);
        conf.setLong("mapreduce.reduce.memory.totalbytes", 10000L);
        conf.setFloat("mapreduce.reduce.shuffle.memory.limit.percent", 0.8f);
        conf.setFloat("mapreduce.reduce.shuffle.merge.percent", 0.9f);
        TestExceptionReporter reporter = new TestExceptionReporter();
        CyclicBarrier mergeStart = new CyclicBarrier(2);
        CyclicBarrier mergeComplete = new CyclicBarrier(2);
        StubbedMergeManager mgr = new StubbedMergeManager(conf, reporter, mergeStart, mergeComplete);
        MapOutput out1 = mgr.reserve(null, 7950L, 0);
        Assert.assertTrue((String)"Should be a memory merge", (boolean)(out1 instanceof InMemoryMapOutput));
        InMemoryMapOutput mout1 = (InMemoryMapOutput)out1;
        this.fillOutput((InMemoryMapOutput<Text, Text>)mout1);
        MapOutput out2 = mgr.reserve(null, 7950L, 0);
        Assert.assertTrue((String)"Should be a memory merge", (boolean)(out2 instanceof InMemoryMapOutput));
        InMemoryMapOutput mout2 = (InMemoryMapOutput)out2;
        this.fillOutput((InMemoryMapOutput<Text, Text>)mout2);
        MapOutput out3 = mgr.reserve(null, 7950L, 0);
        ((ObjectAssert)Assertions.assertThat((Object)out3).withFailMessage("Should be told to wait", new Object[0])).isNull();
        mout1.commit();
        mout2.commit();
        mergeStart.await();
        Assert.assertEquals((long)1L, (long)mgr.getNumMerges());
        out1 = mgr.reserve(null, 7950L, 0);
        Assert.assertTrue((String)"Should be a memory merge", (boolean)(out1 instanceof InMemoryMapOutput));
        mout1 = (InMemoryMapOutput)out1;
        this.fillOutput((InMemoryMapOutput<Text, Text>)mout1);
        out2 = mgr.reserve(null, 7950L, 0);
        Assert.assertTrue((String)"Should be a memory merge", (boolean)(out2 instanceof InMemoryMapOutput));
        mout2 = (InMemoryMapOutput)out2;
        this.fillOutput((InMemoryMapOutput<Text, Text>)mout2);
        out3 = mgr.reserve(null, 7950L, 0);
        ((ObjectAssert)Assertions.assertThat((Object)out3).withFailMessage("Should be told to wait", new Object[0])).isNull();
        mout1.commit();
        mout2.commit();
        mergeComplete.await();
        mergeStart.await();
        Assert.assertEquals((long)2L, (long)mgr.getNumMerges());
        mergeComplete.await();
        Assert.assertEquals((long)2L, (long)mgr.getNumMerges());
        Assert.assertEquals((String)"exception reporter invoked", (long)0L, (long)reporter.getNumExceptions());
    }

    private void fillOutput(InMemoryMapOutput<Text, Text> output) throws IOException {
        BoundedByteArrayOutputStream stream = output.getArrayStream();
        int count = stream.getLimit();
        for (int i = 0; i < count; ++i) {
            stream.write(i);
        }
    }

    @Test
    public void testIoSortDefaults() {
        JobConf jobConf = new JobConf();
        Assert.assertEquals((long)10L, (long)jobConf.getInt("mapreduce.task.io.sort.factor", 100));
        Assert.assertEquals((long)100L, (long)jobConf.getInt("mapreduce.task.io.sort.mb", 10));
    }

    @Test(timeout=10000L)
    public void testOnDiskMerger() throws IOException, URISyntaxException, InterruptedException {
        JobConf jobConf = new JobConf();
        int SORT_FACTOR = 5;
        jobConf.setInt("mapreduce.task.io.sort.factor", 5);
        MROutputFiles mapOutputFile = new MROutputFiles();
        LocalFileSystem fs = FileSystem.getLocal((Configuration)jobConf);
        MergeManagerImpl manager = new MergeManagerImpl(null, jobConf, (FileSystem)fs, null, null, null, null, null, null, null, null, null, null, (MapOutputFile)mapOutputFile);
        MergeThread onDiskMerger = (MergeThread)Whitebox.getInternalState((Object)manager, (String)"onDiskMerger");
        int mergeFactor = (Integer)Whitebox.getInternalState((Object)onDiskMerger, (String)"mergeFactor");
        Assert.assertEquals((long)mergeFactor, (long)5L);
        onDiskMerger.suspend();
        Random rand = new Random();
        for (int i = 0; i < 10; ++i) {
            Path path = new Path("somePath");
            MergeManagerImpl.CompressAwarePath cap = new MergeManagerImpl.CompressAwarePath(path, 1L, (long)rand.nextInt());
            manager.closeOnDiskFile(cap);
        }
        LinkedList pendingToBeMerged = (LinkedList)Whitebox.getInternalState((Object)onDiskMerger, (String)"pendingToBeMerged");
        Assert.assertTrue((String)"No inputs were added to list pending to merge", (pendingToBeMerged.size() > 0 ? 1 : 0) != 0);
        for (int i = 0; i < pendingToBeMerged.size(); ++i) {
            List inputs = (List)pendingToBeMerged.get(i);
            for (int j = 1; j < inputs.size(); ++j) {
                Assert.assertTrue((String)"Not enough / too many inputs were going to be merged", (inputs.size() > 0 && inputs.size() <= 5 ? 1 : 0) != 0);
                Assert.assertTrue((String)"Inputs to be merged were not sorted according to size: ", (((MergeManagerImpl.CompressAwarePath)inputs.get(j)).getCompressedSize() >= ((MergeManagerImpl.CompressAwarePath)inputs.get(j - 1)).getCompressedSize() ? 1 : 0) != 0);
            }
        }
    }

    @Test
    public void testLargeMemoryLimits() throws Exception {
        JobConf conf = new JobConf();
        conf.setLong("mapreduce.reduce.memory.totalbytes", 0x200000000L);
        conf.setFloat("mapreduce.reduce.shuffle.input.buffer.percent", 1.0f);
        conf.setFloat("mapreduce.reduce.shuffle.memory.limit.percent", 0.95f);
        conf.setFloat("mapreduce.reduce.shuffle.merge.percent", 1.0f);
        conf.setFloat("mapreduce.reduce.input.buffer.percent", 1.0f);
        MergeManagerImpl mgr = new MergeManagerImpl(null, conf, (FileSystem)Mockito.mock(LocalFileSystem.class), null, null, null, null, null, null, null, null, null, null, (MapOutputFile)new MROutputFiles());
        Assert.assertTrue((String)("Large shuffle area unusable: " + mgr.memoryLimit), (mgr.memoryLimit > Integer.MAX_VALUE ? 1 : 0) != 0);
        long maxInMemReduce = mgr.getMaxInMemReduceLimit();
        Assert.assertTrue((String)("Large in-memory reduce area unusable: " + maxInMemReduce), (maxInMemReduce > Integer.MAX_VALUE ? 1 : 0) != 0);
        Assert.assertEquals((String)"maxSingleShuffleLimit to be capped at Integer.MAX_VALUE", (long)Integer.MAX_VALUE, (long)mgr.maxSingleShuffleLimit);
        this.verifyReservedMapOutputType((MergeManagerImpl<Text, Text>)mgr, 10L, "MEMORY");
        this.verifyReservedMapOutputType((MergeManagerImpl<Text, Text>)mgr, 0x80000000L, "DISK");
    }

    private void verifyReservedMapOutputType(MergeManagerImpl<Text, Text> mgr, long size, String expectedShuffleMode) throws IOException {
        TaskAttemptID mapId = TaskAttemptID.forName((String)"attempt_0_1_m_1_1");
        MapOutput mapOutput = mgr.reserve(mapId, size, 1);
        Assert.assertEquals((String)("Shuffled bytes: " + size), (Object)expectedShuffleMode, (Object)mapOutput.getDescription());
        mgr.unreserve(size);
    }

    @Test
    public void testZeroShuffleMemoryLimitPercent() throws Exception {
        JobConf jobConf = new JobConf();
        jobConf.setFloat("mapreduce.reduce.shuffle.memory.limit.percent", 0.0f);
        MergeManagerImpl mgr = new MergeManagerImpl(null, jobConf, (FileSystem)Mockito.mock(LocalFileSystem.class), null, null, null, null, null, null, null, null, null, null, (MapOutputFile)new MROutputFiles());
        this.verifyReservedMapOutputType((MergeManagerImpl<Text, Text>)mgr, 10L, "DISK");
    }

    private static class TestExceptionReporter
    implements ExceptionReporter {
        private List<Throwable> exceptions = new ArrayList<Throwable>();

        private TestExceptionReporter() {
        }

        public void reportException(Throwable t) {
            this.exceptions.add(t);
            t.printStackTrace();
        }

        public int getNumExceptions() {
            return this.exceptions.size();
        }
    }

    private static class TestMergeThread
    extends MergeThread<InMemoryMapOutput<Text, Text>, Text, Text> {
        private AtomicInteger numMerges = new AtomicInteger(0);
        private CyclicBarrier mergeStart;
        private CyclicBarrier mergeComplete;

        public TestMergeThread(MergeManagerImpl<Text, Text> mergeManager, ExceptionReporter reporter) {
            super(mergeManager, Integer.MAX_VALUE, reporter);
        }

        public synchronized void setSyncBarriers(CyclicBarrier mergeStart, CyclicBarrier mergeComplete) {
            this.mergeStart = mergeStart;
            this.mergeComplete = mergeComplete;
        }

        public int getNumMerges() {
            return this.numMerges.get();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void merge(List<InMemoryMapOutput<Text, Text>> inputs) throws IOException {
            TestMergeThread testMergeThread = this;
            synchronized (testMergeThread) {
                this.numMerges.incrementAndGet();
                for (InMemoryMapOutput<Text, Text> input : inputs) {
                    this.manager.unreserve(input.getSize());
                }
            }
            try {
                this.mergeStart.await();
                this.mergeComplete.await();
            }
            catch (InterruptedException interruptedException) {
            }
            catch (BrokenBarrierException brokenBarrierException) {
                // empty catch block
            }
        }
    }

    private static class StubbedMergeManager
    extends MergeManagerImpl<Text, Text> {
        private TestMergeThread mergeThread;

        public StubbedMergeManager(JobConf conf, ExceptionReporter reporter, CyclicBarrier mergeStart, CyclicBarrier mergeComplete) {
            super(null, conf, (FileSystem)Mockito.mock(LocalFileSystem.class), null, null, null, null, null, null, null, null, reporter, null, (MapOutputFile)Mockito.mock(MapOutputFile.class));
            this.mergeThread.setSyncBarriers(mergeStart, mergeComplete);
        }

        protected MergeThread<InMemoryMapOutput<Text, Text>, Text, Text> createInMemoryMerger() {
            this.mergeThread = new TestMergeThread(this, this.getExceptionReporter());
            return this.mergeThread;
        }

        public int getNumMerges() {
            return this.mergeThread.getNumMerges();
        }
    }
}

