package org.creekservice.internal.kafka.streams.test.extension.testsuite;

import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.creekservice.api.kafka.extension.resource.KafkaTopic;
import org.creekservice.api.kafka.metadata.KafkaTopicDescriptor;
import org.creekservice.api.kafka.metadata.KafkaTopicInput;
import org.creekservice.api.kafka.metadata.KafkaTopicOutput;
import org.creekservice.api.kafka.metadata.OwnedKafkaTopicInput;
import org.creekservice.api.kafka.metadata.OwnedKafkaTopicOutput;
import org.creekservice.api.system.test.extension.CreekSystemTest;
import org.creekservice.api.system.test.extension.test.env.listener.TestEnvironmentListener;
import org.creekservice.api.system.test.extension.test.model.CreekTestSuite;
import org.creekservice.api.system.test.extension.test.model.TestSuiteResult;
import org.creekservice.internal.kafka.extension.resource.TopicCollector;
import org.creekservice.internal.kafka.streams.test.extension.handler.TopicValidator;

/* loaded from: input_file:org/creekservice/internal/kafka/streams/test/extension/testsuite/TopicValidatingListener.class */
public final class TopicValidatingListener implements TestEnvironmentListener, TopicValidator {
    private final CreekSystemTest api;
    private final TopicCollector topicCollector;
    private Optional<TopicCollector.CollectedTopics> collectedTopics;

    /* loaded from: input_file:org/creekservice/internal/kafka/streams/test/extension/testsuite/TopicValidatingListener$InvalidTopicOperationException.class */
    private static final class InvalidTopicOperationException extends RuntimeException {
        InvalidTopicOperationException(String str, String str2, KafkaTopic<?, ?> kafkaTopic) {
            super("Tests can not " + str + " topic " + kafkaTopic.descriptor().id() + " as the services-under-test do not " + str2 + " it. Please check the topic name.");
        }
    }

    public TopicValidatingListener(CreekSystemTest creekSystemTest) {
        this(creekSystemTest, new TopicCollector());
    }

    TopicValidatingListener(CreekSystemTest creekSystemTest, TopicCollector topicCollector) {
        this.collectedTopics = Optional.empty();
        this.api = (CreekSystemTest) Objects.requireNonNull(creekSystemTest, "api");
        this.topicCollector = (TopicCollector) Objects.requireNonNull(topicCollector, "topicCollector");
    }

    public void beforeSuite(CreekTestSuite creekTestSuite) {
        this.collectedTopics = Optional.of(this.topicCollector.collectTopics((List) this.api.tests().env().currentSuite().services().stream().map((v0) -> {
            return v0.descriptor();
        }).flatMap((v0) -> {
            return v0.stream();
        }).collect(Collectors.toList())));
    }

    public void afterSuite(CreekTestSuite creekTestSuite, TestSuiteResult testSuiteResult) {
        this.collectedTopics = Optional.empty();
    }

    @Override // org.creekservice.internal.kafka.streams.test.extension.handler.TopicValidator
    public void validateCanProduce(KafkaTopic<?, ?> kafkaTopic) {
        if (allDescriptorsAre(kafkaTopicDescriptor -> {
            return (kafkaTopicDescriptor instanceof KafkaTopicOutput) | (kafkaTopicDescriptor instanceof OwnedKafkaTopicOutput);
        }, kafkaTopic)) {
            throw new InvalidTopicOperationException("produce to", "consume from", kafkaTopic);
        }
    }

    @Override // org.creekservice.internal.kafka.streams.test.extension.handler.TopicValidator
    public void validateCanConsume(KafkaTopic<?, ?> kafkaTopic) {
        if (allDescriptorsAre(kafkaTopicDescriptor -> {
            return (kafkaTopicDescriptor instanceof KafkaTopicInput) | (kafkaTopicDescriptor instanceof OwnedKafkaTopicInput);
        }, kafkaTopic)) {
            throw new InvalidTopicOperationException("consume from", "produce to", kafkaTopic);
        }
    }

    private boolean allDescriptorsAre(Predicate<KafkaTopicDescriptor<?, ?>> predicate, KafkaTopic<?, ?> kafkaTopic) {
        return this.collectedTopics.orElseThrow(IllegalStateException::new).getAll(kafkaTopic.descriptor().id()).stream().allMatch(predicate);
    }
}
