package org.apache.flink.runtime.io.network;

import akka.dispatch.OnFailure;
import java.io.IOException;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.netty.NettyConfig;
import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.messages.TaskMessages;
import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.Tuple2;
import scala.concurrent.ExecutionContext;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/apache/flink/runtime/io/network/NetworkEnvironment.class */
public class NetworkEnvironment {
    /** Logger shared by this class and its nested helper classes. */
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) NetworkEnvironment.class);
    /** Monitor held by the association, task (un)registration and shutdown methods of this class. */
    private final Object lock = new Object();
    /** Source of the buffer pool sizing, the optional Netty config, the I/O mode and the partition request backoff. */
    private final NetworkEnvironmentConfiguration configuration;
    /** Timeout handed to the consumable notifier for its ask-style requests to the JobManager. */
    private final FiniteDuration jobManagerTimeout;
    /** Created in the constructor and destroyed in {@link #shutdown()}. */
    private final NetworkBufferPool networkBufferPool;
    // The five fields below are created in associateWithTaskManagerAndJobManager(...) and reset to null in
    // disassociate(); they are null while the environment is not associated.
    private ConnectionManager connectionManager;
    private ResultPartitionManager partitionManager;
    private TaskEventDispatcher taskEventDispatcher;
    private ResultPartitionConsumableNotifier partitionConsumableNotifier;
    private PartitionStateChecker partitionStateChecker;
    /** Set to true at the end of {@link #shutdown()}; written only while holding {@link #lock}. */
    private boolean isShutdown;
    /** Execution context on which the consumable notifier runs its failure callback. */
    private final ExecutionContext executionContext;

    /* loaded from: input_file:org/apache/flink/runtime/io/network/NetworkEnvironment$JobManagerPartitionStateChecker.class */
    /**
     * {@link PartitionStateChecker} that turns a state check into a
     * {@code RequestPartitionState} message sent to the JobManager gateway.
     */
    private static class JobManagerPartitionStateChecker implements PartitionStateChecker {
        /** Gateway that receives the request message. */
        private final ActorGateway jobManager;
        /** Gateway handed to {@code tell} as its second argument (presumably the sender; confirm against ActorGateway). */
        private final ActorGateway taskManager;

        /**
         * @param actorGateway gateway of the JobManager
         * @param actorGateway2 gateway of the TaskManager
         */
        public JobManagerPartitionStateChecker(ActorGateway actorGateway, ActorGateway actorGateway2) {
            jobManager = actorGateway;
            taskManager = actorGateway2;
        }

        /**
         * Builds a partition state request from the given identifiers and sends it to the JobManager
         * without waiting for an answer.
         */
        @Override
        public void triggerPartitionStateCheck(JobID jobID, ExecutionAttemptID executionAttemptID, IntermediateDataSetID intermediateDataSetID, ResultPartitionID resultPartitionID) {
            JobManagerMessages.RequestPartitionState request =
                    new JobManagerMessages.RequestPartitionState(jobID, resultPartitionID, executionAttemptID, intermediateDataSetID);
            jobManager.tell(request, taskManager);
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/io/network/NetworkEnvironment$JobManagerResultPartitionConsumableNotifier.class */
    private static class JobManagerResultPartitionConsumableNotifier implements ResultPartitionConsumableNotifier {
        private final ExecutionContext executionContext;
        private final ActorGateway jobManager;
        private final ActorGateway taskManager;
        private final FiniteDuration jobManagerMessageTimeout;

        public JobManagerResultPartitionConsumableNotifier(ExecutionContext executionContext, ActorGateway actorGateway, ActorGateway actorGateway2, FiniteDuration finiteDuration) {
            this.executionContext = executionContext;
            this.jobManager = actorGateway;
            this.taskManager = actorGateway2;
            this.jobManagerMessageTimeout = finiteDuration;
        }

        @Override // org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier
        public void notifyPartitionConsumable(JobID jobID, final ResultPartitionID resultPartitionID) {
            this.jobManager.ask(new JobManagerMessages.ScheduleOrUpdateConsumers(jobID, resultPartitionID), this.jobManagerMessageTimeout).onFailure(new OnFailure() { // from class: org.apache.flink.runtime.io.network.NetworkEnvironment.JobManagerResultPartitionConsumableNotifier.1
                public void onFailure(Throwable th) {
                    NetworkEnvironment.LOG.error("Could not schedule or update consumers at the JobManager.", th);
                    JobManagerResultPartitionConsumableNotifier.this.taskManager.tell(new TaskMessages.FailTask(resultPartitionID.getProducerId(), new RuntimeException("Could not notify JobManager to schedule or update consumers", th)));
                }
            }, this.executionContext);
        }
    }

    public NetworkEnvironment(ExecutionContext executionContext, FiniteDuration finiteDuration, NetworkEnvironmentConfiguration networkEnvironmentConfiguration) throws IOException {
        this.executionContext = executionContext;
        this.configuration = (NetworkEnvironmentConfiguration) Preconditions.checkNotNull(networkEnvironmentConfiguration);
        this.jobManagerTimeout = (FiniteDuration) Preconditions.checkNotNull(finiteDuration);
        try {
            this.networkBufferPool = new NetworkBufferPool(networkEnvironmentConfiguration.numNetworkBuffers(), networkEnvironmentConfiguration.networkBufferSize(), networkEnvironmentConfiguration.memoryType());
        } catch (Throwable th) {
            throw new IOException("Cannot allocate network buffer pool: " + th.getMessage(), th);
        }
    }

    /**
     * @return the result partition manager, or {@code null} while this environment is not associated
     */
    public ResultPartitionManager getPartitionManager() {
        return partitionManager;
    }

    /**
     * @return the task event dispatcher, or {@code null} while this environment is not associated
     */
    public TaskEventDispatcher getTaskEventDispatcher() {
        return taskEventDispatcher;
    }

    /**
     * @return the connection manager, or {@code null} while this environment is not associated
     */
    public ConnectionManager getConnectionManager() {
        return connectionManager;
    }

    /**
     * @return the network buffer pool created in the constructor; never reassigned
     */
    public NetworkBufferPool getNetworkBufferPool() {
        return networkBufferPool;
    }

    /**
     * @return the I/O mode reported by the network environment configuration
     */
    public IOManager.IOMode getDefaultIOMode() {
        return configuration.ioMode();
    }

    /**
     * @return the partition consumable notifier, or {@code null} while this environment is not associated
     */
    public ResultPartitionConsumableNotifier getPartitionConsumableNotifier() {
        return partitionConsumableNotifier;
    }

    /**
     * @return the partition state checker, or {@code null} while this environment is not associated
     */
    public PartitionStateChecker getPartitionStateChecker() {
        return partitionStateChecker;
    }

    /**
     * @return the partition request backoff pair exactly as reported by the configuration
     *         (by its name: initial and maximum backoff)
     */
    public Tuple2<Integer, Integer> getPartitionRequestInitialAndMaxBackoff() {
        return configuration.partitionRequestInitialAndMaxBackoff();
    }

    /**
     * Tells whether this environment is currently associated, judged solely by the presence of the
     * partition consumable notifier.
     *
     * @return {@code true} if a partition consumable notifier is set
     */
    public boolean isAssociated() {
        return partitionConsumableNotifier != null;
    }

    /**
     * Associates this environment with a JobManager and a TaskManager: creates the partition manager,
     * the task event dispatcher, the consumable notifier and the partition state checker, then creates
     * and starts the connection manager (Netty-based when a Netty config is present, local otherwise).
     *
     * <p>Note the argument order: despite the method name, the first gateway is used as the JobManager
     * and the second one as the TaskManager.
     *
     * @param actorGateway gateway of the JobManager; must not be null
     * @param actorGateway2 gateway of the TaskManager; must not be null
     * @throws IOException if the connection manager fails to start
     * @throws IllegalStateException if the environment is shut down or already associated
     */
    public void associateWithTaskManagerAndJobManager(ActorGateway actorGateway, ActorGateway actorGateway2) throws IOException {
        Preconditions.checkNotNull(actorGateway);
        Preconditions.checkNotNull(actorGateway2);

        synchronized (lock) {
            if (isShutdown) {
                throw new IllegalStateException("environment is shut down");
            }

            final boolean nothingAssociated = partitionConsumableNotifier == null
                    && partitionManager == null
                    && taskEventDispatcher == null
                    && connectionManager == null;
            if (!nothingAssociated) {
                throw new IllegalStateException("Network Environment is already associated with a JobManager/TaskManager");
            }

            LOG.debug("Starting result partition manager and network connection manager");
            partitionManager = new ResultPartitionManager();
            taskEventDispatcher = new TaskEventDispatcher();
            partitionConsumableNotifier = new JobManagerResultPartitionConsumableNotifier(executionContext, actorGateway, actorGateway2, jobManagerTimeout);
            partitionStateChecker = new JobManagerPartitionStateChecker(actorGateway, actorGateway2);

            // A Netty configuration selects the Netty-based connection manager; without one the local variant is used.
            final Option<NettyConfig> nettyConfig = configuration.nettyConfig();
            if (nettyConfig.isDefined()) {
                connectionManager = new NettyConnectionManager((NettyConfig) nettyConfig.get());
            } else {
                connectionManager = new LocalConnectionManager();
            }

            try {
                LOG.debug("Starting network connection manager");
                connectionManager.start(partitionManager, taskEventDispatcher, networkBufferPool);
            } catch (Throwable cause) {
                throw new IOException("Failed to instantiate network connection manager: " + cause.getMessage(), cause);
            }
        }
    }

    /**
     * Detaches this environment from the JobManager/TaskManager it was associated with and releases
     * the network services created during association. Does nothing if the environment is not
     * associated.
     *
     * <p>Every cleanup step is attempted even if an earlier one fails, so a failing connection manager
     * can no longer prevent the partition manager, the task event dispatcher and the buffer pools from
     * being released. All association fields are reset, leaving the environment unassociated. The first
     * shutdown failure is rethrown once the remaining cleanup has run; a later one is logged.
     *
     * @throws IOException if shutting down the connection manager or the partition manager failed
     */
    public void disassociate() throws IOException {
        synchronized (this.lock) {
            if (!isAssociated()) {
                return;
            }
            LOG.debug("Disassociating NetworkEnvironment from TaskManager. Cleaning all intermediate results.");

            // Remembered instead of thrown immediately so that the remaining cleanup still runs.
            IOException firstFailure = null;

            if (this.connectionManager != null) {
                try {
                    LOG.debug("Shutting down network connection manager");
                    this.connectionManager.shutdown();
                } catch (Throwable t) {
                    firstFailure = new IOException("Cannot shutdown network connection manager", t);
                }
                this.connectionManager = null;
            }

            if (this.partitionManager != null) {
                try {
                    LOG.debug("Shutting down intermediate result partition manager");
                    this.partitionManager.shutdown();
                } catch (Throwable t) {
                    if (firstFailure == null) {
                        firstFailure = new IOException("Cannot shutdown partition manager", t);
                    } else {
                        // Only one exception can be thrown; keep the earlier one and log this one.
                        LOG.warn("Cannot shutdown partition manager", t);
                    }
                }
                this.partitionManager = null;
            }

            this.partitionConsumableNotifier = null;
            this.partitionStateChecker = null;

            if (this.taskEventDispatcher != null) {
                this.taskEventDispatcher.clearAll();
                this.taskEventDispatcher = null;
            }

            this.networkBufferPool.destroyAllBufferPools();

            if (firstFailure != null) {
                throw firstFailure;
            }
        }
    }

    /**
     * Registers the network resources of a task with this environment.
     *
     * <p>For every produced partition a buffer pool is created and attached, the partition is registered
     * with the partition manager, and its writer is registered for incoming task events. For every input
     * gate a buffer pool is created and attached. Once the lock is released, partitions flagged for eager
     * consumer deployment are announced through the consumable notifier.
     *
     * @param task task whose produced partitions, writers and input gates are registered
     * @throws IOException if any registration step fails; non-I/O failures are wrapped
     * @throws IllegalStateException if the writer and partition counts differ, or the environment is
     *         shut down or not associated
     */
    public void registerTask(Task task) throws IOException {
        final ResultPartition[] partitions = task.getProducedPartitions();
        final ResultPartitionWriter[] writers = task.getAllWriters();
        if (writers.length != partitions.length) {
            throw new IllegalStateException("Unequal number of writers and partitions.");
        }

        final ResultPartitionConsumableNotifier notifier;
        synchronized (lock) {
            if (isShutdown) {
                throw new IllegalStateException("NetworkEnvironment is shut down");
            }
            if (!isAssociated()) {
                throw new IllegalStateException("NetworkEnvironment is not associated with a TaskManager");
            }

            for (int idx = 0; idx < partitions.length; idx++) {
                final ResultPartition partition = partitions[idx];
                final ResultPartitionWriter writer = writers[idx];

                BufferPool pool = null;
                try {
                    pool = networkBufferPool.createBufferPool(partition.getNumberOfSubpartitions(), false);
                    partition.registerBufferPool(pool);
                    partitionManager.registerResultPartition(partition);
                    taskEventDispatcher.registerWriterForIncomingTaskEvents(writer.getPartitionId(), writer);
                } catch (Throwable failure) {
                    // Give back the pool created for this partition before reporting the failure.
                    if (pool != null) {
                        pool.lazyDestroy();
                    }
                    throw (failure instanceof IOException)
                            ? (IOException) failure
                            : new IOException(failure.getMessage(), failure);
                }
            }

            final SingleInputGate[] gates = task.getAllInputGates();
            for (int idx = 0; idx < gates.length; idx++) {
                final SingleInputGate gate = gates[idx];

                BufferPool pool = null;
                try {
                    pool = networkBufferPool.createBufferPool(gate.getNumberOfInputChannels(), false);
                    gate.setBufferPool(pool);
                } catch (Throwable failure) {
                    if (pool != null) {
                        pool.lazyDestroy();
                    }
                    throw (failure instanceof IOException)
                            ? (IOException) failure
                            : new IOException(failure.getMessage(), failure);
                }
            }

            // Captured under the lock; the notifications below are sent after the lock is released.
            notifier = partitionConsumableNotifier;
        }

        for (int idx = 0; idx < partitions.length; idx++) {
            final ResultPartition partition = partitions[idx];
            if (partition.getEagerlyDeployConsumers()) {
                notifier.notifyPartitionConsumable(partition.getJobId(), partition.getPartitionId());
            }
        }
    }

    /**
     * Removes the network resources of a task from this environment. Does nothing (apart from the debug
     * log line) if the environment is shut down or not associated.
     *
     * <p>If the task was canceled or failed, the partitions it produced are released with the task's
     * failure cause. Its writers are then unregistered from the task event dispatcher, the buffer pools
     * of its produced partitions are destroyed, and the resources of its input gates are released.
     * I/O errors while releasing an input gate are logged and do not stop the cleanup.
     *
     * @param task task to unregister
     */
    public void unregisterTask(Task task) {
        LOG.debug("Unregister task {} from network environment (state: {}).", task.getTaskInfo().getTaskNameWithSubtasks(), task.getExecutionState());
        final ExecutionAttemptID attemptId = task.getExecutionId();

        synchronized (lock) {
            final boolean active = !isShutdown && isAssociated();
            if (!active) {
                return;
            }

            if (task.isCanceledOrFailed()) {
                partitionManager.releasePartitionsProducedBy(attemptId, task.getFailureCause());
            }

            final ResultPartitionWriter[] writers = task.getAllWriters();
            if (writers != null) {
                for (int idx = 0; idx < writers.length; idx++) {
                    taskEventDispatcher.unregisterWriter(writers[idx]);
                }
            }

            final ResultPartition[] partitions = task.getProducedPartitions();
            if (partitions != null) {
                for (int idx = 0; idx < partitions.length; idx++) {
                    partitions[idx].destroyBufferPool();
                }
            }

            final SingleInputGate[] gates = task.getAllInputGates();
            if (gates != null) {
                for (int idx = 0; idx < gates.length; idx++) {
                    final SingleInputGate gate = gates[idx];
                    if (gate == null) {
                        continue;
                    }
                    try {
                        gate.releaseAllResources();
                    } catch (IOException releaseError) {
                        // Best effort: keep releasing the remaining gates.
                        LOG.error("Error during release of reader resources: " + releaseError.getMessage(), releaseError);
                    }
                }
            }
        }
    }

    /**
     * Shuts this environment down: disassociates it, destroys the network buffer pool and marks the
     * environment as shut down. Failures of either step are logged as warnings and do not prevent the
     * other step or the final state change. Calling this again after a completed shutdown has no effect.
     */
    public void shutdown() {
        synchronized (lock) {
            if (!isShutdown) {
                try {
                    disassociate();
                } catch (Throwable failure) {
                    LOG.warn("Network services did not shut down properly: " + failure.getMessage(), failure);
                }

                try {
                    networkBufferPool.destroy();
                } catch (Throwable failure) {
                    LOG.warn("Network buffer pool did not shut down properly: " + failure.getMessage(), failure);
                }

                isShutdown = true;
            }
        }
    }

    /**
     * Tells whether {@link #shutdown()} has completed.
     *
     * <p>The flag is written only while holding the environment lock, so it is read under the same lock
     * here; an unsynchronized read of the non-volatile field would not be guaranteed to observe a
     * shutdown performed by another thread. The monitor is reentrant, so calling this from code that
     * already holds the lock is safe.
     *
     * @return {@code true} once the environment has been shut down
     */
    public boolean isShutdown() {
        synchronized (this.lock) {
            return this.isShutdown;
        }
    }
}
