package io.specmesh.kafka.provision;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.specmesh.kafka.provision.Provisioner;
import io.specmesh.kafka.provision.Status;
import io.specmesh.kafka.provision.TopicProvisioner;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.AlterConfigsOptions;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.CreatePartitionsOptions;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.config.ConfigResource;

/* loaded from: input_file:io/specmesh/kafka/provision/TopicWriters.class */
public class TopicWriters {

    /* loaded from: input_file:io/specmesh/kafka/provision/TopicWriters$CollectiveWriter.class */
    public static final class CollectiveWriter implements TopicWriter {
        private final Stream<TopicWriter> writers;

        private CollectiveWriter(TopicWriter... topicWriterArr) {
            this.writers = Arrays.stream(topicWriterArr);
        }

        @Override // io.specmesh.kafka.provision.TopicWriters.TopicWriter
        public Collection<TopicProvisioner.Topic> write(Collection<TopicProvisioner.Topic> collection) {
            return (Collection) this.writers.map(topicWriter -> {
                return topicWriter.write(collection);
            }).flatMap((v0) -> {
                return v0.stream();
            }).collect(Collectors.toList());
        }
    }

    /* loaded from: input_file:io/specmesh/kafka/provision/TopicWriters$CreateWriter.class */
    /**
     * Writer that creates, through the Kafka {@link Admin} client, every topic whose state
     * equals {@code Status.STATE.CREATE}.
     */
    public static final class CreateWriter implements TopicWriter {
        private final Admin adminClient;

        private CreateWriter(Admin admin) {
            this.adminClient = admin;
        }

        /**
         * Creates the topics marked {@code CREATE} in one batched admin request and waits up
         * to 60 seconds for it to complete.
         *
         * <p>On success the created topics are returned with their state set to
         * {@code CREATED}. If the request fails, times out or the wait is interrupted, each
         * attempted topic is given a {@link Provisioner.ProvisioningException} wrapping the
         * cause and its state is set to {@code FAILED}; no exception is thrown.
         *
         * @param collection candidate topics; those in any state other than {@code CREATE} are not sent
         * @return on success, only the topics that were created; on failure, the input collection as given
         */
        @Override // io.specmesh.kafka.provision.TopicWriters.TopicWriter
        public Collection<TopicProvisioner.Topic> write(Collection<TopicProvisioner.Topic> collection) throws Provisioner.ProvisioningException {
            final List<TopicProvisioner.Topic> toCreate = collection.stream()
                    .filter(topic -> topic.state().equals(Status.STATE.CREATE))
                    .collect(Collectors.toList());
            try {
                this.adminClient.createTopics(asNewTopic(toCreate)).all().get(60L, TimeUnit.SECONDS);
                return toCreate.stream()
                        .map(topic -> topic.state(Status.STATE.CREATED))
                        .collect(Collectors.toList());
            } catch (InterruptedException | ExecutionException | TimeoutException e) {
                if (e instanceof InterruptedException) {
                    // Restore the interrupt flag so code further up the stack can still observe it.
                    Thread.currentThread().interrupt();
                }
                toCreate.forEach(topic ->
                        topic.exception(new Provisioner.ProvisioningException("failed to write topics", e))
                                .state(Status.STATE.FAILED));
                // NOTE(review): the failure path returns the whole input, whereas the success path
                // returns only the attempted topics - kept as-is for existing callers; confirm intent.
                return collection;
            }
        }

        /** Maps each topic to a {@link NewTopic} carrying its name, partition count, replication and config. */
        private Collection<NewTopic> asNewTopic(Collection<TopicProvisioner.Topic> collection) {
            return collection.stream()
                    .map(topic -> new NewTopic(topic.name(), topic.partitions(), topic.replication()).configs(topic.config()))
                    .collect(Collectors.toList());
        }
    }

    /* loaded from: input_file:io/specmesh/kafka/provision/TopicWriters$NoopWriter.class */
    /**
     * Writer that performs no work: the topics passed in are handed straight back.
     */
    public static final class NoopWriter implements TopicWriter {
        /**
         * Returns the argument untouched.
         *
         * @param collection topics to pass through
         * @return the very same collection instance that was supplied
         */
        @Override // io.specmesh.kafka.provision.TopicWriters.TopicWriter
        public Collection<TopicProvisioner.Topic> write(final Collection<TopicProvisioner.Topic> collection) throws Provisioner.ProvisioningException {
            return collection;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/specmesh/kafka/provision/TopicWriters$TopicWriter.class */
    /**
     * Contract shared by the writer implementations nested in {@code TopicWriters}.
     */
    public interface TopicWriter {
        /**
         * Processes the supplied topics.
         *
         * @param collection the topics offered to this writer
         * @return a collection of topics describing the outcome; which topics it contains is
         *     decided by the implementation
         */
        Collection<TopicProvisioner.Topic> write(Collection<TopicProvisioner.Topic> collection);
    }

    /**
     * Fluent builder that assembles a {@link TopicWriter}.
     *
     * <p>Obtain an instance through {@link #builder()}, set the options, then call
     * {@link #build()}.
     */
    @SuppressFBWarnings(value = {"EI_EXPOSE_REP2"}, justification = "adminClient() passed as param to prevent API pollution")
    /* loaded from: input_file:io/specmesh/kafka/provision/TopicWriters$TopicWriterBuilder.class */
    public static final class TopicWriterBuilder {
        /** When {@code true}, {@link #build()} yields a {@link NoopWriter}. */
        private boolean noopWriter;
        /** Admin client handed to the create and update writers. */
        private Admin adminClient;

        private TopicWriterBuilder() {
        }

        /**
         * @return a new builder with no admin client set and the no-op flag off
         */
        public static TopicWriterBuilder builder() {
            return new TopicWriterBuilder();
        }

        /**
         * @param admin the admin client the built writers will use
         * @return this builder
         */
        public TopicWriterBuilder adminClient(Admin admin) {
            adminClient = admin;
            return this;
        }

        /**
         * @param z {@code true} to make {@link #build()} return a writer that does nothing
         * @return this builder
         */
        public TopicWriterBuilder noopWriter(boolean z) {
            noopWriter = z;
            return this;
        }

        /**
         * Builds the writer: a {@link NoopWriter} when the no-op flag is set, otherwise a
         * {@link CollectiveWriter} wrapping a {@link CreateWriter} followed by an
         * {@link UpdateWriter}, both using the configured admin client.
         *
         * @return the configured writer
         */
        public TopicWriter build() {
            if (noopWriter) {
                return new NoopWriter();
            }
            final TopicWriter creator = new CreateWriter(adminClient);
            final TopicWriter updater = new UpdateWriter(adminClient);
            return new CollectiveWriter(creator, updater);
        }
    }

    /* loaded from: input_file:io/specmesh/kafka/provision/TopicWriters$UpdateWriter.class */
    public static final class UpdateWriter implements TopicWriter {
        private final Admin adminClient;

        UpdateWriter(Admin admin) {
            this.adminClient = admin;
        }

        @Override // io.specmesh.kafka.provision.TopicWriters.TopicWriter
        public Collection<TopicProvisioner.Topic> write(Collection<TopicProvisioner.Topic> collection) throws Provisioner.ProvisioningException {
            List<TopicProvisioner.Topic> list = (List) collection.stream().filter(topic -> {
                return topic.state().equals(Status.STATE.UPDATE);
            }).collect(Collectors.toList());
            Map map = this.adminClient.describeTopics(toTopicNames(list)).topicNameValues();
            list.forEach(topic2 -> {
                try {
                    updatePartitions(topic2, (TopicDescription) ((KafkaFuture) map.get(topic2.name())).get(60L, TimeUnit.SECONDS));
                    updateConfigs(topic2);
                } catch (InterruptedException | ExecutionException | TimeoutException e) {
                    throw new Provisioner.ProvisioningException("Failed to update configs", e);
                }
            });
            return list;
        }

        private List<String> toTopicNames(List<TopicProvisioner.Topic> list) {
            return (List) list.stream().map((v0) -> {
                return v0.name();
            }).collect(Collectors.toList());
        }

        private void updateConfigs(TopicProvisioner.Topic topic) {
            try {
                this.adminClient.incrementalAlterConfigs(Map.of(new ConfigResource(ConfigResource.Type.TOPIC, topic.name()), (List) topic.config().entrySet().stream().map(entry -> {
                    return new AlterConfigOp(new ConfigEntry((String) entry.getKey(), (String) entry.getValue()), AlterConfigOp.OpType.SET);
                }).collect(Collectors.toList())), new AlterConfigsOptions().timeoutMs(60));
                topic.messages(topic.messages() + "\nUpdated config: retention.ms -> " + topic.config().get("retention.ms"));
                topic.state(Status.STATE.UPDATED);
            } catch (Exception e) {
                topic.state(Status.STATE.FAILED).exception(new Provisioner.ProvisioningException("Failed to update config ", e));
            }
        }

        private void updatePartitions(TopicProvisioner.Topic topic, TopicDescription topicDescription) {
            try {
                if (topicDescription.partitions().size() < topic.partitions()) {
                    this.adminClient.createPartitions(Map.of(topic.name(), NewPartitions.increaseTo(topic.partitions())), new CreatePartitionsOptions().retryOnQuotaViolation(false)).all().get(60L, TimeUnit.SECONDS);
                    topic.state(Status.STATE.UPDATED);
                    topic.messages(topic.messages() + "\n Updated partitionCount");
                } else {
                    topic.messages(topic.messages() + "\n Ignoring partition increase because new count is not higher");
                }
            } catch (Exception e) {
                topic.state(Status.STATE.FAILED).exception(new Provisioner.ProvisioningException("Failed to update partitions", e));
            }
        }
    }
}
