package io.specmesh.kafka.admin;

import io.specmesh.kafka.admin.SmAdminClient;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.LogDirDescription;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.ReplicaInfo;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.ConsumerGroupState;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;

/* loaded from: input_file:io/specmesh/kafka/admin/SimpleAdminClient.class */
public class SimpleAdminClient implements SmAdminClient {
    public static final long TIMEOUT = 300;
    private final Admin adminClient;

    /* JADX INFO: Access modifiers changed from: package-private */
    public SimpleAdminClient(Admin admin) {
        this.adminClient = admin;
    }

    @Override // io.specmesh.kafka.admin.SmAdminClient
    public List<SmAdminClient.ConsumerGroup> groupsForTopicPrefix(String str) {
        try {
            List<ConsumerGroupDescription> groupsDescriptions = groupsDescriptions(str);
            Map<String, List<SmAdminClient.Partition>> groupOffsets = groupOffsets(groupsDescriptions, str);
            return (List) groupsDescriptions.stream().map(consumerGroupDescription -> {
                SmAdminClient.ConsumerGroup.ConsumerGroupBuilder builder = SmAdminClient.ConsumerGroup.builder();
                builder.id(consumerGroupDescription.groupId());
                builder.members((List) consumerGroupDescription.members().stream().map(memberDescription -> {
                    return SmAdminClient.Member.builder().id(memberDescription.consumerId()).host(memberDescription.host()).clientId(memberDescription.clientId()).build();
                }).collect(Collectors.toList()));
                builder.partitions((List) groupOffsets.getOrDefault(consumerGroupDescription.groupId(), List.of()));
                SmAdminClient.ConsumerGroup build = builder.build();
                build.calculateTotalOffset();
                return build;
            }).collect(Collectors.toList());
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new SmAdminClient.ClientException("Failed to list consumer-groups for:" + str, e);
        }
    }

    private List<ConsumerGroupDescription> groupsDescriptions(String str) throws InterruptedException, ExecutionException, TimeoutException {
        return (List) ((Map) this.adminClient.describeConsumerGroups((List) ((Collection) this.adminClient.listConsumerGroups().all().get(300L, TimeUnit.SECONDS)).stream().filter(consumerGroupListing -> {
            return !consumerGroupListing.isSimpleConsumerGroup() && consumerGroupListing.state().isPresent() && ((ConsumerGroupState) consumerGroupListing.state().get()).equals(ConsumerGroupState.STABLE);
        }).map((v0) -> {
            return v0.groupId();
        }).collect(Collectors.toList())).all().get(300L, TimeUnit.SECONDS)).values().stream().filter(consumerGroupDescription -> {
            return isConsumingFromTopicPrefix(consumerGroupDescription, str);
        }).collect(Collectors.toList());
    }

    private Map<String, List<SmAdminClient.Partition>> groupOffsets(List<ConsumerGroupDescription> list, String str) throws InterruptedException, ExecutionException, TimeoutException {
        return (Map) ((Map) this.adminClient.listConsumerGroupOffsets((Map) list.stream().map((v0) -> {
            return v0.groupId();
        }).collect(Collectors.toMap(Function.identity(), str2 -> {
            return new ListConsumerGroupOffsetsSpec();
        }))).all().get(300L, TimeUnit.SECONDS)).entrySet().stream().filter(entry -> {
            return ((Map) entry.getValue()).keySet().stream().anyMatch(topicPartition -> {
                return topicPartition.topic().startsWith(str);
            });
        }).collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, entry2 -> {
            return (List) ((Map) entry2.getValue()).entrySet().stream().map(entry2 -> {
                return SmAdminClient.Partition.builder().id(((TopicPartition) entry2.getKey()).partition()).topic(((TopicPartition) entry2.getKey()).topic()).offset(((OffsetAndMetadata) entry2.getValue()).offset()).build();
            }).collect(Collectors.toList());
        }));
    }

    private boolean isConsumingFromTopicPrefix(ConsumerGroupDescription consumerGroupDescription, String str) {
        return consumerGroupDescription.members().stream().map((v0) -> {
            return v0.assignment();
        }).map((v0) -> {
            return v0.topicPartitions();
        }).flatMap((v0) -> {
            return v0.stream();
        }).anyMatch(topicPartition -> {
            return topicPartition.topic().startsWith(str);
        });
    }

    @Override // io.specmesh.kafka.admin.SmAdminClient
    public long topicVolumeUsingLogDirs(String str) {
        try {
            long j = 0;
            Iterator it = ((Map) this.adminClient.describeLogDirs(brokerIds()).allDescriptions().get(300L, TimeUnit.SECONDS)).values().iterator();
            while (it.hasNext()) {
                Iterator it2 = ((Map) it.next()).values().iterator();
                while (it2.hasNext()) {
                    for (Map.Entry entry : ((LogDirDescription) it2.next()).replicaInfos().entrySet()) {
                        if (str.equals(((TopicPartition) entry.getKey()).topic())) {
                            j += ((ReplicaInfo) entry.getValue()).size();
                        }
                    }
                }
            }
            return j;
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new SmAdminClient.ClientException("Failed to get topicVolumeUsingLogDirs:" + str, e);
        }
    }

    @Override // io.specmesh.kafka.admin.SmAdminClient
    public List<Integer> brokerIds() {
        try {
            return (List) ((Collection) this.adminClient.describeCluster().nodes().get()).stream().mapToInt((v0) -> {
                return v0.id();
            }).boxed().collect(Collectors.toList());
        } catch (InterruptedException | ExecutionException e) {
            throw new SmAdminClient.ClientException("Failed to describe cluster to get brokerIds", e);
        }
    }

    @Override // io.specmesh.kafka.admin.SmAdminClient
    public long topicVolumeOffsets(String str) {
        try {
            TopicDescription topicDescription = (TopicDescription) ((Map) this.adminClient.describeTopics(Collections.singleton(str)).allTopicNames().get(300L, TimeUnit.SECONDS)).get(str);
            HashMap hashMap = new HashMap();
            HashMap hashMap2 = new HashMap();
            Iterator it = topicDescription.partitions().iterator();
            while (it.hasNext()) {
                TopicPartition topicPartition = new TopicPartition(str, ((TopicPartitionInfo) it.next()).partition());
                hashMap.put(topicPartition, OffsetSpec.latest());
                hashMap2.put(topicPartition, OffsetSpec.earliest());
            }
            ListOffsetsResult listOffsets = this.adminClient.listOffsets(hashMap);
            ListOffsetsResult listOffsets2 = this.adminClient.listOffsets(hashMap2);
            long j = 0;
            for (TopicPartition topicPartition2 : hashMap.keySet()) {
                j += ((ListOffsetsResult.ListOffsetsResultInfo) listOffsets.partitionResult(topicPartition2).get()).offset() - ((ListOffsetsResult.ListOffsetsResultInfo) listOffsets2.partitionResult(topicPartition2).get()).offset();
            }
            return j;
        } catch (InterruptedException | TimeoutException e) {
            throw new SmAdminClient.ClientException("Failed to get topicVolumeOffsets for topic:" + str, e);
        } catch (ExecutionException e2) {
            if (e2.toString().contains(UnknownTopicOrPartitionException.class.getName())) {
                return 0L;
            }
            throw new SmAdminClient.ClientException("Failed to get topicVolumeOffsets for topic:" + str, e2);
        }
    }
}
