package cascading.flow.tez.stream.element;

import cascading.CascadingException;
import cascading.flow.FlowProcess;
import cascading.flow.SliceCounters;
import cascading.flow.planner.Scope;
import cascading.flow.stream.duct.Duct;
import cascading.flow.stream.duct.DuctException;
import cascading.flow.stream.element.BoundaryStage;
import cascading.flow.stream.element.InputSource;
import cascading.flow.stream.graph.IORole;
import cascading.flow.stream.graph.StreamGraph;
import cascading.pipe.Boundary;
import cascading.pipe.Pipe;
import cascading.tap.hadoop.util.MeasuredOutputCollector;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.tuple.io.KeyTuple;
import cascading.tuple.io.ValueTuple;
import cascading.tuple.util.Resettable1;
import cascading.util.Util;
import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.library.api.KeyValueReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cascading/flow/tez/stream/element/TezBoundaryStage.class */
/**
 * Boundary stage for the Tez platform.
 *
 * <p>An instance is constructed around either a collection of {@link LogicalOutput}s or a single
 * {@link LogicalInput}. Tuples handed to {@link #receive(Duct, int, TupleEntry)} are written, as
 * keys, to every configured output; tuples read from the input by {@link #map()} are forwarded to
 * the next duct in the stream graph.
 */
public class TezBoundaryStage extends BoundaryStage<TupleEntry, TupleEntry> implements InputSource {
    private static final Logger LOG = LoggerFactory.getLogger(TezBoundaryStage.class);

    /** Outputs written to by {@link #receive(Duct, int, TupleEntry)}; null when built from an input. */
    protected Collection<LogicalOutput> logicalOutputs;

    /** Input read by {@link #map()}; null when built from outputs. */
    protected LogicalInput logicalInput;

    /** Collector created in {@link #prepare()} when the role is not {@code IORole.source}. */
    private MeasuredOutputCollector collector;

    /** Reusable entry that {@link #map()} re-points at each key read from the input. */
    private TupleEntry valueEntry;

    /** Reusable key wrapper that {@link #receive(Duct, int, TupleEntry)} resets for every tuple. */
    private final Resettable1<Tuple> keyTuple = new KeyTuple();

    /**
     * Creates a stage that writes to the given logical outputs.
     *
     * @param flowProcess the current flow process
     * @param boundary    the boundary pipe this stage represents
     * @param iORole      the role of this stage in the stream graph
     * @param collection  the logical outputs to write to
     * @throws IllegalArgumentException if {@code collection} is null or empty
     */
    public TezBoundaryStage(FlowProcess flowProcess, Boundary boundary, IORole iORole, Collection<LogicalOutput> collection) {
        super(flowProcess, boundary, iORole);
        boolean hasOutputs = collection != null && !collection.isEmpty();
        if (!hasOutputs) {
            throw new IllegalArgumentException("output must not be null or empty");
        }
        this.logicalOutputs = collection;
    }

    /**
     * Creates a stage that reads from the given logical input.
     *
     * @param flowProcess  the current flow process
     * @param boundary     the boundary pipe this stage represents
     * @param iORole       the role of this stage in the stream graph
     * @param logicalInput the logical input to read from
     * @throws IllegalArgumentException if {@code logicalInput} is null
     */
    public TezBoundaryStage(FlowProcess flowProcess, Boundary boundary, IORole iORole, LogicalInput logicalInput) {
        super(flowProcess, boundary, iORole);
        if (logicalInput == null) {
            throw new IllegalArgumentException("inputs must not be null or empty");
        }
        this.logicalInput = logicalInput;
    }

    /**
     * Runs the parent initialization, then builds the reusable value entry from the
     * pass-through fields of the first outgoing scope.
     */
    public void initialize() {
        super.initialize();
        Scope firstOutgoing = (Scope) Util.getFirst(outgoingScopes);
        // NOTE(review): the boolean flag presumably marks the entry as unmodifiable - confirm against TupleEntry.
        valueEntry = new TupleEntry(firstOutgoing.getIncomingFunctionPassThroughFields(), true);
    }

    /**
     * Resolves the next duct from the stream graph, unless this stage is a sink.
     *
     * @param streamGraph the graph used to look up the next duct
     */
    public void bind(StreamGraph streamGraph) {
        if (role == IORole.sink) {
            return;
        }
        next = getNextFor(streamGraph);
    }

    /**
     * Starts the logical input and every logical output that is configured, creates the measured
     * output collector when the role is not {@code IORole.source}, and finally delegates to the
     * parent {@code prepare()}.
     *
     * @throws CascadingException wrapping any exception raised by the steps above
     */
    public void prepare() {
        try {
            if (logicalInput != null) {
                logStartCall(logicalInput);
                logicalInput.start();
            }
            if (logicalOutputs != null) {
                for (LogicalOutput output : logicalOutputs) {
                    logStartCall(output);
                    output.start();
                }
            }
            if (role != IORole.source) {
                collector = new MeasuredOutputCollector(flowProcess, SliceCounters.Write_Duration, createOutputCollector());
            }
            super.prepare();
        } catch (Exception failure) {
            throw new CascadingException("unable to start input/output", failure);
        }
    }

    /**
     * Logs that {@code start()} is about to be invoked on the given input or output.
     *
     * @param endpoint the logical input or output being started
     */
    private void logStartCall(Object endpoint) {
        Object[] arguments = {endpoint.getClass().getSimpleName(), getBoundary(), Pipe.id(getBoundary())};
        LOG.info("calling {}#start() on: {} {}", arguments);
    }

    /**
     * Propagates the start signal only when a next duct has been bound.
     *
     * @param duct the duct issuing the start signal
     */
    public void start(Duct duct) {
        if (next == null) {
            return;
        }
        super.start(duct);
    }

    /**
     * Writes the tuple of the incoming entry to the collector as a key, paired with
     * {@code ValueTuple.NULL}, and increments the {@code Tuples_Written} counter.
     *
     * <p>Failures are routed to {@code handleReThrowableException} (out of memory) or
     * {@code handleException} (everything else) rather than being thrown directly here.
     *
     * @param duct       the previous duct
     * @param i          the ordinal supplied by the caller (not used by this method)
     * @param tupleEntry the entry whose tuple is written
     */
    public void receive(Duct duct, int i, TupleEntry tupleEntry) {
        try {
            Tuple tuple = tupleEntry.getTuple();
            keyTuple.reset(tuple);
            collector.collect(keyTuple, ValueTuple.NULL);
            flowProcess.increment(SliceCounters.Tuples_Written, 1L);
        } catch (OutOfMemoryError error) {
            handleReThrowableException("out of memory, try increasing task memory allocation", error);
        } catch (CascadingException exception) {
            handleException(exception, tupleEntry);
        } catch (Throwable throwable) {
            handleException(new DuctException("internal error: " + tupleEntry.getTuple().print(), throwable), tupleEntry);
        }
    }

    /**
     * Propagates the complete signal only when a next duct has been bound.
     *
     * @param duct the duct issuing the complete signal
     */
    public void complete(Duct duct) {
        if (next == null) {
            return;
        }
        super.complete(duct);
    }

    /**
     * Runs {@link #map()} and rethrows whatever throwable it reports.
     *
     * @param obj not used by this implementation
     * @throws Throwable the throwable returned by {@link #map()}, if any
     */
    public void run(Object obj) throws Throwable {
        Throwable failure = map();
        if (failure == null) {
            return;
        }
        throw failure;
    }

    /**
     * Reads every key from the logical input and forwards it, wrapped in the reusable value entry,
     * to the next duct, bracketed by {@code start} and {@code complete} calls on this stage.
     *
     * @return {@code null} on success, otherwise the throwable that interrupted processing
     * @throws Exception declared for overriding implementations; this implementation returns
     *                   failures instead of throwing them
     */
    protected Throwable map() throws Exception {
        try {
            start(this);
            // getReader() is declared with the base reader type in the Tez API, so the
            // key/value view must be obtained through an explicit downcast.
            KeyValueReader keyValues = (KeyValueReader) logicalInput.getReader();
            while (keyValues.next()) {
                Tuple current = (Tuple) keyValues.getCurrentKey();
                valueEntry.setTuple(current);
                next.receive(this, 0, valueEntry);
            }
            complete(this);
            return null;
        } catch (Throwable throwable) {
            // Out-of-memory errors are handed back without being logged here.
            boolean outOfMemory = throwable instanceof OutOfMemoryError;
            if (!outOfMemory) {
                LOG.error("caught throwable", throwable);
            }
            return throwable;
        }
    }

    /**
     * Builds the collector used to write to the logical outputs.
     *
     * <p>With exactly one output, that output is wrapped directly. Otherwise the returned collector
     * forwards each key/value pair to one wrapper per output, in the iteration order of
     * {@link #logicalOutputs}.
     *
     * @return a collector writing to all configured logical outputs
     */
    protected OutputCollector createOutputCollector() {
        int outputCount = logicalOutputs.size();
        if (outputCount == 1) {
            return new OldOutputCollector((LogicalOutput) Util.getFirst(logicalOutputs));
        }
        final OutputCollector[] delegates = new OutputCollector[outputCount];
        int slot = 0;
        for (LogicalOutput output : logicalOutputs) {
            delegates[slot] = new OldOutputCollector(output);
            slot++;
        }
        return new OutputCollector() {
            public void collect(Object obj, Object obj2) throws IOException {
                for (OutputCollector delegate : delegates) {
                    delegate.collect(obj, obj2);
                }
            }
        };
    }
}
