package org.apache.tez.runtime.library.shuffle.common.impl;

import com.google.protobuf.InvalidProtocolBufferException;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.TezInputContext;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.InputFailedEvent;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.shuffle.common.DiskFetchedInput;
import org.apache.tez.runtime.library.shuffle.common.FetchedInput;
import org.apache.tez.runtime.library.shuffle.common.FetchedInputAllocator;
import org.apache.tez.runtime.library.shuffle.common.MemoryFetchedInput;
import org.apache.tez.runtime.library.shuffle.common.ShuffleEventHandler;
import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;

/* loaded from: input_file:org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.class */
/**
 * {@link ShuffleEventHandler} implementation that translates incoming Tez events into calls on a
 * {@link ShuffleManager}.
 *
 * <p>Two event types are understood:
 * <ul>
 *   <li>{@link DataMovementEvent}: the payload is parsed and the corresponding input is either
 *       reported as empty, registered as a known input to be fetched, or (when the payload
 *       carries the data inline) copied into a freshly allocated {@link FetchedInput}.</li>
 *   <li>{@link InputFailedEvent}: the corresponding input attempt is marked obsolete.</li>
 * </ul>
 * Any other event type results in a {@link TezUncheckedException}.
 */
public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler {
    private static final Log LOG = LogFactory.getLog(ShuffleInputEventHandlerImpl.class);
    private final ShuffleManager shuffleManager;
    private final FetchedInputAllocator inputAllocator;
    private final CompressionCodec codec;
    private final boolean ifileReadAhead;
    private final int ifileReadAheadLength;

    /**
     * Creates a handler bound to the given shuffle manager and allocator.
     *
     * @param tezInputContext       input context; accepted for signature compatibility but not
     *                              used by this class
     * @param shuffleManager        receiver of known / completed / obsolete input notifications
     * @param fetchedInputAllocator allocator used for inputs whose data arrives inside the event
     * @param compressionCodec      codec handed to {@code ShuffleUtils.shuffleToMemory}
     * @param z                     ifile read-ahead flag handed to
     *                              {@code ShuffleUtils.shuffleToMemory}
     * @param i                     ifile read-ahead length handed to
     *                              {@code ShuffleUtils.shuffleToMemory}
     */
    public ShuffleInputEventHandlerImpl(TezInputContext tezInputContext, ShuffleManager shuffleManager, FetchedInputAllocator fetchedInputAllocator, CompressionCodec compressionCodec, boolean z, int i) {
        this.shuffleManager = shuffleManager;
        this.inputAllocator = fetchedInputAllocator;
        this.codec = compressionCodec;
        this.ifileReadAhead = z;
        this.ifileReadAheadLength = i;
    }

    /**
     * Handles each event of the list in iteration order.
     *
     * @param list events to process
     * @throws IOException           if processing a data movement event fails with an I/O error
     * @throws TezUncheckedException if an event has an unsupported type or an unparsable payload
     */
    @Override // org.apache.tez.runtime.library.shuffle.common.ShuffleEventHandler
    public void handleEvents(List<Event> list) throws IOException {
        for (Event event : list) {
            handleEvent(event);
        }
    }

    /**
     * Dispatches a single event to the matching {@code process*} method.
     *
     * @param event the event to dispatch
     * @throws IOException           propagated from data movement event processing
     * @throws TezUncheckedException if the event is neither a {@link DataMovementEvent} nor an
     *                               {@link InputFailedEvent}
     */
    private void handleEvent(Event event) throws IOException {
        if (event instanceof DataMovementEvent) {
            processDataMovementEvent((DataMovementEvent) event);
        } else if (event instanceof InputFailedEvent) {
            processInputFailedEvent((InputFailedEvent) event);
        } else {
            throw new TezUncheckedException("Unexpected event type: " + event.getClass().getName());
        }
    }

    /**
     * Parses the event payload and informs the shuffle manager about the input it describes.
     *
     * <p>Outcomes, checked in this order:
     * <ol>
     *   <li>the payload's empty-partitions bit set has the bit for the event's source index set:
     *       the input is reported as completed with no data;</li>
     *   <li>the payload carries no inline data: the input is registered as a known input at the
     *       payload's host and port;</li>
     *   <li>otherwise: a {@link FetchedInput} is allocated, the inline data is moved into it and
     *       the input is reported as completed with data.</li>
     * </ol>
     *
     * @param dataMovementEvent the event to process
     * @throws IOException           if decoding the empty-partitions set or moving the data fails
     * @throws TezUncheckedException if an {@link InvalidProtocolBufferException} is raised while
     *                               processing the payload
     */
    private void processDataMovementEvent(DataMovementEvent dataMovementEvent) throws IOException {
        try {
            ShuffleUserPayloads.DataMovementEventPayloadProto payload = ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom(dataMovementEvent.getUserPayload());
            int sourceIndex = dataMovementEvent.getSourceIndex();
            // Guarded so the payload description is only built when it will actually be logged.
            if (LOG.isInfoEnabled()) {
                LOG.info("Processing DataMovementEvent with srcIndex: " + sourceIndex + ", targetIndex: " + dataMovementEvent.getTargetIndex() + ", attemptNum: " + dataMovementEvent.getVersion() + ", payload: " + stringify(payload));
            }

            if (payload.hasEmptyPartitions() && TezUtils.fromByteArray(TezUtils.decompressByteStringToByteArray(payload.getEmptyPartitions())).get(sourceIndex)) {
                InputAttemptIdentifier emptyAttempt = new InputAttemptIdentifier(dataMovementEvent.getTargetIndex(), dataMovementEvent.getVersion());
                LOG.info("Source partition: " + sourceIndex + " did not generate any data. SrcAttempt: [" + emptyAttempt + "]. Not fetching.");
                this.shuffleManager.addCompletedInputWithNoData(emptyAttempt);
                return;
            }

            InputAttemptIdentifier srcAttempt = new InputAttemptIdentifier(dataMovementEvent.getTargetIndex(), dataMovementEvent.getVersion(), payload.getPathComponent());
            if (!payload.hasData()) {
                // No inline data: hand the location to the shuffle manager.
                this.shuffleManager.addKnownInput(payload.getHost(), payload.getPort(), srcAttempt, sourceIndex);
                return;
            }

            // Data travelled inside the event itself; copy it into an allocated input.
            ShuffleUserPayloads.DataProto data = payload.getData();
            FetchedInput fetchedInput = this.inputAllocator.allocate(data.getRawLength(), data.getCompressedLength(), srcAttempt);
            moveDataToFetchedInput(data, fetchedInput);
            this.shuffleManager.addCompletedInputWithData(srcAttempt, fetchedInput);
        } catch (InvalidProtocolBufferException e) {
            throw new TezUncheckedException("Unable to parse DataMovementEvent payload", e);
        }
    }

    /**
     * Copies the bytes carried by {@code dataProto} into {@code fetchedInput}, using the disk or
     * memory variant of {@link ShuffleUtils} according to the input's type.
     *
     * @param dataProto    inline data taken from the event payload
     * @param fetchedInput destination allocated for this input
     * @throws IOException           propagated from the {@link ShuffleUtils} copy routines
     * @throws TezUncheckedException if the input type is neither {@code DISK} nor {@code MEMORY}
     */
    private void moveDataToFetchedInput(ShuffleUserPayloads.DataProto dataProto, FetchedInput fetchedInput) throws IOException {
        switch (fetchedInput.getType()) {
            case DISK:
                ShuffleUtils.shuffleToDisk((DiskFetchedInput) fetchedInput, dataProto.getData().newInput(), dataProto.getCompressedLength(), LOG);
                return;
            case MEMORY:
                ShuffleUtils.shuffleToMemory((MemoryFetchedInput) fetchedInput, dataProto.getData().newInput(), dataProto.getRawLength(), dataProto.getCompressedLength(), this.codec, this.ifileReadAhead, this.ifileReadAheadLength, LOG);
                return;
            case WAIT:
            default:
                throw new TezUncheckedException("Unexpected type: " + fetchedInput.getType());
        }
    }

    /**
     * Marks the input attempt identified by the event's target index and version as obsolete.
     *
     * @param inputFailedEvent the failure notification
     */
    private void processInputFailedEvent(InputFailedEvent inputFailedEvent) {
        InputAttemptIdentifier failedAttempt = new InputAttemptIdentifier(inputFailedEvent.getTargetIndex(), inputFailedEvent.getVersion());
        this.shuffleManager.obsoleteKnownInput(failedAttempt);
    }

    /**
     * Builds a bracketed, single-line description of the payload for logging. The inline data
     * itself is not included, only whether it is present.
     *
     * @param dataMovementEventPayloadProto payload to describe
     * @return a string of the form {@code [hasEmptyPartitions: ..., host: ..., ...]}
     */
    private String stringify(ShuffleUserPayloads.DataMovementEventPayloadProto dataMovementEventPayloadProto) {
        StringBuilder sb = new StringBuilder();
        sb.append("[");
        sb.append("hasEmptyPartitions: ").append(dataMovementEventPayloadProto.hasEmptyPartitions()).append(", ");
        sb.append("host: ").append(dataMovementEventPayloadProto.getHost()).append(", ");
        sb.append("port: ").append(dataMovementEventPayloadProto.getPort()).append(", ");
        sb.append("pathComponent: ").append(dataMovementEventPayloadProto.getPathComponent()).append(", ");
        sb.append("runDuration: ").append(dataMovementEventPayloadProto.getRunDuration()).append(", ");
        sb.append("hasDataInEvent: ").append(dataMovementEventPayloadProto.hasData());
        // Close the bracket opened above; it was previously left unbalanced.
        sb.append("]");
        return sb.toString();
    }
}
