package io.specmesh.kafka.provision;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.specmesh.kafka.provision.Status;
import io.specmesh.kafka.provision.TopicProvisioner;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.AlterConfigsOptions;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.CreatePartitionsOptions;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.config.ConfigResource;

/* loaded from: input_file:io/specmesh/kafka/provision/TopicMutators.class */
public class TopicMutators {

    /* loaded from: input_file:io/specmesh/kafka/provision/TopicMutators$CleanUnspecifiedMutator.class */
    public static final class CleanUnspecifiedMutator implements TopicMutator {
        private final boolean dryRun;
        private final Admin adminClient;

        CleanUnspecifiedMutator(boolean z, Admin admin) {
            this.dryRun = z;
            this.adminClient = admin;
        }

        @Override // io.specmesh.kafka.provision.TopicMutators.TopicMutator
        public Collection<TopicProvisioner.Topic> mutate(Collection<TopicProvisioner.Topic> collection) throws ProvisioningException {
            List<TopicProvisioner.Topic> list = (List) collection.stream().filter(topic -> {
                return (topic.state().equals(Status.STATE.CREATE) || topic.state().equals(Status.STATE.UPDATE)) ? false : true;
            }).collect(Collectors.toList());
            try {
                list.forEach(topic2 -> {
                    topic2.state(Status.STATE.DELETE);
                });
                if (!this.dryRun) {
                    this.adminClient.deleteTopics(toTopicNames(list)).all().get(60L, TimeUnit.SECONDS);
                    list.forEach(topic3 -> {
                        topic3.state(Status.STATE.DELETED);
                    });
                }
            } catch (InterruptedException | ExecutionException | TimeoutException e) {
                list.forEach(topic4 -> {
                    topic4.exception(new ProvisioningException("failed to delete topics", e));
                });
            }
            return list;
        }

        private List<String> toTopicNames(List<TopicProvisioner.Topic> list) {
            return (List) list.stream().map((v0) -> {
                return v0.name();
            }).collect(Collectors.toList());
        }
    }

    /* loaded from: input_file:io/specmesh/kafka/provision/TopicMutators$CollectiveMutator.class */
    public static final class CollectiveMutator implements TopicMutator {
        private final Stream<TopicMutator> writers;

        private CollectiveMutator(TopicMutator... topicMutatorArr) {
            this.writers = Arrays.stream(topicMutatorArr);
        }

        @Override // io.specmesh.kafka.provision.TopicMutators.TopicMutator
        public Collection<TopicProvisioner.Topic> mutate(Collection<TopicProvisioner.Topic> collection) {
            return (Collection) this.writers.map(topicMutator -> {
                return topicMutator.mutate(collection);
            }).flatMap((v0) -> {
                return v0.stream();
            }).collect(Collectors.toList());
        }
    }

    /* loaded from: input_file:io/specmesh/kafka/provision/TopicMutators$CreateMutator.class */
    public static final class CreateMutator implements TopicMutator {
        private final Admin adminClient;

        private CreateMutator(Admin admin) {
            this.adminClient = admin;
        }

        @Override // io.specmesh.kafka.provision.TopicMutators.TopicMutator
        public Collection<TopicProvisioner.Topic> mutate(Collection<TopicProvisioner.Topic> collection) throws ProvisioningException {
            List list = (List) collection.stream().filter(topic -> {
                return topic.state().equals(Status.STATE.CREATE);
            }).collect(Collectors.toList());
            try {
                this.adminClient.createTopics(asNewTopic(list)).all().get(60L, TimeUnit.SECONDS);
                return (Collection) list.stream().map(topic2 -> {
                    return topic2.state(Status.STATE.CREATED);
                }).collect(Collectors.toList());
            } catch (InterruptedException | ExecutionException | TimeoutException e) {
                list.forEach(topic3 -> {
                    topic3.exception(new ProvisioningException("failed to write topics", e));
                });
                return collection;
            }
        }

        private Collection<NewTopic> asNewTopic(Collection<TopicProvisioner.Topic> collection) {
            return (Collection) collection.stream().map(topic -> {
                return new NewTopic(topic.name(), topic.partitions(), topic.replication()).configs(topic.config());
            }).collect(Collectors.toList());
        }
    }

    /* loaded from: input_file:io/specmesh/kafka/provision/TopicMutators$NoopMutator.class */
    public static final class NoopMutator implements TopicMutator {
        @Override // io.specmesh.kafka.provision.TopicMutators.TopicMutator
        public Collection<TopicProvisioner.Topic> mutate(Collection<TopicProvisioner.Topic> collection) throws ProvisioningException {
            return collection;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/specmesh/kafka/provision/TopicMutators$TopicMutator.class */
    public interface TopicMutator {
        Collection<TopicProvisioner.Topic> mutate(Collection<TopicProvisioner.Topic> collection);
    }

    @SuppressFBWarnings(value = {"EI_EXPOSE_REP2"}, justification = "adminClient() passed as param to prevent API pollution")
    /* loaded from: input_file:io/specmesh/kafka/provision/TopicMutators$TopicMutatorBuilder.class */
    public static final class TopicMutatorBuilder {
        private Admin adminClient;
        private boolean noop;
        private boolean cleanUnspecified;
        private boolean dryRun;

        private TopicMutatorBuilder() {
        }

        public TopicMutatorBuilder adminClient(Admin admin) {
            this.adminClient = admin;
            return this;
        }

        public TopicMutatorBuilder noopMutator(boolean z) {
            this.noop = z;
            return this;
        }

        public TopicMutatorBuilder cleanUnspecified(boolean z, boolean z2) {
            this.cleanUnspecified = z;
            this.dryRun = z2;
            return this;
        }

        public static TopicMutatorBuilder builder() {
            return new TopicMutatorBuilder();
        }

        public TopicMutator build() {
            return this.cleanUnspecified ? new CleanUnspecifiedMutator(this.dryRun, this.adminClient) : this.noop ? new NoopMutator() : new CollectiveMutator(new CreateMutator(this.adminClient), new UpdateMutator(this.adminClient));
        }
    }

    /* loaded from: input_file:io/specmesh/kafka/provision/TopicMutators$UpdateMutator.class */
    public static final class UpdateMutator implements TopicMutator {
        private final Admin adminClient;

        UpdateMutator(Admin admin) {
            this.adminClient = admin;
        }

        @Override // io.specmesh.kafka.provision.TopicMutators.TopicMutator
        public Collection<TopicProvisioner.Topic> mutate(Collection<TopicProvisioner.Topic> collection) throws ProvisioningException {
            List<TopicProvisioner.Topic> list = (List) collection.stream().filter(topic -> {
                return topic.state().equals(Status.STATE.UPDATE);
            }).collect(Collectors.toList());
            Map map = this.adminClient.describeTopics(toTopicNames(list)).topicNameValues();
            list.forEach(topic2 -> {
                try {
                    updatePartitions(topic2, (TopicDescription) ((KafkaFuture) map.get(topic2.name())).get(60L, TimeUnit.SECONDS));
                    updateConfigs(topic2);
                } catch (InterruptedException | ExecutionException | TimeoutException e) {
                    throw new ProvisioningException("Failed to update configs", e);
                }
            });
            return list;
        }

        private List<String> toTopicNames(List<TopicProvisioner.Topic> list) {
            return (List) list.stream().map((v0) -> {
                return v0.name();
            }).collect(Collectors.toList());
        }

        private void updateConfigs(TopicProvisioner.Topic topic) {
            try {
                this.adminClient.incrementalAlterConfigs(Map.of(new ConfigResource(ConfigResource.Type.TOPIC, topic.name()), (List) topic.config().entrySet().stream().map(entry -> {
                    return new AlterConfigOp(new ConfigEntry((String) entry.getKey(), (String) entry.getValue()), AlterConfigOp.OpType.SET);
                }).collect(Collectors.toList())), new AlterConfigsOptions().timeoutMs(60));
                topic.messages(topic.messages() + "\nUpdated config: retention.ms -> " + topic.config().get("retention.ms"));
                topic.state(Status.STATE.UPDATED);
            } catch (Exception e) {
                topic.exception(new ProvisioningException("Failed to update config ", e));
            }
        }

        private void updatePartitions(TopicProvisioner.Topic topic, TopicDescription topicDescription) {
            try {
                if (topicDescription.partitions().size() < topic.partitions()) {
                    this.adminClient.createPartitions(Map.of(topic.name(), NewPartitions.increaseTo(topic.partitions())), new CreatePartitionsOptions().retryOnQuotaViolation(false)).all().get(60L, TimeUnit.SECONDS);
                    topic.state(Status.STATE.UPDATED);
                    topic.messages(topic.messages() + "\n Updated partitionCount");
                } else {
                    topic.messages(topic.messages() + "\n Ignoring partition increase because new count is not higher");
                }
            } catch (Exception e) {
                topic.exception(new ProvisioningException("Failed to update partitions", e));
            }
        }
    }
}
