package org.apache.pulsar.functions.worker.rest.api;

import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import org.apache.pulsar.broker.authentication.AuthenticationParameters;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.worker.FunctionRuntimeInfo;
import org.apache.pulsar.functions.worker.PulsarWorkerService;
import org.apache.pulsar.functions.worker.SchedulerManager;
import org.apache.pulsar.functions.worker.WorkerUtils;
import org.apache.pulsar.functions.worker.rest.RestUtils;
import org.apache.pulsar.functions.worker.service.api.Workers;
import org.apache.pulsar.shade.com.google.common.base.Preconditions;
import org.apache.pulsar.shade.javax.ws.rs.WebApplicationException;
import org.apache.pulsar.shade.javax.ws.rs.core.Response;
import org.apache.pulsar.shade.javax.ws.rs.core.UriBuilder;
import org.apache.pulsar.shade.org.apache.pulsar.common.functions.WorkerInfo;
import org.apache.pulsar.shade.org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.FunctionInstanceStatsImpl;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.WorkerFunctionInstanceStats;
import org.apache.pulsar.shade.org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.RestException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/functions/worker/rest/api/WorkerImpl.class */
public class WorkerImpl implements Workers<PulsarWorkerService> {
    private static final Logger log = LoggerFactory.getLogger(WorkerImpl.class);
    private final Supplier<PulsarWorkerService> workerServiceSupplier;

    public WorkerImpl(Supplier<PulsarWorkerService> supplier) {
        this.workerServiceSupplier = supplier;
    }

    private PulsarWorkerService worker() {
        try {
            return (PulsarWorkerService) Preconditions.checkNotNull(this.workerServiceSupplier.get());
        } catch (Throwable th) {
            log.info("Failed to get worker service", th);
            throw th;
        }
    }

    private boolean isWorkerServiceAvailable() {
        PulsarWorkerService pulsarWorkerService = this.workerServiceSupplier.get();
        if (pulsarWorkerService == null) {
            return false;
        }
        return pulsarWorkerService.isInitialized();
    }

    @Override // org.apache.pulsar.functions.worker.service.api.Workers
    public List<WorkerInfo> getCluster(AuthenticationParameters authenticationParameters) {
        if (!isWorkerServiceAvailable()) {
            RestUtils.throwUnavailableException();
        }
        throwIfNotSuperUser(authenticationParameters, "get cluster");
        return worker().getMembershipManager().getCurrentMembership();
    }

    @Override // org.apache.pulsar.functions.worker.service.api.Workers
    public WorkerInfo getClusterLeader(AuthenticationParameters authenticationParameters) {
        if (!isWorkerServiceAvailable()) {
            RestUtils.throwUnavailableException();
        }
        throwIfNotSuperUser(authenticationParameters, "get cluster leader");
        WorkerInfo leader = worker().getMembershipManager().getLeader();
        if (leader == null) {
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, "Leader cannot be determined");
        }
        return leader;
    }

    @Override // org.apache.pulsar.functions.worker.service.api.Workers
    public Map<String, Collection<String>> getAssignments(AuthenticationParameters authenticationParameters) {
        if (!isWorkerServiceAvailable()) {
            RestUtils.throwUnavailableException();
        }
        throwIfNotSuperUser(authenticationParameters, "get cluster assignments");
        Map<String, Map<String, Function.Assignment>> currentAssignments = worker().getFunctionRuntimeManager().getCurrentAssignments();
        HashMap hashMap = new HashMap();
        for (Map.Entry<String, Map<String, Function.Assignment>> entry : currentAssignments.entrySet()) {
            hashMap.put(entry.getKey(), entry.getValue().keySet());
        }
        return hashMap;
    }

    private void throwIfNotSuperUser(AuthenticationParameters authenticationParameters, String str) {
        if (worker().getWorkerConfig().isAuthorizationEnabled()) {
            try {
                if (authenticationParameters.getClientRole() == null || !worker().getAuthorizationService().isSuperUser(authenticationParameters).get(worker().getWorkerConfig().getZooKeeperOperationTimeoutSeconds(), TimeUnit.SECONDS).booleanValue()) {
                    log.error("Client with role [{}] and originalPrincipal [{}] is not authorized to {}", new Object[]{authenticationParameters.getClientRole(), authenticationParameters.getOriginalPrincipal(), str});
                    throw new RestException(Response.Status.UNAUTHORIZED, "Client is not authorized to perform operation");
                }
            } catch (InterruptedException | ExecutionException | TimeoutException e) {
                log.warn("Time-out {} sec while checking the role {} originalPrincipal {} is a super user role ", new Object[]{Integer.valueOf(worker().getWorkerConfig().getZooKeeperOperationTimeoutSeconds()), authenticationParameters.getClientRole(), authenticationParameters.getOriginalPrincipal()});
                throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
            }
        }
    }

    @Override // org.apache.pulsar.functions.worker.service.api.Workers
    public List<Metrics> getWorkerMetrics(AuthenticationParameters authenticationParameters) {
        if (!isWorkerServiceAvailable() || worker().getMetricsGenerator() == null) {
            RestUtils.throwUnavailableException();
        }
        throwIfNotSuperUser(authenticationParameters, "get worker stats");
        return worker().getMetricsGenerator().generate();
    }

    @Override // org.apache.pulsar.functions.worker.service.api.Workers
    public List<WorkerFunctionInstanceStats> getFunctionsMetrics(AuthenticationParameters authenticationParameters) throws IOException {
        if (!isWorkerServiceAvailable()) {
            RestUtils.throwUnavailableException();
        }
        throwIfNotSuperUser(authenticationParameters, "get function stats");
        Map<String, FunctionRuntimeInfo> functionRuntimeInfos = worker().getFunctionRuntimeManager().getFunctionRuntimeInfos();
        ArrayList arrayList = new ArrayList(functionRuntimeInfos.size());
        for (Map.Entry<String, FunctionRuntimeInfo> entry : functionRuntimeInfos.entrySet()) {
            String key = entry.getKey();
            FunctionRuntimeInfo value = entry.getValue();
            if (worker().getFunctionRuntimeManager().getRuntimeFactory().externallyManaged()) {
                Function.FunctionDetails functionDetails = value.getFunctionInstance().getFunctionMetaData().getFunctionDetails();
                int parallelism = functionDetails.getParallelism();
                for (int i = 0; i < parallelism; i++) {
                    FunctionInstanceStatsImpl functionInstanceStats = WorkerUtils.getFunctionInstanceStats(key, value, i);
                    WorkerFunctionInstanceStats workerFunctionInstanceStats = new WorkerFunctionInstanceStats();
                    workerFunctionInstanceStats.setName(FunctionCommon.getFullyQualifiedInstanceId(functionDetails.getTenant(), functionDetails.getNamespace(), functionDetails.getName(), i));
                    workerFunctionInstanceStats.setMetrics(functionInstanceStats.getMetrics());
                    arrayList.add(workerFunctionInstanceStats);
                }
            } else {
                FunctionInstanceStatsImpl functionInstanceStats2 = WorkerUtils.getFunctionInstanceStats(key, value, value.getFunctionInstance().getInstanceId());
                WorkerFunctionInstanceStats workerFunctionInstanceStats2 = new WorkerFunctionInstanceStats();
                workerFunctionInstanceStats2.setName(key);
                workerFunctionInstanceStats2.setMetrics(functionInstanceStats2.getMetrics());
                arrayList.add(workerFunctionInstanceStats2);
            }
        }
        return arrayList;
    }

    @Override // org.apache.pulsar.functions.worker.service.api.Workers
    public List<ConnectorDefinition> getListOfConnectors(AuthenticationParameters authenticationParameters) {
        if (!isWorkerServiceAvailable()) {
            RestUtils.throwUnavailableException();
        }
        throwIfNotSuperUser(authenticationParameters, "get list of connectors");
        return worker().getConnectorsManager().getConnectorDefinitions();
    }

    @Override // org.apache.pulsar.functions.worker.service.api.Workers
    public void rebalance(URI uri, AuthenticationParameters authenticationParameters) {
        if (!isWorkerServiceAvailable()) {
            RestUtils.throwUnavailableException();
        }
        throwIfNotSuperUser(authenticationParameters, "rebalance cluster");
        if (worker().getLeaderService().isLeader()) {
            try {
                worker().getSchedulerManager().rebalanceIfNotInprogress();
            } catch (SchedulerManager.RebalanceInProgressException e) {
                throw new RestException(Response.Status.BAD_REQUEST, "Rebalance already in progress");
            } catch (SchedulerManager.TooFewWorkersException e2) {
                throw new RestException(Response.Status.BAD_REQUEST, "Too few workers (need at least 2)");
            }
        } else {
            WorkerInfo leader = worker().getMembershipManager().getLeader();
            if (leader != null) {
                throw new WebApplicationException(Response.temporaryRedirect(UriBuilder.fromUri(uri).host(leader.getWorkerHostname()).port(leader.getPort()).build(new Object[0])).build());
            }
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, "Leader cannot be determined");
        }
    }

    @Override // org.apache.pulsar.functions.worker.service.api.Workers
    public void drain(URI uri, String str, AuthenticationParameters authenticationParameters, boolean z) {
        if (!isWorkerServiceAvailable()) {
            RestUtils.throwUnavailableException();
        }
        String workerId = worker().getWorkerConfig().getWorkerId();
        String str2 = (str == null || str.isEmpty()) ? workerId : str;
        if (log.isDebugEnabled()) {
            log.debug("drain called with URI={}, inWorkerId={}, workerId={}, clientRole={}, originalPrincipal={}, calledOnLeaderUri={}, on actual worker-id={}", new Object[]{uri, str, str2, authenticationParameters.getClientRole(), authenticationParameters.getOriginalPrincipal(), Boolean.valueOf(z), workerId});
        }
        throwIfNotSuperUser(authenticationParameters, "drain worker");
        if (!worker().getLeaderService().isLeader()) {
            URI buildRedirectUriForDrainRelatedOp = buildRedirectUriForDrainRelatedOp(uri, str2);
            log.info("Not leader; redirect URI={}", buildRedirectUriForDrainRelatedOp);
            throw new WebApplicationException(Response.temporaryRedirect(buildRedirectUriForDrainRelatedOp).build());
        }
        try {
            worker().getSchedulerManager().drainIfNotInProgress(str2);
        } catch (SchedulerManager.DrainInProgressException e) {
            throw new RestException(Response.Status.CONFLICT, "Another drain is in progress");
        } catch (SchedulerManager.TooFewWorkersException e2) {
            throw new RestException(Response.Status.BAD_REQUEST, "Too few workers (need at least 2)");
        } catch (SchedulerManager.UnknownWorkerException e3) {
            throw new RestException(Response.Status.BAD_REQUEST, "Worker " + str2 + " is not among the current workers in the system");
        } catch (SchedulerManager.WorkerNotRemovedAfterPriorDrainException e4) {
            throw new RestException(Response.Status.PRECONDITION_FAILED, "Worker " + str2 + " was not yet removed after a prior drain op; try later");
        }
    }

    @Override // org.apache.pulsar.functions.worker.service.api.Workers
    public LongRunningProcessStatus getDrainStatus(URI uri, String str, AuthenticationParameters authenticationParameters, boolean z) {
        if (!isWorkerServiceAvailable()) {
            RestUtils.throwUnavailableException();
        }
        String workerId = worker().getWorkerConfig().getWorkerId();
        String str2 = (str == null || str.isEmpty()) ? workerId : str;
        if (log.isDebugEnabled()) {
            log.debug("getDrainStatus called with uri={}, inWorkerId={}, workerId={}, clientRole={}, originalPrincipal={}, calledOnLeaderUri={}, on actual workerId={}", new Object[]{uri, str, str2, authenticationParameters.getClientRole(), authenticationParameters.getOriginalPrincipal(), Boolean.valueOf(z), workerId});
        }
        throwIfNotSuperUser(authenticationParameters, "get drain status of worker");
        if (worker().getLeaderService().isLeader()) {
            return worker().getSchedulerManager().getDrainStatus(str2);
        }
        URI buildRedirectUriForDrainRelatedOp = buildRedirectUriForDrainRelatedOp(uri, str2);
        log.info("Not leader; redirect URI={}", buildRedirectUriForDrainRelatedOp);
        throw new WebApplicationException(Response.temporaryRedirect(buildRedirectUriForDrainRelatedOp).build());
    }

    @Override // org.apache.pulsar.functions.worker.service.api.Workers
    public boolean isLeaderReady(AuthenticationParameters authenticationParameters) {
        if (!isWorkerServiceAvailable()) {
            RestUtils.throwUnavailableException();
        }
        if (worker().getLeaderService().isLeader()) {
            return true;
        }
        RestUtils.throwUnavailableException();
        return false;
    }

    private URI buildRedirectUriForDrainRelatedOp(URI uri, String str) {
        WorkerInfo leader = worker().getMembershipManager().getLeader();
        if (leader == null) {
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, "Leader cannot be determined");
        }
        return UriBuilder.fromUri(uri).host(leader.getWorkerHostname()).port(leader.getPort()).replacePath("admin/v2/worker/leader/drain").replaceQueryParam("workerId", str).build(new Object[0]);
    }
}
