package org.apache.tez.mapreduce.processor.reduce;

import com.google.common.collect.HashMultimap;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper;
import org.apache.tez.common.TezUtils;
import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.mapreduce.TestUmbilical;
import org.apache.tez.mapreduce.TezTestUtils;
import org.apache.tez.mapreduce.hadoop.IDConverter;
import org.apache.tez.mapreduce.hadoop.MRHelpers;
import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
import org.apache.tez.mapreduce.input.MRInputLegacy;
import org.apache.tez.mapreduce.output.MROutputLegacy;
import org.apache.tez.mapreduce.partition.MRPartitioner;
import org.apache.tez.mapreduce.processor.MapUtils;
import org.apache.tez.mapreduce.protos.MRRuntimeProtos;
import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
import org.apache.tez.runtime.api.ObjectRegistry;
import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.impl.EventType;
import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
import org.apache.tez.runtime.api.impl.InputSpec;
import org.apache.tez.runtime.api.impl.OutputSpec;
import org.apache.tez.runtime.api.impl.TaskSpec;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutput;
import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutputFiles;
import org.apache.tez.runtime.library.input.OrderedGroupedInputLegacy;
import org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.class */
public class TestReduceProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(TestReduceProcessor.class);
    private static JobConf defaultConf = new JobConf();
    private static FileSystem localFs;
    private static Path workDir;

    public void setUpJobConf(JobConf jobConf) {
        jobConf.set("tez.runtime.framework.local.dirs", workDir.toString());
        jobConf.set("mapreduce.cluster.local.dir", workDir.toString());
        jobConf.setClass("tez.runtime.task.local.output.manager", TezTaskOutputFiles.class, TezTaskOutput.class);
        jobConf.set("tez.runtime.partitioner.class", MRPartitioner.class.getName());
        jobConf.setNumReduceTasks(1);
    }

    @Before
    @After
    public void cleanup() throws Exception {
        localFs.delete(workDir, true);
    }

    @Test(timeout = 5000)
    public void testReduceProcessor() throws Exception {
        String initialMapVertexName = MultiStageMRConfigUtil.getInitialMapVertexName();
        String finalReduceVertexName = MultiStageMRConfigUtil.getFinalReduceVertexName();
        JobConf jobConf = new JobConf(defaultConf);
        setUpJobConf(jobConf);
        MRHelpers.translateMRConfToTez(jobConf);
        jobConf.setInt("mapreduce.job.application.attempt.id", 0);
        jobConf.set("tez.mr.framework.task-local-resource.dir", new Path(workDir, "localized-resources").toUri().toString());
        jobConf.setBoolean("mapreduce.tez.splits.via.events", false);
        Path path = new Path(workDir, "map0");
        MapUtils.generateInputSplit(localFs, workDir, jobConf, path);
        InputSpec inputSpec = new InputSpec("NullSrcVertex", InputDescriptor.create(MRInputLegacy.class.getName()).setUserPayload(UserPayload.create(ByteBuffer.wrap(MRRuntimeProtos.MRInputUserPayloadProto.newBuilder().setConfigurationBytes(TezUtils.createByteStringFromConf(jobConf)).build().toByteArray()))), 1);
        OutputSpec outputSpec = new OutputSpec("NullDestVertex", OutputDescriptor.create(OrderedPartitionedKVOutput.class.getName()).setUserPayload(TezUtils.createUserPayloadFromConf(jobConf)), 1);
        TestUmbilical testUmbilical = new TestUmbilical();
        LogicalIOProcessorRuntimeTask createLogicalTask = MapUtils.createLogicalTask(localFs, workDir, jobConf, 0, path, testUmbilical, "mrdag0", initialMapVertexName, Collections.singletonList(inputSpec), Collections.singletonList(outputSpec));
        createLogicalTask.initialize();
        createLogicalTask.run();
        createLogicalTask.close();
        Assert.assertEquals(2L, testUmbilical.getEvents().size());
        Assert.assertEquals(EventType.VERTEX_MANAGER_EVENT, testUmbilical.getEvents().get(0).getEventType());
        Assert.assertEquals(EventType.COMPOSITE_DATA_MOVEMENT_EVENT, testUmbilical.getEvents().get(1).getEventType());
        CompositeDataMovementEvent event = testUmbilical.getEvents().get(1).getEvent();
        Assert.assertEquals(1L, event.getCount());
        DataMovementEvent dataMovementEvent = (DataMovementEvent) event.getEvents().iterator().next();
        dataMovementEvent.setTargetIndex(0);
        LOG.info("Starting reduce...");
        JobTokenIdentifier jobTokenIdentifier = new JobTokenIdentifier(new Text("mrdag0"));
        Token token = new Token(jobTokenIdentifier, new JobTokenSecretManager());
        token.setService(jobTokenIdentifier.getJobId());
        jobConf.setOutputFormat(SequenceFileOutputFormat.class);
        jobConf.set("tez.mr.framework.task-local-resource.dir", new Path(workDir, "localized-resources").toUri().toString());
        jobConf.setBoolean("tez.runtime.optimize.local.fetch", true);
        FileOutputFormat.setOutputPath(jobConf, new Path(workDir, "output"));
        TaskSpec taskSpec = new TaskSpec(TezTestUtils.getMockTaskAttemptId(0, 1, 0, 0), "mrdag0", finalReduceVertexName, -1, ProcessorDescriptor.create(ReduceProcessor.class.getName()).setUserPayload(TezUtils.createUserPayloadFromConf(jobConf)), Collections.singletonList(new InputSpec(initialMapVertexName, InputDescriptor.create(OrderedGroupedInputLegacy.class.getName()).setUserPayload(TezUtils.createUserPayloadFromConf(jobConf)), 1)), Collections.singletonList(new OutputSpec("NullDestinationVertex", OutputDescriptor.create(MROutputLegacy.class.getName()).setUserPayload(TezUtils.createUserPayloadFromConf(jobConf)), 1)), (List) null);
        HashMap hashMap = new HashMap();
        hashMap.put("mapreduce_shuffle", ShuffleUtils.convertJobTokenToBytes(token));
        HashMap hashMap2 = new HashMap();
        AuxiliaryServiceHelper.setServiceDataIntoEnv("mapreduce_shuffle", ByteBuffer.allocate(4).putInt(0, 8000), hashMap2);
        LogicalIOProcessorRuntimeTask logicalIOProcessorRuntimeTask = new LogicalIOProcessorRuntimeTask(taskSpec, 0, jobConf, new String[]{workDir.toString()}, new TestUmbilical(), hashMap, hashMap2, HashMultimap.create(), (ObjectRegistry) null, "", new ExecutionContextImpl("localhost"), Runtime.getRuntime().maxMemory());
        LinkedList linkedList = new LinkedList();
        linkedList.add(dataMovementEvent);
        logicalIOProcessorRuntimeTask.initialize();
        ((OrderedGroupedInputLegacy) logicalIOProcessorRuntimeTask.getInputs().values().iterator().next()).handleEvents(linkedList);
        logicalIOProcessorRuntimeTask.run();
        logicalIOProcessorRuntimeTask.close();
        SequenceFile.Reader reader = new SequenceFile.Reader(localFs, new Path(new Path(new Path(workDir, "output"), "_temporary/0/" + IDConverter.toMRTaskIdForOutput(TezTestUtils.getMockTaskId(0, 1, 0))), "part-v001-o000-00000"), jobConf);
        LongWritable longWritable = new LongWritable();
        Text text = new Text();
        long j = Long.MIN_VALUE;
        while (reader.next(longWritable, text)) {
            if (j != Long.MIN_VALUE) {
                Assert.assertTrue(j < longWritable.get());
                j = longWritable.get();
            }
        }
        reader.close();
    }

    static {
        localFs = null;
        workDir = null;
        try {
            defaultConf.set("fs.defaultFS", "file:///");
            localFs = FileSystem.getLocal(defaultConf);
            workDir = new Path(new Path(System.getProperty("test.build.data", "/tmp")), "TestReduceProcessor").makeQualified(localFs);
            LOG.info("Using workDir: " + workDir);
            MapUtils.configureLocalDirs(defaultConf, workDir.toString());
        } catch (IOException e) {
            throw new RuntimeException("init failure", e);
        }
    }
}
