package io.kroxylicious.testing.kafka.common;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.Node;
import org.awaitility.Awaitility;
import org.hamcrest.Matchers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/kroxylicious/testing/kafka/common/Utils.class */
/**
 * Test-support helpers for waiting until a Kafka cluster reports the expected number of brokers.
 */
public class Utils {
    private static final Logger log = LoggerFactory.getLogger(Utils.class);

    /** Client configuration key holding the comma-separated bootstrap address list. */
    private static final String BOOTSTRAP_SERVERS = "bootstrap.servers";

    /** Upper bound, in seconds, for each individual describe-cluster future. */
    private static final long DESCRIBE_TIMEOUT_SECONDS = 10L;

    /**
     * Blocks until enough brokers have each reported a cluster of the expected size.
     *
     * <p>Starting from the addresses listed under {@code bootstrap.servers}, each address is probed
     * with its own {@link Admin} client until that client's {@code describeCluster()} reports exactly
     * {@code num} nodes. Peers advertised by a probed broker that have not yet been confirmed are
     * queued for probing too. The loop ends once {@code num} addresses have been confirmed or there
     * is nothing left to probe.
     *
     * @param map      client configuration; copied per probe with {@code bootstrap.servers} replaced
     *                 by the single address being probed
     * @param i        maximum time to wait for each probed address, in {@code timeUnit}
     * @param timeUnit unit of {@code i}
     * @param num      expected number of brokers in the cluster; must not be {@code null}
     * @throws NullPointerException     if {@code num} is {@code null}
     * @throws IllegalArgumentException if fewer than {@code num} addresses were confirmed ready
     */
    public static void awaitExpectedBrokerCountInCluster(Map<String, Object> map, int i, TimeUnit timeUnit, Integer num) {
        final int expectedBrokerCount = Objects.requireNonNull(num, "num");
        final Set<String> knownReady = Collections.synchronizedSet(new HashSet<>());
        final Set<String> toProbe = Collections.synchronizedSet(new HashSet<>());

        // Trim entries so "a:1, b:2" yields "b:2" rather than " b:2"; otherwise the same broker is
        // later rediscovered under its untrimmed-mismatching address and counted twice.
        Arrays.stream(String.valueOf(map.get(BOOTSTRAP_SERVERS)).split(","))
                .map(String::trim)
                .filter(Predicate.not(String::isEmpty))
                .forEach(toProbe::add);

        while (knownReady.size() < expectedBrokerCount && !toProbe.isEmpty()) {
            final String probeAddress = toProbe.iterator().next();
            final Map<String, Object> probeConfig = new HashMap<>(map);
            probeConfig.put(BOOTSTRAP_SERVERS, probeAddress);

            // try-with-resources closes the client on both the success and the failure path; any
            // exception raised by the wait (e.g. on timeout) propagates after the close.
            try (Admin admin = Admin.create(probeConfig)) {
                Awaitility.await()
                        .pollDelay(Duration.ZERO)
                        .pollInterval(1L, TimeUnit.SECONDS)
                        .atMost(i, timeUnit)
                        .ignoreExceptions()
                        .until(() -> describeNodes(admin, probeAddress, knownReady, toProbe),
                                Matchers.hasSize(expectedBrokerCount));
            }
            knownReady.add(probeAddress);
            toProbe.remove(probeAddress);
        }

        final int readyCount = knownReady.size();
        if (readyCount < expectedBrokerCount) {
            throw new IllegalArgumentException(String.format("Too few broker(s) became ready (%d), expected %d.", readyCount, num));
        }
    }

    /**
     * Performs one poll: describes the cluster through {@code admin} and queues unseen peers.
     *
     * @return the nodes reported by the broker, or an empty list when the describe call failed,
     *         timed out or was interrupted (so the caller's size matcher does not match)
     */
    private static Collection<Node> describeNodes(Admin admin, String probeAddress, Set<String> knownReady, Set<String> toProbe) {
        log.debug("describing cluster using address: {}", probeAddress);
        try {
            // Dereferencing the controller makes this poll fail (and be retried, since the caller
            // ignores exceptions) when no controller is reported yet.
            admin.describeCluster().controller().get(DESCRIBE_TIMEOUT_SECONDS, TimeUnit.SECONDS).id();
            final Collection<Node> nodes = admin.describeCluster().nodes().get(DESCRIBE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            log.debug("{} sees peers: {}", probeAddress, nodes);
            toProbe.addAll(nodes.stream()
                    .filter(Predicate.not(Node::isEmpty))
                    .map(Utils::nodeToAddr)
                    .filter(Predicate.not(knownReady::contains))
                    .collect(Collectors.toSet()));
            return nodes;
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can observe the interruption.
            Thread.currentThread().interrupt();
            log.warn("caught: {}", e.getMessage(), e);
            return Collections.emptyList();
        } catch (ExecutionException e) {
            log.warn("caught: {}", e.getMessage(), e);
            return Collections.emptyList();
        } catch (TimeoutException e) {
            log.warn("Kafka timed out describing the cluster");
            return Collections.emptyList();
        }
    }

    /** Formats a node as {@code host:port}, the same shape as a bootstrap address entry. */
    private static String nodeToAddr(Node node) {
        return node.host() + ":" + node.port();
    }
}
