package alluxio.master.block;

import alluxio.RpcUtils;
import alluxio.conf.PropertyKey;
import alluxio.exception.status.DeadlineExceededException;
import alluxio.grpc.GrpcExceptionUtils;
import alluxio.grpc.RegisterWorkerPRequest;
import alluxio.grpc.RegisterWorkerPResponse;
import com.google.common.base.Preconditions;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:alluxio/master/block/RegisterStreamObserver.class */
public class RegisterStreamObserver implements StreamObserver<RegisterWorkerPRequest> {
    private static final Logger LOG = LoggerFactory.getLogger(RegisterStreamObserver.class);
    private WorkerRegisterContext mContext;
    private final BlockMaster mBlockMaster;
    private final StreamObserver<RegisterWorkerPResponse> mMasterResponseObserver;
    private final AtomicReference<Throwable> mErrorReceived = new AtomicReference<>();

    public RegisterStreamObserver(BlockMaster blockMaster, StreamObserver<RegisterWorkerPResponse> streamObserver) {
        this.mBlockMaster = blockMaster;
        this.mMasterResponseObserver = streamObserver;
    }

    public void onNext(final RegisterWorkerPRequest registerWorkerPRequest) {
        final long workerId = registerWorkerPRequest.getWorkerId();
        final boolean isFirstMessage = isFirstMessage(registerWorkerPRequest);
        LOG.debug("Received register worker request of {} bytes with {} LocationBlockIdListEntry. Worker {}, isHead {}", new Object[]{Integer.valueOf(registerWorkerPRequest.getSerializedSize()), Integer.valueOf(registerWorkerPRequest.getCurrentBlocksCount()), Long.valueOf(workerId), Boolean.valueOf(isFirstMessage)});
        RpcUtils.streamingRPCAndLog(LOG, new RpcUtils.StreamingRpcCallable<RegisterWorkerPResponse>() { // from class: alluxio.master.block.RegisterStreamObserver.1
            /* renamed from: call, reason: merged with bridge method [inline-methods] */
            public RegisterWorkerPResponse m20call() throws Exception {
                Preconditions.checkState(RegisterStreamObserver.this.mErrorReceived.get() == null, "The stream has been closed due to an earlier error received: %s", RegisterStreamObserver.this.mErrorReceived.get());
                synchronized (this) {
                    if (RegisterStreamObserver.this.mContext == null) {
                        Preconditions.checkState(isFirstMessage, "Context is not initialized but the request is not the 1st in a stream!");
                        RegisterStreamObserver.LOG.debug("Initializing context for {}", Long.valueOf(workerId));
                        RegisterStreamObserver.this.mContext = WorkerRegisterContext.create(RegisterStreamObserver.this.mBlockMaster, workerId, this);
                        RegisterStreamObserver.LOG.debug("Context created for {}", Long.valueOf(workerId));
                    }
                }
                Preconditions.checkState(RegisterStreamObserver.this.mContext != null, "Stream message received from the client side but the context was not initialized!");
                Preconditions.checkState(RegisterStreamObserver.this.mContext.isOpen(), "WorkerRegisterContext has been closed before this message is received! Probably %s was exceeded!", PropertyKey.MASTER_WORKER_REGISTER_STREAM_RESPONSE_TIMEOUT.toString());
                RegisterStreamObserver.this.mContext.updateTs();
                RegisterStreamObserver.this.mBlockMaster.workerRegisterStream(RegisterStreamObserver.this.mContext, registerWorkerPRequest, isFirstMessage);
                RegisterStreamObserver.this.mContext.updateTs();
                return RegisterWorkerPResponse.newBuilder().build();
            }

            public void exceptionCaught(Throwable th) {
                RegisterStreamObserver.LOG.error("Failed to process the RegisterWorkerPRequest in the stream: ", th);
                RegisterStreamObserver.this.cleanup();
                RegisterStreamObserver.this.mMasterResponseObserver.onError(GrpcExceptionUtils.fromThrowable(th));
            }
        }, isFirstMessage ? "registerWorkerStart" : "registerWorkerStream", true, false, this.mMasterResponseObserver, "WorkerId=%s", new Object[]{Long.valueOf(registerWorkerPRequest.getWorkerId())});
    }

    public void onError(Throwable th) {
        this.mErrorReceived.set(th);
        Preconditions.checkState(this.mContext != null, "Error received from the client side but the context was not initialized!");
        if (!(th instanceof TimeoutException)) {
            LOG.error("Received error from the worker side during the streaming register call: {}", th.getMessage());
            cleanup();
        } else {
            cleanup();
            this.mMasterResponseObserver.onError(new DeadlineExceededException(th).toGrpcStatusException());
            LOG.warn("Worker {} register stream has timed out. Error sent to the worker.", Long.valueOf(this.mContext.getWorkerId()));
        }
    }

    public void onCompleted() {
        LOG.info("Register stream completed on the client side");
        Logger logger = LOG;
        RpcUtils.StreamingRpcCallable<RegisterWorkerPResponse> streamingRpcCallable = new RpcUtils.StreamingRpcCallable<RegisterWorkerPResponse>() { // from class: alluxio.master.block.RegisterStreamObserver.2
            /* renamed from: call, reason: merged with bridge method [inline-methods] */
            public RegisterWorkerPResponse m21call() {
                Preconditions.checkState(RegisterStreamObserver.this.mErrorReceived.get() == null, "The stream has been closed due to an earlier error received: %s", RegisterStreamObserver.this.mErrorReceived.get());
                Preconditions.checkState(RegisterStreamObserver.this.mContext != null, "Stream message received from the client side but the context was not initialized!");
                Preconditions.checkState(RegisterStreamObserver.this.mContext.isOpen(), "WorkerRegisterContext has been closed before this stream is completed! Probably %s was exceeded!", PropertyKey.MASTER_WORKER_REGISTER_STREAM_RESPONSE_TIMEOUT.toString());
                RegisterStreamObserver.this.mContext.updateTs();
                RegisterStreamObserver.this.mBlockMaster.workerRegisterFinish(RegisterStreamObserver.this.mContext);
                RegisterStreamObserver.this.mContext.updateTs();
                RegisterStreamObserver.this.cleanup();
                return null;
            }

            public void exceptionCaught(Throwable th) {
                RegisterStreamObserver.LOG.error("Failed to complete the register worker stream: ", th);
                RegisterStreamObserver.this.cleanup();
                RegisterStreamObserver.this.mMasterResponseObserver.onError(GrpcExceptionUtils.fromThrowable(th));
            }
        };
        StreamObserver<RegisterWorkerPResponse> streamObserver = this.mMasterResponseObserver;
        Object[] objArr = new Object[1];
        objArr[0] = this.mContext == null ? "NONE" : Long.valueOf(this.mContext.getWorkerId());
        RpcUtils.streamingRPCAndLog(logger, streamingRpcCallable, "registerWorkerComplete", false, true, streamObserver, "WorkerId=%s", objArr);
    }

    private boolean isFirstMessage(RegisterWorkerPRequest registerWorkerPRequest) {
        return registerWorkerPRequest.getStorageTiersCount() > 0;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void cleanup() {
        synchronized (this) {
            if (this.mContext == null) {
                LOG.debug("The stream is closed before the context is initialized. Nothing to clean up.");
                return;
            }
            LOG.debug("Unlocking worker {}", Long.valueOf(this.mContext.getWorkerId()));
            this.mContext.close();
            LOG.debug("Context closed for worker {}", Long.valueOf(this.mContext.getWorkerId()));
            Preconditions.checkState(!this.mContext.isOpen(), "Failed to properly close the WorkerRegisterContext!");
        }
    }
}
