package org.apache.pulsar.broker.admin.impl;

import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import io.swagger.annotations.Example;
import io.swagger.annotations.ExampleProperty;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Response;
import org.apache.bookkeeper.common.util.JsonUtil;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.naming.NamedEntity;
import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData;
import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationDataImpl;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.common.policies.data.ClusterOperation;
import org.apache.pulsar.common.policies.data.ClusterPolicies;
import org.apache.pulsar.common.policies.data.ClusterPoliciesImpl;
import org.apache.pulsar.common.policies.data.FailureDomainImpl;
import org.apache.pulsar.common.policies.data.NamespaceIsolationDataImpl;
import org.apache.pulsar.common.policies.data.NamespaceIsolationPolicyUnloadScope;
import org.apache.pulsar.common.policies.data.PolicyName;
import org.apache.pulsar.common.policies.data.PolicyOperation;
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicyImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/admin/impl/ClustersBase.class */
public class ClustersBase extends AdminResource {
    // Shared SLF4J logger for this resource class; the endpoints below use it to record outcomes.
    private static final Logger log = LoggerFactory.getLogger(ClustersBase.class);

    /**
     * Resumes the response with the names returned by {@code clusterResources().listAsync()},
     * leaving out the entry named {@code "global"}.
     *
     * <p>Any failure in the chain is logged and forwarded to
     * {@code resumeAsyncResponseExceptionally}.
     *
     * @param asyncResponse suspended response that receives the set of names or the failure
     */
    @GET
    @ApiResponses({@ApiResponse(code = 200, message = "Return a list of clusters."), @ApiResponse(code = 500, message = "Internal server error.")})
    @ApiOperation(value = "Get the list of all the Pulsar clusters.", response = String.class, responseContainer = "Set")
    public void getClusters(@Suspended AsyncResponse asyncResponse) {
        clusterResources().listAsync()
                .thenApply(clusters -> clusters.stream()
                        .filter(cluster -> !"global".equals(cluster))
                        .collect(Collectors.toSet()))
                // Bound method reference: null-checks asyncResponse when the chain is built.
                .thenAccept(asyncResponse::resume)
                .exceptionally(th -> {
                    // The throwable is passed last so the logger treats it as the exception,
                    // not as a message argument; hence a single placeholder.
                    log.error("[{}] Failed to get clusters", clientAppId(), th);
                    resumeAsyncResponseExceptionally(asyncResponse, th);
                    return null;
                });
    }

    /**
     * Looks up one cluster and resumes the response with its data.
     *
     * <p>Runs {@code validateBothSuperuserAndClusterOperation} for
     * {@link ClusterOperation#GET_CLUSTER} first. If the lookup yields an empty result a
     * {@link RestException} with {@code NOT_FOUND} is raised; every failure is logged and forwarded
     * to {@code resumeAsyncResponseExceptionally}.
     *
     * @param asyncResponse suspended response that receives the cluster data or the failure
     * @param str the cluster name taken from the request path
     */
    @GET
    @ApiResponses({@ApiResponse(code = 200, message = "Return the cluster data.", response = ClusterDataImpl.class), @ApiResponse(code = 403, message = "Don't have admin permission."), @ApiResponse(code = 404, message = "Cluster doesn't exist."), @ApiResponse(code = 500, message = "Internal server error.")})
    @Path("/{cluster}")
    @ApiOperation(value = "Get the configuration for the specified cluster.", response = ClusterDataImpl.class, notes = "This operation requires Pulsar superuser privileges.")
    public void getCluster(@Suspended AsyncResponse asyncResponse, @PathParam("cluster") @ApiParam(value = "The cluster name", required = true) String str) {
        validateBothSuperuserAndClusterOperation(str, ClusterOperation.GET_CLUSTER)
                .thenCompose(ignored -> clusterResources().getClusterAsync(str))
                .thenAccept(clusterData -> asyncResponse.resume(clusterData.orElseThrow(
                        () -> new RestException(Response.Status.NOT_FOUND, "Cluster does not exist"))))
                .exceptionally(th -> {
                    log.error("[{}] Failed to get cluster {}", clientAppId(), str, th);
                    resumeAsyncResponseExceptionally(asyncResponse, th);
                    return null;
                });
    }

    /**
     * Creates a cluster entry from the request body.
     *
     * <p>Steps, in order: permission check for {@link ClusterOperation#CREATE_CLUSTER},
     * {@code validatePoliciesReadOnlyAccessAsync}, name check via {@link NamedEntity#checkName},
     * body validation, existence lookup, then creation.
     *
     * <p>Failure mapping visible in this method:
     * <ul>
     *   <li>missing body, or {@code checkPropertiesIfPresent} rejecting it: {@code BAD_REQUEST};</li>
     *   <li>cluster already present: {@code CONFLICT};</li>
     *   <li>any other {@link IllegalArgumentException} reaching the end of the chain:
     *       {@code PRECONDITION_FAILED} ("Cluster name is not valid");</li>
     *   <li>everything else: forwarded to {@code resumeAsyncResponseExceptionally}.</li>
     * </ul>
     *
     * @param asyncResponse suspended response that is resumed with {@code 200 OK} or the failure
     * @param str the cluster name taken from the request path
     * @param clusterDataImpl the cluster data from the request body; may be {@code null} when absent
     */
    @ApiResponses({@ApiResponse(code = 200, message = "Cluster has been created."), @ApiResponse(code = 400, message = "Bad request parameter."), @ApiResponse(code = 403, message = "You don't have admin permission to create the cluster."), @ApiResponse(code = 409, message = "Cluster already exists."), @ApiResponse(code = 412, message = "Cluster name is not valid."), @ApiResponse(code = 500, message = "Internal server error.")})
    @Path("/{cluster}")
    @ApiOperation(value = "Create a new cluster.", notes = "This operation requires Pulsar superuser privileges, and the name cannot contain the '/' characters.")
    @PUT
    public void createCluster(@Suspended AsyncResponse asyncResponse, @PathParam("cluster") @ApiParam(value = "The cluster name", required = true) String str, @ApiParam(value = "The cluster data", required = true, examples = @Example({@ExampleProperty(mediaType = "application/json", value = "{\n   \"serviceUrl\": \"http://pulsar.example.com:8080\",\n   \"brokerServiceUrl\": \"pulsar://pulsar.example.com:6651\"\n}\n")})) ClusterDataImpl clusterDataImpl) {
        validateBothSuperuserAndClusterOperation(str, ClusterOperation.CREATE_CLUSTER)
                .thenCompose(ignored -> validatePoliciesReadOnlyAccessAsync())
                .thenCompose(ignored -> {
                    // An IllegalArgumentException from the name check is deliberately left
                    // unwrapped so the handler below maps it to PRECONDITION_FAILED.
                    NamedEntity.checkName(str);
                    if (clusterDataImpl == null) {
                        throw new RestException(Response.Status.BAD_REQUEST, "cluster data is required");
                    }
                    try {
                        clusterDataImpl.checkPropertiesIfPresent();
                        return clusterResources().getClusterAsync(str);
                    } catch (IllegalArgumentException e) {
                        throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
                    }
                })
                .thenCompose(existing -> {
                    if (existing.isPresent()) {
                        throw new RestException(Response.Status.CONFLICT, "Cluster already exists");
                    }
                    return clusterResources().createClusterAsync(str, clusterDataImpl);
                })
                .thenAccept(ignored -> {
                    log.info("[{}] Created cluster {}", clientAppId(), str);
                    asyncResponse.resume(Response.ok().build());
                })
                .exceptionally(th -> {
                    log.error("[{}] Failed to create cluster {}", clientAppId(), str, th);
                    if (FutureUtil.unwrapCompletionException(th) instanceof IllegalArgumentException) {
                        asyncResponse.resume(new RestException(Response.Status.PRECONDITION_FAILED, "Cluster name is not valid"));
                        return null;
                    }
                    resumeAsyncResponseExceptionally(asyncResponse, th);
                    return null;
                });
    }

    /**
     * Replaces the stored data of a cluster with the request body.
     *
     * <p>Runs the permission check for {@link ClusterOperation#UPDATE_CLUSTER} and
     * {@code validatePoliciesReadOnlyAccessAsync} before touching the store. A missing body or a body
     * rejected by {@code checkPropertiesIfPresent} yields {@code BAD_REQUEST}; a
     * {@link MetadataStoreException.NotFoundException} yields {@code NOT_FOUND}; other failures are
     * forwarded to {@code resumeAsyncResponseExceptionally}.
     *
     * @param asyncResponse suspended response that is resumed with {@code 200 OK} or the failure
     * @param str the cluster name taken from the request path
     * @param clusterDataImpl the new cluster data from the request body; may be {@code null} when absent
     */
    @ApiResponses({@ApiResponse(code = 200, message = "Cluster has been updated."), @ApiResponse(code = 400, message = "Bad request parameter."), @ApiResponse(code = 403, message = "Don't have admin permission or policies are read-only."), @ApiResponse(code = 404, message = "Cluster doesn't exist."), @ApiResponse(code = 500, message = "Internal server error.")})
    @Path("/{cluster}")
    @ApiOperation(value = "Update the configuration for a cluster.", notes = "This operation requires Pulsar superuser privileges.")
    @POST
    public void updateCluster(@Suspended AsyncResponse asyncResponse, @PathParam("cluster") @ApiParam(value = "The cluster name", required = true) String str, @ApiParam(value = "The cluster data", required = true, examples = @Example({@ExampleProperty(mediaType = "application/json", value = "{\n   \"serviceUrl\": \"http://pulsar.example.com:8080\",\n   \"brokerServiceUrl\": \"pulsar://pulsar.example.com:6651\"\n}\n")})) ClusterDataImpl clusterDataImpl) {
        validateBothSuperuserAndClusterOperation(str, ClusterOperation.UPDATE_CLUSTER)
                .thenCompose(ignored -> validatePoliciesReadOnlyAccessAsync())
                .thenCompose(ignored -> {
                    // Same guard as createCluster: report an absent body as a client error
                    // instead of letting a NullPointerException surface.
                    if (clusterDataImpl == null) {
                        throw new RestException(Response.Status.BAD_REQUEST, "cluster data is required");
                    }
                    try {
                        clusterDataImpl.checkPropertiesIfPresent();
                        // The stored value is ignored; the request body replaces it wholesale.
                        return clusterResources().updateClusterAsync(str, existing -> clusterDataImpl);
                    } catch (IllegalArgumentException e) {
                        throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
                    }
                })
                .thenAccept(ignored -> {
                    log.info("[{}] Updated cluster {}", clientAppId(), str);
                    asyncResponse.resume(Response.ok().build());
                })
                .exceptionally(th -> {
                    log.error("[{}] Failed to update cluster {}", clientAppId(), str, th);
                    if (FutureUtil.unwrapCompletionException(th) instanceof MetadataStoreException.NotFoundException) {
                        asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "Cluster does not exist"));
                        return null;
                    }
                    resumeAsyncResponseExceptionally(asyncResponse, th);
                    return null;
                });
    }

    /**
     * Resumes the response with the cluster policies stored for the given cluster.
     *
     * <p>Runs {@code validateBothSuperuserAndClusterPolicyOperation} for
     * {@link PolicyName#CLUSTER_MIGRATION} / {@link PolicyOperation#READ} first. An empty lookup
     * result or a {@link MetadataStoreException.NotFoundException} yields {@code NOT_FOUND}; other
     * failures are forwarded to {@code resumeAsyncResponseExceptionally}.
     *
     * @param asyncResponse suspended response that receives the policies or the failure
     * @param str the cluster name taken from the request path
     */
    @GET
    @ApiResponses({@ApiResponse(code = 200, message = "Return the cluster data.", response = ClusterDataImpl.class), @ApiResponse(code = 403, message = "Don't have admin permission."), @ApiResponse(code = 404, message = "Cluster doesn't exist."), @ApiResponse(code = 500, message = "Internal server error.")})
    @Path("/{cluster}/migrate")
    @ApiOperation(value = "Get the cluster migration configuration for the specified cluster.", response = ClusterDataImpl.class, notes = "This operation requires Pulsar superuser privileges.")
    public void getClusterMigration(@Suspended AsyncResponse asyncResponse, @PathParam("cluster") @ApiParam(value = "The cluster name", required = true) String str) {
        validateBothSuperuserAndClusterPolicyOperation(str, PolicyName.CLUSTER_MIGRATION, PolicyOperation.READ)
                .thenCompose(ignored -> clusterResources().getClusterPoliciesResources().getClusterPoliciesAsync(str))
                .thenAccept(policies -> asyncResponse.resume(policies.orElseThrow(
                        () -> new RestException(Response.Status.NOT_FOUND, "Cluster does not exist"))))
                .exceptionally(th -> {
                    log.error("[{}] Failed to get cluster {} migration", clientAppId(), str, th);
                    if (FutureUtil.unwrapCompletionException(th) instanceof MetadataStoreException.NotFoundException) {
                        asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "Cluster does not exist"));
                        return null;
                    }
                    resumeAsyncResponseExceptionally(asyncResponse, th);
                    return null;
                });
    }

    /**
     * Stores the migration flag and target URL in the cluster's policies, creating the policies
     * entry when none exists.
     *
     * <p>When {@code z} is {@code true} the URL payload must be present and non-empty; otherwise the
     * response is resumed immediately with {@code BAD_REQUEST} and nothing else runs. After that the
     * permission check for {@link PolicyName#CLUSTER_MIGRATION} / {@link PolicyOperation#WRITE} and
     * {@code validatePoliciesReadOnlyAccessAsync} run before the write. A
     * {@link MetadataStoreException.NotFoundException} yields {@code NOT_FOUND}; other failures are
     * forwarded to {@code resumeAsyncResponseExceptionally}.
     *
     * @param asyncResponse suspended response that is resumed with {@code 200 OK} or the failure
     * @param str the cluster name taken from the request path
     * @param z value of the {@code migrated} query parameter
     * @param clusterUrl the migration target URLs from the request body; may be {@code null} when absent
     */
    @ApiResponses({@ApiResponse(code = 200, message = "Cluster has been updated."), @ApiResponse(code = 400, message = "Cluster url must not be empty."), @ApiResponse(code = 403, message = "Don't have admin permission or policies are read-only."), @ApiResponse(code = 404, message = "Cluster doesn't exist."), @ApiResponse(code = 500, message = "Internal server error.")})
    @Path("/{cluster}/migrate")
    @ApiOperation(value = "Update the configuration for a cluster migration.", notes = "This operation requires Pulsar superuser privileges.")
    @POST
    public void updateClusterMigration(@Suspended AsyncResponse asyncResponse, @PathParam("cluster") @ApiParam(value = "The cluster name", required = true) String str, @QueryParam("migrated") @ApiParam(value = "Is cluster migrated", required = true) boolean z, @ApiParam(value = "The cluster url data", required = true, examples = @Example({@ExampleProperty(mediaType = "application/json", value = "{\n   \"serviceUrl\": \"http://pulsar.example.com:8080\",\n   \"brokerServiceUrl\": \"pulsar://pulsar.example.com:6651\"\n}\n")})) ClusterPolicies.ClusterUrl clusterUrl) {
        // Treat an absent body like an empty one so the caller gets the documented 400
        // rather than a NullPointerException thrown before the response is resumed.
        if (z && (clusterUrl == null || clusterUrl.isEmpty())) {
            asyncResponse.resume(new RestException(Response.Status.BAD_REQUEST, "Cluster url must not be empty"));
            return;
        }
        validateBothSuperuserAndClusterPolicyOperation(str, PolicyName.CLUSTER_MIGRATION, PolicyOperation.WRITE)
                .thenCompose(ignored -> validatePoliciesReadOnlyAccessAsync())
                .thenCompose(ignored -> clusterResources().getClusterPoliciesResources()
                        .setPoliciesWithCreateAsync(str, existing -> {
                            // Start from the stored policies when present, else from a fresh instance.
                            ClusterPoliciesImpl policies = (ClusterPoliciesImpl) existing.orElse(new ClusterPoliciesImpl());
                            policies.setMigrated(z);
                            policies.setMigratedClusterUrl(clusterUrl);
                            return policies;
                        }))
                .thenAccept(ignored -> {
                    log.info("[{}] Updated cluster {}", clientAppId(), str);
                    asyncResponse.resume(Response.ok().build());
                })
                .exceptionally(th -> {
                    log.error("[{}] Failed to update cluster {}", clientAppId(), str, th);
                    if (FutureUtil.unwrapCompletionException(th) instanceof MetadataStoreException.NotFoundException) {
                        asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "Cluster does not exist"));
                        return null;
                    }
                    resumeAsyncResponseExceptionally(asyncResponse, th);
                    return null;
                });
    }

    /**
     * Replaces the peer-cluster list of a cluster.
     *
     * <p>Runs the permission check for {@link ClusterOperation#UPDATE_PEER_CLUSTER} and
     * {@code validatePoliciesReadOnlyAccessAsync}, then delegates to
     * {@code innerSetPeerClusterNamesAsync}. On success the response is resumed with
     * {@code 204 No Content}. A {@link MetadataStoreException.NotFoundException} yields
     * {@code NOT_FOUND}; other failures are forwarded to {@code resumeAsyncResponseExceptionally}.
     *
     * @param asyncResponse suspended response that is resumed with the outcome
     * @param str the cluster name taken from the request path
     * @param linkedHashSet the peer cluster names from the request body
     */
    @ApiResponses({@ApiResponse(code = 204, message = "Cluster has been updated."), @ApiResponse(code = 403, message = "Don't have admin permission or policies are read-only."), @ApiResponse(code = 404, message = "Cluster doesn't exist."), @ApiResponse(code = 412, message = "Peer cluster doesn't exist."), @ApiResponse(code = 500, message = "Internal server error.")})
    @Path("/{cluster}/peers")
    @ApiOperation(value = "Update peer-cluster-list for a cluster.", notes = "This operation requires Pulsar superuser privileges.")
    @POST
    public void setPeerClusterNames(@Suspended AsyncResponse asyncResponse, @PathParam("cluster") @ApiParam(value = "The cluster name", required = true) String str, @ApiParam(value = "The list of peer cluster names", required = true, examples = @Example({@ExampleProperty(mediaType = "application/json", value = "[\n   \"cluster-a\",\n   \"cluster-b\"\n]")})) LinkedHashSet<String> linkedHashSet) {
        validateBothSuperuserAndClusterOperation(str, ClusterOperation.UPDATE_PEER_CLUSTER)
                .thenCompose(ignored -> validatePoliciesReadOnlyAccessAsync())
                .thenCompose(ignored -> innerSetPeerClusterNamesAsync(str, linkedHashSet))
                .thenAccept(ignored -> {
                    log.info("[{}] Successfully added peer-cluster {} for {}", clientAppId(), linkedHashSet, str);
                    asyncResponse.resume(Response.noContent().build());
                })
                .exceptionally(th -> {
                    Throwable cause = FutureUtil.unwrapCompletionException(th);
                    // NOTE(review): the format has three placeholders but only two non-throwable
                    // arguments; the intended third value is unclear, so the message is unchanged.
                    log.error("[{}] Failed to validate peer-cluster list {}, {}", clientAppId(), linkedHashSet, th);
                    if (cause instanceof MetadataStoreException.NotFoundException) {
                        asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "Cluster does not exist"));
                        return null;
                    }
                    resumeAsyncResponseExceptionally(asyncResponse, th);
                    return null;
                });
    }

    /**
     * Validates the requested peers and then writes them into the cluster's data.
     *
     * <p>For a non-empty list, each peer yields one check: a peer equal to {@code str}
     * (case-insensitively) fails with {@code PRECONDITION_FAILED}, and so does a peer whose lookup
     * returns an empty result. The update runs only after all checks have completed successfully.
     * A {@code null} or empty list skips validation altogether.
     *
     * @param str the cluster whose peer list is being replaced
     * @param linkedHashSet the new peer cluster names
     * @return a future completing once the cluster data has been updated
     */
    private CompletableFuture<Void> innerSetPeerClusterNamesAsync(String str, LinkedHashSet<String> linkedHashSet) {
        CompletableFuture<Void> peersChecked;
        if (CollectionUtils.isNotEmpty(linkedHashSet)) {
            List<CompletableFuture<Void>> checks = new ArrayList<>();
            for (String peer : linkedHashSet) {
                if (str.equalsIgnoreCase(peer)) {
                    checks.add(FutureUtil.failedFuture(
                            new RestException(Response.Status.PRECONDITION_FAILED, str + " itself can't be part of peer-list")));
                } else {
                    checks.add(clusterResources().getClusterAsync(peer).thenAccept(found -> {
                        if (!found.isPresent()) {
                            throw new RestException(Response.Status.PRECONDITION_FAILED, "Peer cluster " + peer + " does not exist");
                        }
                    }));
                }
            }
            peersChecked = FutureUtil.waitForAll(checks);
        } else {
            peersChecked = CompletableFuture.completedFuture(null);
        }
        return peersChecked.thenCompose(ignored ->
                clusterResources().updateClusterAsync(str,
                        current -> current.clone().peerClusterNames(linkedHashSet).build()));
    }

    /**
     * Resumes the response with the peer-cluster names recorded in the cluster's data.
     *
     * <p>Runs the permission check for {@link ClusterOperation#GET_PEER_CLUSTER} first. An empty
     * lookup result raises a {@link RestException} with {@code NOT_FOUND}; every failure is logged
     * and forwarded to {@code resumeAsyncResponseExceptionally}.
     *
     * @param asyncResponse suspended response that receives the peer names or the failure
     * @param str the cluster name taken from the request path
     */
    @GET
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission."), @ApiResponse(code = 404, message = "Cluster doesn't exist."), @ApiResponse(code = 500, message = "Internal server error.")})
    @Path("/{cluster}/peers")
    @ApiOperation(value = "Get the peer-cluster data for the specified cluster.", response = String.class, responseContainer = "Set", notes = "This operation requires Pulsar superuser privileges.")
    public void getPeerCluster(@Suspended AsyncResponse asyncResponse, @PathParam("cluster") @ApiParam(value = "The cluster name", required = true) String str) {
        validateBothSuperuserAndClusterOperation(str, ClusterOperation.GET_PEER_CLUSTER)
                .thenCompose(ignored -> clusterResources().getClusterAsync(str))
                .thenAccept(lookup -> {
                    ClusterData clusterData = lookup.orElseThrow(
                            () -> new RestException(Response.Status.NOT_FOUND, "Cluster does not exist"));
                    asyncResponse.resume(clusterData.getPeerClusterNames());
                })
                .exceptionally(th -> {
                    log.error("[{}] Failed to get cluster {}", clientAppId(), str, th);
                    resumeAsyncResponseExceptionally(asyncResponse, th);
                    return null;
                });
    }

    /**
     * Deletes a cluster.
     *
     * <p>Runs the permission check for {@link ClusterOperation#DELETE_CLUSTER} and
     * {@code validatePoliciesReadOnlyAccessAsync}, then delegates to
     * {@code internalDeleteClusterAsync}. On success the response is resumed with
     * {@code 204 No Content}. A {@link MetadataStoreException.NotFoundException} is logged at warn
     * level and yields {@code NOT_FOUND}; other failures are logged at error level and forwarded to
     * {@code resumeAsyncResponseExceptionally}.
     *
     * @param asyncResponse suspended response that is resumed with the outcome
     * @param str the cluster name taken from the request path
     */
    @ApiResponses({@ApiResponse(code = 204, message = "Cluster has been deleted."), @ApiResponse(code = 403, message = "Don't have admin permission or policies are read-only."), @ApiResponse(code = 404, message = "Cluster doesn't exist."), @ApiResponse(code = 412, message = "Cluster is not empty."), @ApiResponse(code = 500, message = "Internal server error.")})
    @Path("/{cluster}")
    @DELETE
    @ApiOperation(value = "Delete an existing cluster.", notes = "This operation requires Pulsar superuser privileges.")
    public void deleteCluster(@Suspended AsyncResponse asyncResponse, @PathParam("cluster") @ApiParam(value = "The cluster name", required = true) String str) {
        validateBothSuperuserAndClusterOperation(str, ClusterOperation.DELETE_CLUSTER)
                .thenCompose(ignored -> validatePoliciesReadOnlyAccessAsync())
                .thenCompose(ignored -> internalDeleteClusterAsync(str))
                .thenAccept(ignored -> {
                    log.info("[{}] Deleted cluster {}", clientAppId(), str);
                    asyncResponse.resume(Response.noContent().build());
                })
                .exceptionally(th -> {
                    if (FutureUtil.unwrapCompletionException(th) instanceof MetadataStoreException.NotFoundException) {
                        log.warn("[{}] Failed to delete cluster {} - Does not exist", clientAppId(), str);
                        asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "Cluster does not exist"));
                        return null;
                    }
                    log.error("[{}] Failed to delete cluster {}", clientAppId(), str, th);
                    resumeAsyncResponseExceptionally(asyncResponse, th);
                    return null;
                });
    }

    /**
     * Removes a cluster and the data attached to it, refusing when the cluster is still in use.
     *
     * <p>Order of operations: fail with {@code PRECONDITION_FAILED} if {@code isClusterUsedAsync}
     * reports {@code true}; fail the same way if isolation data exists and still holds policies;
     * delete the (empty) isolation data when it exists; delete the failure domains; finally delete
     * the cluster entry itself.
     *
     * @param str the cluster name
     * @return a future completing once the cluster entry has been deleted
     */
    private CompletableFuture<Void> internalDeleteClusterAsync(String str) {
        return pulsar().getPulsarResources().getClusterResources().isClusterUsedAsync(str)
                .thenCompose(inUse -> {
                    if (inUse) {
                        throw new RestException(Response.Status.PRECONDITION_FAILED, "Cluster not empty");
                    }
                    return namespaceIsolationPolicies().getIsolationDataPoliciesAsync(str);
                })
                .thenCompose(isolationData -> {
                    if (isolationData.isPresent()) {
                        if (!isolationData.get().getPolicies().isEmpty()) {
                            throw new RestException(Response.Status.PRECONDITION_FAILED, "Cluster not empty");
                        }
                        // Isolation data exists but holds no policies: remove it with the cluster.
                        return namespaceIsolationPolicies().deleteIsolationDataAsync(str);
                    }
                    return CompletableFuture.completedFuture(null);
                })
                .thenCompose(ignored -> clusterResources().getFailureDomainResources()
                        .deleteFailureDomainsAsync(str)
                        .thenCompose(done -> clusterResources().deleteClusterAsync(str)));
    }

    /**
     * Resumes the response with the map of namespace isolation policies of a cluster.
     *
     * <p>Runs the permission check for {@link PolicyName#NAMESPACE_ISOLATION} /
     * {@link PolicyOperation#READ}, then verifies the cluster exists (a missing cluster is reported
     * as {@code NOT_FOUND}), then loads the policies. Every failure is logged and forwarded to
     * {@code resumeAsyncResponseExceptionally}.
     *
     * @param asyncResponse suspended response that receives the policy map or the failure
     * @param str the cluster name taken from the request path
     */
    @GET
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission."), @ApiResponse(code = 404, message = "Cluster doesn't exist."), @ApiResponse(code = 500, message = "Internal server error.")})
    @Path("/{cluster}/namespaceIsolationPolicies")
    @ApiOperation(value = "Get the namespace isolation policies assigned to the cluster.", response = NamespaceIsolationDataImpl.class, responseContainer = "Map", notes = "This operation requires Pulsar superuser privileges.")
    public void getNamespaceIsolationPolicies(@Suspended AsyncResponse asyncResponse, @PathParam("cluster") @ApiParam(value = "The cluster name", required = true) String str) {
        validateBothSuperuserAndClusterPolicyOperation(str, PolicyName.NAMESPACE_ISOLATION, PolicyOperation.READ)
                .thenCompose(ignored -> validateClusterExistAsync(str, Response.Status.NOT_FOUND))
                .thenCompose(ignored -> internalGetNamespaceIsolationPolicies(str))
                // Bound method reference: null-checks asyncResponse when the chain is built.
                .thenAccept(asyncResponse::resume)
                .exceptionally(th -> {
                    log.error("[{}] Failed to get clusters/{}/namespaceIsolationPolicies", clientAppId(), str, th);
                    resumeAsyncResponseExceptionally(asyncResponse, th);
                    return null;
                });
    }

    /**
     * Checks that a cluster exists, failing with a caller-chosen status otherwise.
     *
     * @param str the cluster name to look up
     * @param status the HTTP status carried by the {@link RestException} when the cluster is missing
     * @return a future that completes normally when the cluster exists and exceptionally otherwise
     */
    private CompletableFuture<Void> validateClusterExistAsync(String str, Response.Status status) {
        return clusterResources().clusterExistsAsync(str).thenAccept(exists -> {
            if (exists) {
                return;
            }
            throw new RestException(status, "Cluster " + str + " does not exist.");
        });
    }

    /**
     * Loads the namespace isolation policies stored for a cluster.
     *
     * @param str the cluster name
     * @return a future holding the policy map keyed by policy name; it fails with a
     *         {@code NOT_FOUND} {@link RestException} when no isolation data is stored for the cluster
     */
    private CompletableFuture<Map<String, NamespaceIsolationDataImpl>> internalGetNamespaceIsolationPolicies(String str) {
        return namespaceIsolationPolicies().getIsolationDataPoliciesAsync(str)
                .thenApply(stored -> stored
                        .orElseThrow(() -> new RestException(Response.Status.NOT_FOUND,
                                "NamespaceIsolationPolicies for cluster " + str + " does not exist"))
                        .getPolicies());
    }

    /**
     * Resumes the response with a single namespace isolation policy of a cluster.
     *
     * <p>Runs the permission check for {@link PolicyName#NAMESPACE_ISOLATION} /
     * {@link PolicyOperation#READ}, then verifies the cluster exists (a missing cluster is reported
     * as {@code PRECONDITION_FAILED}), then loads the policy map. A policy name absent from the map
     * raises a {@code NOT_FOUND} {@link RestException}. Every failure is logged and forwarded to
     * {@code resumeAsyncResponseExceptionally}.
     *
     * @param asyncResponse suspended response that receives the policy or the failure
     * @param str the cluster name taken from the request path
     * @param str2 the policy name taken from the request path
     */
    @GET
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission."), @ApiResponse(code = 404, message = "Policy doesn't exist."), @ApiResponse(code = 412, message = "Cluster doesn't exist."), @ApiResponse(code = 500, message = "Internal server error.")})
    @Path("/{cluster}/namespaceIsolationPolicies/{policyName}")
    @ApiOperation(value = "Get the single namespace isolation policy assigned to the cluster.", response = NamespaceIsolationDataImpl.class, notes = "This operation requires Pulsar superuser privileges.")
    public void getNamespaceIsolationPolicy(@Suspended AsyncResponse asyncResponse, @PathParam("cluster") @ApiParam(value = "The cluster name", required = true) String str, @PathParam("policyName") @ApiParam(value = "The name of the namespace isolation policy", required = true) String str2) {
        validateBothSuperuserAndClusterPolicyOperation(str, PolicyName.NAMESPACE_ISOLATION, PolicyOperation.READ)
                .thenCompose(ignored -> validateClusterExistAsync(str, Response.Status.PRECONDITION_FAILED))
                .thenCompose(ignored -> internalGetNamespaceIsolationPolicies(str))
                .thenAccept(policies -> {
                    if (!policies.containsKey(str2)) {
                        throw new RestException(Response.Status.NOT_FOUND, "Cannot find NamespaceIsolationPolicy " + str2 + " for cluster " + str);
                    }
                    asyncResponse.resume(policies.get(str2));
                })
                .exceptionally(th -> {
                    // Pass the policy name so the third placeholder is filled; the throwable stays last.
                    log.error("[{}] Failed to get clusters/{}/namespaceIsolationPolicies/{}", clientAppId(), str, str2, th);
                    resumeAsyncResponseExceptionally(asyncResponse, th);
                    return null;
                });
    }

    /**
     * Resumes the response with one isolation-data entry per available broker.
     *
     * <p>Runs the permission check for {@link PolicyName#NAMESPACE_ISOLATION} /
     * {@link PolicyOperation#READ}, verifies the cluster exists (a missing cluster is reported as
     * {@code PRECONDITION_FAILED}), asks the load manager for the available brokers, loads the
     * cluster's isolation policies and maps every broker through
     * {@code internalGetBrokerNsIsolationData}. Every failure is logged and forwarded to
     * {@code resumeAsyncResponseExceptionally}.
     *
     * @param asyncResponse suspended response that receives the list or the failure
     * @param str the cluster name taken from the request path
     */
    @GET
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission."), @ApiResponse(code = 404, message = "Namespace-isolation policies not found."), @ApiResponse(code = 412, message = "Cluster doesn't exist."), @ApiResponse(code = 500, message = "Internal server error.")})
    @Path("/{cluster}/namespaceIsolationPolicies/brokers")
    @ApiOperation(value = "Get list of brokers with namespace-isolation policies attached to them.", response = BrokerNamespaceIsolationDataImpl.class, responseContainer = "set", notes = "This operation requires Pulsar superuser privileges.")
    public void getBrokersWithNamespaceIsolationPolicy(@Suspended AsyncResponse asyncResponse, @PathParam("cluster") @ApiParam(value = "The cluster name", required = true) String str) {
        validateBothSuperuserAndClusterPolicyOperation(str, PolicyName.NAMESPACE_ISOLATION, PolicyOperation.READ)
                .thenCompose(ignored -> validateClusterExistAsync(str, Response.Status.PRECONDITION_FAILED))
                .thenCompose(ignored -> pulsar().getLoadManager().get().getAvailableBrokersAsync())
                .thenCompose(brokers -> internalGetNamespaceIsolationPolicies(str)
                        .thenApply(policies -> brokers.stream()
                                .map(broker -> internalGetBrokerNsIsolationData(broker, policies))
                                .collect(Collectors.toList())))
                // Bound method reference: null-checks asyncResponse when the chain is built.
                .thenAccept(asyncResponse::resume)
                .exceptionally(th -> {
                    log.error("[{}] Failed to get namespace isolation-policies {}", clientAppId(), str, th);
                    resumeAsyncResponseExceptionally(asyncResponse, th);
                    return null;
                });
    }

    /**
     * Builds the isolation view of one broker from the cluster's policy map.
     *
     * <p>Each policy that lists the broker as primary or secondary contributes its namespaces to the
     * result. The {@code primary} flag and {@code policyName} are overwritten on every match, so when
     * several policies match they reflect the last one visited by {@code Map.forEach}.
     *
     * @param str the broker name
     * @param map the policies keyed by policy name; {@code null} yields a result carrying only the
     *            broker name
     * @return the isolation data built for the broker
     */
    private BrokerNamespaceIsolationData internalGetBrokerNsIsolationData(String str, Map<String, NamespaceIsolationDataImpl> map) {
        BrokerNamespaceIsolationData.Builder builder = BrokerNamespaceIsolationData.builder().brokerName(str);
        if (map == null) {
            return builder.build();
        }
        List<String> namespaceRegexes = new ArrayList<>();
        map.forEach((name, data) -> {
            NamespaceIsolationPolicyImpl policy = new NamespaceIsolationPolicyImpl(data);
            // Evaluate the primary match once and reuse it for both the test and the flag.
            boolean primary = policy.isPrimaryBroker(str);
            if (primary || policy.isSecondaryBroker(str)) {
                namespaceRegexes.addAll(data.getNamespaces());
                builder.primary(primary);
                builder.policyName(name);
            }
        });
        builder.namespaceRegex(namespaceRegexes);
        return builder.build();
    }

    /**
     * Resumes the response with the isolation data of a single broker.
     *
     * <p>Runs the permission check for {@link PolicyName#NAMESPACE_ISOLATION} /
     * {@link PolicyOperation#READ}, verifies the cluster exists (a missing cluster is reported as
     * {@code PRECONDITION_FAILED}), loads the cluster's isolation policies and maps the broker
     * through {@code internalGetBrokerNsIsolationData}. Every failure is logged and forwarded to
     * {@code resumeAsyncResponseExceptionally}.
     *
     * @param asyncResponse suspended response that receives the broker data or the failure
     * @param str the cluster name taken from the request path
     * @param str2 the broker name taken from the request path
     */
    @GET
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission."), @ApiResponse(code = 404, message = "Namespace-isolation policies/ Broker not found."), @ApiResponse(code = 412, message = "Cluster doesn't exist."), @ApiResponse(code = 500, message = "Internal server error.")})
    @Path("/{cluster}/namespaceIsolationPolicies/brokers/{broker}")
    @ApiOperation(value = "Get a broker with namespace-isolation policies attached to it.", response = BrokerNamespaceIsolationDataImpl.class, notes = "This operation requires Pulsar superuser privileges.")
    public void getBrokerWithNamespaceIsolationPolicy(@Suspended AsyncResponse asyncResponse, @PathParam("cluster") @ApiParam(value = "The cluster name", required = true) String str, @PathParam("broker") @ApiParam(value = "The broker name (<broker-hostname>:<web-service-port>)", required = true, example = "broker1:8080") String str2) {
        validateBothSuperuserAndClusterPolicyOperation(str, PolicyName.NAMESPACE_ISOLATION, PolicyOperation.READ)
                .thenCompose(ignored -> validateClusterExistAsync(str, Response.Status.PRECONDITION_FAILED))
                .thenCompose(ignored -> internalGetNamespaceIsolationPolicies(str))
                .thenApply(policies -> internalGetBrokerNsIsolationData(str2, policies))
                // Bound method reference: null-checks asyncResponse when the chain is built.
                .thenAccept(asyncResponse::resume)
                .exceptionally(th -> {
                    log.error("[{}] Failed to get namespace isolation-policies {}", clientAppId(), str, th);
                    resumeAsyncResponseExceptionally(asyncResponse, th);
                    return null;
                });
    }

    /**
     * Creates or replaces a namespace isolation policy of a cluster.
     *
     * <p>The work runs asynchronously in this order: authorization, the read-only-policies check,
     * the cluster-existence check (invoked with {@code PRECONDITION_FAILED}), validation of the
     * submitted policy data, persisting the policy, and finally unloading matching namespaces
     * through {@code filterAndUnloadMatchedNamespaceAsync}. On success the response is resumed
     * with "no content".
     *
     * <p>Failure mapping: an {@link IllegalArgumentException} becomes {@code BAD_REQUEST}, a
     * {@code MetadataStoreException.NotFoundException} becomes {@code NOT_FOUND}; everything else
     * is delegated to {@code resumeAsyncResponseExceptionally}.
     *
     * @param asyncResponse the suspended response that is resumed once processing finishes
     * @param str the cluster name
     * @param str2 the namespace isolation policy name
     * @param namespaceIsolationDataImpl the policy data to store
     */
    @ApiResponses({@ApiResponse(code = 204, message = "Set namespace isolation policy successfully."), @ApiResponse(code = 400, message = "Namespace isolation policy data is invalid."), @ApiResponse(code = 403, message = "Don't have admin permission or policies are read-only."), @ApiResponse(code = 404, message = "Namespace isolation policy doesn't exist."), @ApiResponse(code = 412, message = "Cluster doesn't exist."), @ApiResponse(code = 500, message = "Internal server error.")})
    @Path("/{cluster}/namespaceIsolationPolicies/{policyName}")
    @ApiOperation(value = "Set namespace isolation policy.", notes = "This operation requires Pulsar superuser privileges.")
    @POST
    public void setNamespaceIsolationPolicy(@Suspended AsyncResponse asyncResponse, @PathParam("cluster") @ApiParam(value = "The cluster name", required = true) String str, @PathParam("policyName") @ApiParam(value = "The namespace isolation policy name", required = true) String str2, @ApiParam(value = "The namespace isolation policy data", required = true) NamespaceIsolationDataImpl namespaceIsolationDataImpl) {
        validateBothSuperuserAndClusterPolicyOperation(str, PolicyName.NAMESPACE_ISOLATION, PolicyOperation.WRITE)
                .thenCompose(ignored -> validatePoliciesReadOnlyAccessAsync())
                .thenCompose(ignored -> validateClusterExistAsync(str, Response.Status.PRECONDITION_FAILED))
                .thenCompose(ignored -> {
                    // Validate the submitted data before anything is read from or written to the store.
                    namespaceIsolationDataImpl.validate();
                    return namespaceIsolationPolicies().getIsolationDataPoliciesAsync(str);
                })
                // When no isolation data exists yet for the cluster, create an empty entry first.
                .thenCompose(existingPolicies -> existingPolicies
                        .map(CompletableFuture::completedFuture)
                        .orElseGet(() -> namespaceIsolationPolicies()
                                .setIsolationDataWithCreateAsync(str, current -> Collections.emptyMap())
                                .thenApply(created -> new NamespaceIsolationPolicies())))
                .thenCompose(policies -> {
                    // Remember the previous definition (null when the policy is new) for the unload step.
                    NamespaceIsolationDataImpl previousPolicy = policies.getPolicies().getOrDefault(str2, null);
                    policies.setPolicy(str2, namespaceIsolationDataImpl);
                    return namespaceIsolationPolicies()
                            .setIsolationDataAsync(str, current -> policies.getPolicies())
                            .thenApply(stored -> previousPolicy);
                })
                .thenCompose(previousPolicy ->
                        filterAndUnloadMatchedNamespaceAsync(str, namespaceIsolationDataImpl, previousPolicy))
                .thenAccept(ignored -> {
                    log.info("[{}] Successful to update clusters/{}/namespaceIsolationPolicies/{}.", clientAppId(), str, str2);
                    asyncResponse.resume(Response.noContent().build());
                })
                .exceptionally(th -> {
                    Throwable cause = FutureUtil.unwrapCompletionException(th);
                    if (cause instanceof IllegalArgumentException) {
                        String policyJson;
                        try {
                            policyJson = JsonUtil.toJson(namespaceIsolationDataImpl);
                        } catch (JsonUtil.ParseJsonException e) {
                            // Serialization is only used to enrich the error message; fall back to a marker.
                            policyJson = "[Failed to serialize]";
                        }
                        asyncResponse.resume(new RestException(Response.Status.BAD_REQUEST, "Invalid format of input policy data. policy: " + str2 + "; data: " + policyJson));
                        return null;
                    }
                    if (cause instanceof MetadataStoreException.NotFoundException) {
                        log.warn("[{}] Failed to update clusters/{}/namespaceIsolationPolicies: Does not exist", clientAppId(), str);
                        asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "NamespaceIsolationPolicies for cluster " + str + " does not exist"));
                        return null;
                    }
                    // Any other failure is unexpected here: log it as an error with the real cause.
                    log.error("[{}] Failed to update clusters/{}/namespaceIsolationPolicies/{}", clientAppId(), str, str2, cause);
                    resumeAsyncResponseExceptionally(asyncResponse, th);
                    return null;
                });
    }

    /**
     * Unloads the namespaces that are affected by a namespace isolation policy update.
     *
     * <p>Nothing is unloaded when the new policy's unload scope is {@code none}. Otherwise every
     * namespace of every tenant is considered a candidate when its name matches one of the new
     * policy's namespace regexes and its replication clusters contain {@code str}. If an old
     * policy exists, the unload scope is {@code changed}, and the old and new primary collections
     * are equal, the candidates are narrowed to those matching a regex that appears in only one of
     * the two policies. After the unloads complete, a load report write is attempted; a failure of
     * that write is logged and otherwise ignored.
     *
     * @param str the cluster name
     * @param namespaceIsolationDataImpl the new policy data
     * @param namespaceIsolationDataImpl2 the previously stored policy data, or {@code null} if none
     * @return a future completed once all unloads have finished; completed exceptionally with the
     *         {@code PulsarServerException} when the admin client cannot be obtained
     */
    private CompletableFuture<Void> filterAndUnloadMatchedNamespaceAsync(String str, NamespaceIsolationDataImpl namespaceIsolationDataImpl, NamespaceIsolationDataImpl namespaceIsolationDataImpl2) {
        if (NamespaceIsolationPolicyUnloadScope.none.equals(namespaceIsolationDataImpl.getUnloadScope())) {
            return CompletableFuture.completedFuture(null);
        }
        final PulsarAdmin adminClient;
        try {
            adminClient = pulsar().getAdminClient();
        } catch (PulsarServerException e) {
            return FutureUtil.failedFuture(e);
        }
        List<Pattern> namespacePatterns = namespaceIsolationDataImpl.getNamespaces().stream()
                .map(Pattern::compile)
                .toList();
        return adminClient.tenants().getTenantsAsync().thenCompose(tenants -> {
            List<CompletableFuture<List<String>>> matchesPerTenant = tenants.stream()
                    .map(tenant -> adminClient.namespaces().getNamespacesAsync(tenant).thenCompose(namespaces -> {
                        // Keep namespaces matching a policy regex; a lookup yields null when the
                        // namespace is not replicated to this cluster.
                        List<CompletableFuture<String>> matchesInCluster = namespaces.stream()
                                .filter(namespace -> namespacePatterns.stream()
                                        .anyMatch(pattern -> pattern.matcher(namespace).matches()))
                                .map(namespace -> adminClient.namespaces().getPoliciesAsync(namespace)
                                        .thenApply(policies ->
                                                policies.replication_clusters.contains(str) ? namespace : null))
                                .collect(Collectors.toList());
                        return FutureUtil.waitForAll(matchesInCluster).thenApply(done -> matchesInCluster.stream()
                                .map(CompletableFuture::join)
                                .filter(Objects::nonNull)
                                .collect(Collectors.toList()));
                    }))
                    .toList();
            return FutureUtil.waitForAll(matchesPerTenant).thenApply(done -> matchesPerTenant.stream()
                    .map(CompletableFuture::join)
                    .flatMap(List::stream)
                    .collect(Collectors.toList()));
        }).thenCompose(candidates -> {
            if (CollectionUtils.isEmpty(candidates)) {
                return CompletableFuture.completedFuture(null);
            }
            log.debug("Old policy: {} ; new policy: {}", namespaceIsolationDataImpl2, namespaceIsolationDataImpl);
            List<String> namespacesToUnload = candidates;
            if (namespaceIsolationDataImpl2 != null
                    && NamespaceIsolationPolicyUnloadScope.changed.equals(namespaceIsolationDataImpl.getUnloadScope())
                    && CollectionUtils.isEqualCollection(namespaceIsolationDataImpl2.getPrimary(), namespaceIsolationDataImpl.getPrimary())) {
                // Union of the old and new regexes.
                Set<String> combinedRegexes = new HashSet<>(namespaceIsolationDataImpl2.getNamespaces());
                combinedRegexes.addAll(namespaceIsolationDataImpl.getNamespaces());
                // Regexes present in both policies.
                Set<String> commonRegexes = new HashSet<>(namespaceIsolationDataImpl2.getNamespaces());
                commonRegexes.retainAll(namespaceIsolationDataImpl.getNamespaces());
                log.debug("combined regexes: {}; common regexes:{}", combinedRegexes, commonRegexes);
                // After removing the common ones, only the regexes that differ between old and new remain.
                combinedRegexes.removeAll(commonRegexes);
                log.debug("changed regexes: {}", combinedRegexes);
                // Compile once instead of once per candidate namespace.
                List<Pattern> changedPatterns = combinedRegexes.stream().map(Pattern::compile).toList();
                namespacesToUnload = candidates.stream()
                        .filter(namespace -> changedPatterns.stream()
                                .anyMatch(pattern -> pattern.matcher(namespace).matches()))
                        .toList();
            }
            List<CompletableFuture<Void>> unloads = namespacesToUnload.stream()
                    .map(namespace -> adminClient.namespaces().unloadAsync(namespace))
                    .collect(Collectors.toList());
            return FutureUtil.waitForAll(unloads).thenAccept(done -> {
                try {
                    pulsar().getLoadManager().get().writeLoadReportOnZookeeper(true);
                } catch (Exception e) {
                    // Best effort: the unloads already succeeded, so only report the failure.
                    log.warn("[{}] Failed to writeLoadReportOnZookeeper.", clientAppId(), e);
                }
            });
        });
    }

    /**
     * Deletes a namespace isolation policy of a cluster.
     *
     * <p>The work runs asynchronously in this order: authorization, the cluster-existence check
     * (invoked with {@code PRECONDITION_FAILED}), the read-only-policies check, removal of the
     * policy from the cluster's isolation data, and persisting the result. On success the
     * response is resumed with "no content".
     *
     * <p>A {@code MetadataStoreException.NotFoundException} is reported as {@code NOT_FOUND};
     * every other failure is delegated to {@code resumeAsyncResponseExceptionally}.
     *
     * @param asyncResponse the suspended response that is resumed once processing finishes
     * @param str the cluster name
     * @param str2 the namespace isolation policy name
     */
    @ApiResponses({@ApiResponse(code = 204, message = "Delete namespace isolation policy successfully."), @ApiResponse(code = 403, message = "Don't have admin permission or policies are read only."), @ApiResponse(code = 404, message = "Namespace isolation policy doesn't exist."), @ApiResponse(code = 412, message = "Cluster doesn't exist."), @ApiResponse(code = 500, message = "Internal server error.")})
    @Path("/{cluster}/namespaceIsolationPolicies/{policyName}")
    @DELETE
    @ApiOperation(value = "Delete namespace isolation policy.", notes = "This operation requires Pulsar superuser privileges.")
    public void deleteNamespaceIsolationPolicy(@Suspended AsyncResponse asyncResponse, @PathParam("cluster") @ApiParam(value = "The cluster name", required = true) String str, @PathParam("policyName") @ApiParam(value = "The namespace isolation policy name", required = true) String str2) {
        validateBothSuperuserAndClusterPolicyOperation(str, PolicyName.NAMESPACE_ISOLATION, PolicyOperation.WRITE)
                .thenCompose(ignored -> validateClusterExistAsync(str, Response.Status.PRECONDITION_FAILED))
                .thenCompose(ignored -> validatePoliciesReadOnlyAccessAsync())
                .thenCompose(ignored -> namespaceIsolationPolicies().getIsolationDataPoliciesAsync(str))
                // When no isolation data exists yet for the cluster, create an empty entry first.
                .thenCompose(existingPolicies -> existingPolicies
                        .map(CompletableFuture::completedFuture)
                        .orElseGet(() -> namespaceIsolationPolicies()
                                .setIsolationDataWithCreateAsync(str, current -> Collections.emptyMap())
                                .thenApply(created -> new NamespaceIsolationPolicies())))
                .thenCompose(policies -> {
                    policies.deletePolicy(str2);
                    return namespaceIsolationPolicies().setIsolationDataAsync(str, current -> policies.getPolicies());
                })
                .thenAccept(ignored -> asyncResponse.resume(Response.noContent().build()))
                .exceptionally(th -> {
                    if (FutureUtil.unwrapCompletionException(th) instanceof MetadataStoreException.NotFoundException) {
                        log.warn("[{}] Failed to delete clusters/{}/namespaceIsolationPolicies: Does not exist", clientAppId(), str);
                        asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "NamespaceIsolationPolicies for cluster " + str + " does not exist"));
                        return null;
                    }
                    log.error("[{}] Failed to delete clusters/{}/namespaceIsolationPolicies/{}", clientAppId(), str, str2, th);
                    resumeAsyncResponseExceptionally(asyncResponse, th);
                    return null;
                });
    }

    /**
     * Creates or replaces a failure domain of a cluster.
     *
     * <p>The work runs asynchronously in this order: authorization, the cluster-existence check
     * (invoked with {@code PRECONDITION_FAILED}), a check that none of the submitted brokers
     * already belongs to another domain, and storing the domain. On success the response is
     * resumed with "no content".
     *
     * <p>A {@code MetadataStoreException.NotFoundException} is reported as {@code NOT_FOUND};
     * every other failure is delegated to {@code resumeAsyncResponseExceptionally}.
     *
     * @param asyncResponse the suspended response that is resumed once processing finishes
     * @param str the cluster name
     * @param str2 the failure domain name
     * @param failureDomainImpl the failure domain configuration to store
     */
    @ApiResponses({@ApiResponse(code = 204, message = "Set the failure domain of the cluster successfully."), @ApiResponse(code = 403, message = "Don't have admin permission."), @ApiResponse(code = 404, message = "Failure domain doesn't exist."), @ApiResponse(code = 409, message = "Broker already exists in another domain."), @ApiResponse(code = 412, message = "Cluster doesn't exist."), @ApiResponse(code = 500, message = "Internal server error.")})
    @Path("/{cluster}/failureDomains/{domainName}")
    @ApiOperation(value = "Set the failure domain of the cluster.", notes = "This operation requires Pulsar superuser privileges.")
    @POST
    public void setFailureDomain(@Suspended AsyncResponse asyncResponse, @PathParam("cluster") @ApiParam(value = "The cluster name", required = true) String str, @PathParam("domainName") @ApiParam(value = "The failure domain name", required = true) String str2, @ApiParam(value = "The configuration data of a failure domain", required = true) FailureDomainImpl failureDomainImpl) {
        validateBothSuperuserAndClusterOperation(str, ClusterOperation.UPDATE_FAILURE_DOMAIN)
                .thenCompose(ignored -> validateClusterExistAsync(str, Response.Status.PRECONDITION_FAILED))
                .thenCompose(ignored -> validateBrokerExistsInOtherDomain(str, str2, failureDomainImpl))
                .thenCompose(ignored -> clusterResources().getFailureDomainResources()
                        .setFailureDomainWithCreateAsync(str, str2, current -> failureDomainImpl))
                .thenAccept(ignored -> {
                    log.info("[{}] Successful set failure domain {} for cluster {}", clientAppId(), str2, str);
                    asyncResponse.resume(Response.noContent().build());
                })
                .exceptionally(th -> {
                    if (FutureUtil.unwrapCompletionException(th) instanceof MetadataStoreException.NotFoundException) {
                        // Argument order follows the placeholders: domain name first, then cluster.
                        log.warn("[{}] Failed to update domain {}. clusters {}  Does not exist", clientAppId(), str2, str);
                        asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "Domain " + str2 + " for cluster " + str + " does not exist"));
                        return null;
                    }
                    log.error("[{}] Failed to update clusters/{}/domainName/{}", clientAppId(), str, str2, th);
                    resumeAsyncResponseExceptionally(asyncResponse, th);
                    return null;
                });
    }

    /**
     * Returns all failure domains of a cluster as a map from domain name to domain data.
     *
     * <p>After authorization, the domain names are listed and each domain is fetched
     * individually. A domain whose lookup fails is logged and left out of the result, as is a
     * domain whose lookup returns an empty {@link Optional}. If listing fails with
     * {@code MetadataStoreException.NotFoundException}, an empty map is returned. Any other
     * failure is delegated to {@code resumeAsyncResponseExceptionally}.
     *
     * @param asyncResponse the suspended response that is resumed with the resulting map
     * @param str the cluster name
     */
    @GET
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 500, message = "Internal server error")})
    @Path("/{cluster}/failureDomains")
    @ApiOperation(value = "Get the cluster failure domains.", response = FailureDomainImpl.class, responseContainer = "Map", notes = "This operation requires Pulsar superuser privileges.")
    public void getFailureDomains(@Suspended AsyncResponse asyncResponse, @PathParam("cluster") @ApiParam(value = "The cluster name", required = true) String str) {
        validateBothSuperuserAndClusterOperation(str, ClusterOperation.GET_FAILURE_DOMAIN)
                .thenCompose(ignored -> clusterResources().getFailureDomainResources()
                        .listFailureDomainsAsync(str)
                        .thenCompose(domainNames -> {
                            List<CompletableFuture<Pair<String, Optional<FailureDomainImpl>>>> lookups =
                                    domainNames.stream()
                                            .map(domainName -> clusterResources().getFailureDomainResources()
                                                    .getFailureDomainAsync(str, domainName)
                                                    .thenApply(domain -> Pair.of(domainName, domain))
                                                    .exceptionally(lookupError -> {
                                                        // One unreadable domain must not fail the whole listing.
                                                        log.warn("Failed to get domain {}", domainName, lookupError);
                                                        return null;
                                                    }))
                                            .collect(Collectors.toList());
                            return FutureUtil.waitForAll(lookups).thenApply(done -> lookups.stream()
                                    .map(CompletableFuture::join)
                                    .filter(Objects::nonNull)
                                    .filter(pair -> pair.getRight().isPresent())
                                    .collect(Collectors.toMap(Pair::getLeft, pair -> pair.getRight().get())));
                        })
                        .exceptionally(th -> {
                            if (FutureUtil.unwrapCompletionException(th) instanceof MetadataStoreException.NotFoundException) {
                                log.warn("[{}] Failure-domain is not configured for cluster {}", clientAppId(), str, th);
                                return Collections.emptyMap();
                            }
                            throw FutureUtil.wrapToCompletionException(th);
                        }))
                .thenAccept(asyncResponse::resume)
                .exceptionally(th -> {
                    log.error("[{}] Failed to get failure-domains for cluster {}", clientAppId(), str, th);
                    resumeAsyncResponseExceptionally(asyncResponse, th);
                    return null;
                });
    }

    /**
     * Returns a single failure domain of a cluster.
     *
     * <p>After authorization and the cluster-existence check (invoked with
     * {@code PRECONDITION_FAILED}) the domain is looked up. When the lookup yields no domain, a
     * {@code RestException} with {@code NOT_FOUND} is raised. Every failure is logged and
     * delegated to {@code resumeAsyncResponseExceptionally}.
     *
     * @param asyncResponse the suspended response that is resumed with the domain data
     * @param str the cluster name
     * @param str2 the failure domain name
     */
    @GET
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "FailureDomain doesn't exist"), @ApiResponse(code = 412, message = "Cluster doesn't exist"), @ApiResponse(code = 500, message = "Internal server error")})
    @Path("/{cluster}/failureDomains/{domainName}")
    @ApiOperation(value = "Get a domain in a cluster", response = FailureDomainImpl.class, notes = "This operation requires Pulsar superuser privileges.")
    public void getDomain(@Suspended AsyncResponse asyncResponse, @PathParam("cluster") @ApiParam(value = "The cluster name", required = true) String str, @PathParam("domainName") @ApiParam(value = "The failure domain name", required = true) String str2) {
        validateBothSuperuserAndClusterOperation(str, ClusterOperation.GET_FAILURE_DOMAIN)
                .thenCompose(ignored -> validateClusterExistAsync(str, Response.Status.PRECONDITION_FAILED))
                .thenCompose(ignored -> clusterResources().getFailureDomainResources().getFailureDomainAsync(str, str2))
                .thenAccept(domain -> {
                    FailureDomainImpl failureDomain = domain.orElseThrow(() ->
                            new RestException(Response.Status.NOT_FOUND, "Domain " + str2 + " for cluster " + str + " does not exist"));
                    asyncResponse.resume(failureDomain);
                })
                .exceptionally(th -> {
                    log.error("[{}] Failed to get domain {} for cluster {}", clientAppId(), str2, str, th);
                    resumeAsyncResponseExceptionally(asyncResponse, th);
                    return null;
                });
    }

    /**
     * Deletes a failure domain of a cluster.
     *
     * <p>After authorization and the cluster-existence check (invoked with
     * {@code PRECONDITION_FAILED}) the domain is deleted and the response is resumed with an
     * "ok" response. A {@code MetadataStoreException.NotFoundException} is reported as
     * {@code NOT_FOUND}; every other failure is delegated to
     * {@code resumeAsyncResponseExceptionally}.
     *
     * @param asyncResponse the suspended response that is resumed once processing finishes
     * @param str the cluster name
     * @param str2 the failure domain name
     */
    @ApiResponses({@ApiResponse(code = 200, message = "Delete the failure domain of the cluster successfully"), @ApiResponse(code = 403, message = "Don't have admin permission or policy is read only"), @ApiResponse(code = 404, message = "FailureDomain doesn't exist"), @ApiResponse(code = 412, message = "Cluster doesn't exist"), @ApiResponse(code = 500, message = "Internal server error")})
    @Path("/{cluster}/failureDomains/{domainName}")
    @DELETE
    @ApiOperation(value = "Delete the failure domain of the cluster", notes = "This operation requires Pulsar superuser privileges.")
    public void deleteFailureDomain(@Suspended AsyncResponse asyncResponse, @PathParam("cluster") @ApiParam(value = "The cluster name", required = true) String str, @PathParam("domainName") @ApiParam(value = "The failure domain name", required = true) String str2) {
        validateBothSuperuserAndClusterOperation(str, ClusterOperation.DELETE_FAILURE_DOMAIN)
                .thenCompose(ignored -> validateClusterExistAsync(str, Response.Status.PRECONDITION_FAILED))
                .thenCompose(ignored -> clusterResources().getFailureDomainResources().deleteFailureDomainAsync(str, str2))
                .thenAccept(ignored -> {
                    log.info("[{}] Successful delete domain {} in cluster {}", clientAppId(), str2, str);
                    asyncResponse.resume(Response.ok().build());
                })
                .exceptionally(th -> {
                    if (FutureUtil.unwrapCompletionException(th) instanceof MetadataStoreException.NotFoundException) {
                        log.warn("[{}] Domain {} does not exist in {}", clientAppId(), str2, str);
                        asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "Domain-name " + str2 + " or cluster " + str + " does not exist"));
                        return null;
                    }
                    log.error("[{}] Failed to delete domain {} in cluster {}", clientAppId(), str2, str, th);
                    resumeAsyncResponseExceptionally(asyncResponse, th);
                    return null;
                });
    }

    /**
     * Verifies that none of the brokers of the given failure domain is already listed in a
     * different failure domain of the same cluster.
     *
     * <p>Every domain other than {@code str2} is fetched; if it shares brokers with
     * {@code failureDomainImpl}, the returned future fails with a {@code RestException} carrying
     * {@code CONFLICT}. A lookup failure that is not a {@link WebApplicationException} is only
     * logged, so it does not fail the validation.
     *
     * @param str the cluster name
     * @param str2 the name of the domain being written, which is excluded from the check
     * @param failureDomainImpl the domain data being written; may be {@code null}
     * @return a future that completes normally when no conflict was found; already completed
     *         when {@code failureDomainImpl} or its broker set is {@code null}
     */
    private CompletableFuture<Void> validateBrokerExistsInOtherDomain(String str, String str2, FailureDomainImpl failureDomainImpl) {
        if (failureDomainImpl == null || failureDomainImpl.brokers == null) {
            return CompletableFuture.completedFuture(null);
        }
        return clusterResources().getFailureDomainResources().listFailureDomainsAsync(str).thenCompose(domainNames -> {
            List<CompletableFuture<Void>> checks = domainNames.stream()
                    .filter(domainName -> !domainName.equals(str2))
                    .map(domainName -> clusterResources().getFailureDomainResources()
                            .getFailureDomainAsync(str, domainName)
                            .thenAccept(otherDomain -> {
                                if (otherDomain.isPresent() && CollectionUtils.isNotEmpty(otherDomain.get().getBrokers())) {
                                    List<String> duplicateBrokers = otherDomain.get().getBrokers().stream()
                                            .filter(failureDomainImpl.brokers::contains)
                                            .collect(Collectors.toList());
                                    if (CollectionUtils.isNotEmpty(duplicateBrokers)) {
                                        throw new RestException(Response.Status.CONFLICT, duplicateBrokers + " already exists in " + domainName);
                                    }
                                }
                            })
                            .exceptionally(th -> {
                                Throwable cause = FutureUtil.unwrapCompletionException(th);
                                if (cause instanceof WebApplicationException) {
                                    // Propagate web errors (such as the conflict raised above) to the caller.
                                    throw FutureUtil.wrapToCompletionException(th);
                                }
                                if (cause instanceof MetadataStoreException.NotFoundException) {
                                    if (log.isDebugEnabled()) {
                                        log.debug("[{}] Domain is not configured for cluster", clientAppId(), th);
                                    }
                                } else {
                                    // Best effort: an unreadable domain is reported but does not block the update.
                                    log.warn("Failed to get domain {}", domainName, th);
                                }
                                return null;
                            }))
                    .collect(Collectors.toList());
            return FutureUtil.waitForAll(checks);
        });
    }

    /**
     * Runs the superuser check and the cluster-operation check and accepts the request when at
     * least one of them succeeds.
     *
     * <p>Both checks are started up front and awaited together. Only when both completed
     * exceptionally does the returned future fail, with a {@code RestException} carrying
     * {@code UNAUTHORIZED}; the individual failure causes are written to the debug log.
     *
     * @param str the cluster name
     * @param clusterOperation the operation being authorized
     * @return a future that completes normally when access is granted
     */
    private CompletableFuture<Void> validateBothSuperuserAndClusterOperation(String str, ClusterOperation clusterOperation) {
        CompletableFuture<Void> superUserCheck = validateSuperUserAccessAsync();
        CompletableFuture<Void> clusterOperationCheck = validateClusterOperationAsync(str, clusterOperation);
        return FutureUtil.waitForAll(List.of(superUserCheck, clusterOperationCheck)).handle((ignoredResult, ignoredError) -> {
            // The combined outcome is irrelevant; each check is inspected individually.
            if (!superUserCheck.isCompletedExceptionally() || !clusterOperationCheck.isCompletedExceptionally()) {
                return null;
            }
            if (log.isDebugEnabled()) {
                Throwable superUserFailure = null;
                try {
                    superUserCheck.join();
                } catch (Throwable e) {
                    superUserFailure = FutureUtil.unwrapCompletionException(e);
                }
                Throwable clusterOperationFailure = null;
                try {
                    clusterOperationCheck.join();
                } catch (Throwable e) {
                    clusterOperationFailure = FutureUtil.unwrapCompletionException(e);
                }
                log.debug("validateBothSuperuserAndClusterOperation failed. originalPrincipal={} clientAppId={} operation={} cluster={} superuserValidationError={} clusterOperationValidationError={}", originalPrincipal(), clientAppId(), clusterOperation.toString(), str, superUserFailure, clusterOperationFailure);
            }
            throw new RestException(Response.Status.UNAUTHORIZED, String.format("Unauthorized to validateBothSuperuserAndClusterOperation for originalPrincipal [%s] and clientAppId [%s] about operation [%s] on cluster [%s]", originalPrincipal(), clientAppId(), clusterOperation.toString(), str));
        });
    }

    /**
     * Runs the superuser check and the cluster-policy-operation check and accepts the request
     * when at least one of them succeeds.
     *
     * <p>Both checks are started up front and awaited together. Only when both completed
     * exceptionally does the returned future fail, with a {@code RestException} carrying
     * {@code UNAUTHORIZED}; the individual failure causes are written to the debug log.
     *
     * @param str the cluster name
     * @param policyName the policy the operation applies to
     * @param policyOperation the operation being authorized
     * @return a future that completes normally when access is granted
     */
    private CompletableFuture<Void> validateBothSuperuserAndClusterPolicyOperation(String str, PolicyName policyName, PolicyOperation policyOperation) {
        CompletableFuture<Void> superUserCheck = validateSuperUserAccessAsync();
        CompletableFuture<Void> policyOperationCheck = validateClusterPolicyOperationAsync(str, policyName, policyOperation);
        return FutureUtil.waitForAll(List.of(superUserCheck, policyOperationCheck)).handle((ignoredResult, ignoredError) -> {
            // The combined outcome is irrelevant; each check is inspected individually.
            if (!superUserCheck.isCompletedExceptionally() || !policyOperationCheck.isCompletedExceptionally()) {
                return null;
            }
            if (log.isDebugEnabled()) {
                Throwable superUserFailure = null;
                try {
                    superUserCheck.join();
                } catch (Throwable e) {
                    superUserFailure = FutureUtil.unwrapCompletionException(e);
                }
                Throwable policyOperationFailure = null;
                try {
                    policyOperationCheck.join();
                } catch (Throwable e) {
                    policyOperationFailure = FutureUtil.unwrapCompletionException(e);
                }
                log.debug("validateBothSuperuserAndClusterPolicyOperation failed. originalPrincipal={} clientAppId={} operation={} cluster={} superuserValidationError={} clusterOperationValidationError={}", originalPrincipal(), clientAppId(), policyOperation.toString(), str, superUserFailure, policyOperationFailure);
            }
            throw new RestException(Response.Status.UNAUTHORIZED, String.format("Unauthorized to validateBothSuperuserAndClusterPolicyOperation for originalPrincipal [%s] and clientAppId [%s] about operation [%s] on cluster [%s]", originalPrincipal(), clientAppId(), policyOperation.toString(), str));
        });
    }

    /**
     * Asks the authorization service whether the caller may perform the given cluster operation.
     *
     * <p>The check is skipped, and an already-completed future returned, unless both
     * authentication and authorization are enabled on the broker service.
     *
     * @param str the cluster name
     * @param clusterOperation the operation being authorized
     * @return a future that fails with a {@code RestException} carrying {@code UNAUTHORIZED}
     *         when the authorization service denies the operation
     */
    private CompletableFuture<Void> validateClusterOperationAsync(String str, ClusterOperation clusterOperation) {
        PulsarService pulsarService = pulsar();
        boolean authEnforced = pulsarService.getBrokerService().isAuthenticationEnabled()
                && pulsarService.getBrokerService().isAuthorizationEnabled();
        if (!authEnforced) {
            return CompletableFuture.completedFuture(null);
        }
        return pulsarService.getBrokerService().getAuthorizationService()
                .allowClusterOperationAsync(str, clusterOperation, originalPrincipal(), clientAppId(), clientAuthData())
                .thenAccept(allowed -> {
                    if (allowed) {
                        return;
                    }
                    throw new RestException(Response.Status.UNAUTHORIZED, String.format("Unauthorized to validateClusterOperation for originalPrincipal [%s] and clientAppId [%s] about operation [%s] on cluster [%s]", originalPrincipal(), clientAppId(), clusterOperation.toString(), str));
                });
    }

    /**
     * Asks the authorization service whether the caller may perform the given operation on a
     * cluster policy.
     *
     * <p>The check is skipped, and an already-completed future returned, unless both
     * authentication and authorization are enabled on the broker service.
     *
     * @param str the cluster name
     * @param policyName the policy the operation applies to
     * @param policyOperation the operation being authorized
     * @return a future that fails with a {@code RestException} carrying {@code UNAUTHORIZED}
     *         when the authorization service denies the operation
     */
    private CompletableFuture<Void> validateClusterPolicyOperationAsync(String str, PolicyName policyName, PolicyOperation policyOperation) {
        PulsarService pulsarService = pulsar();
        boolean authEnforced = pulsarService.getBrokerService().isAuthenticationEnabled()
                && pulsarService.getBrokerService().isAuthorizationEnabled();
        if (!authEnforced) {
            return CompletableFuture.completedFuture(null);
        }
        return pulsarService.getBrokerService().getAuthorizationService()
                .allowClusterPolicyOperationAsync(str, policyName, policyOperation, originalPrincipal(), clientAppId(), clientAuthData())
                .thenAccept(allowed -> {
                    if (allowed) {
                        return;
                    }
                    throw new RestException(Response.Status.UNAUTHORIZED, String.format("Unauthorized to validateClusterPolicyOperation for originalPrincipal [%s] and clientAppId [%s] about operation [%s] on cluster [%s]", originalPrincipal(), clientAppId(), policyOperation.toString(), str));
                });
    }
}
