package kalix.javasdk.testkit;

import akka.actor.ActorSystem;
import akka.annotation.InternalApi;
import akka.grpc.GrpcClientSettings;
import akka.http.javadsl.Http;
import akka.http.javadsl.model.HttpRequest;
import akka.pattern.Patterns;
import akka.stream.Materializer;
import akka.stream.SystemMaterializer;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.io.IOException;
import java.net.ServerSocket;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import kalix.javasdk.Kalix;
import kalix.javasdk.KalixRunner;
import kalix.javasdk.Principal;
import kalix.javasdk.impl.GrpcClients;
import kalix.javasdk.impl.MessageCodec;
import kalix.javasdk.impl.ProxyInfoHolder;
import kalix.javasdk.testkit.EventingTestKit;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:kalix/javasdk/testkit/KalixTestKit.class */
public class KalixTestKit {
    private static final Logger log = LoggerFactory.getLogger(KalixTestKit.class);

    /* renamed from: kalix, reason: collision with root package name */
    private final Kalix f0kalix;
    private final MessageCodec messageCodec;
    private final EventingTestKit.MessageBuilder messageBuilder;
    private final Settings settings;
    private boolean started;
    private String proxyHost;
    private int proxyPort;
    private Optional<KalixProxyContainer> proxyContainer;
    private KalixRunner runner;
    private ActorSystem testSystem;
    private EventingTestKit eventingTestKit;

    /* loaded from: input_file:kalix/javasdk/testkit/KalixTestKit$MockedEventing.class */
    public static class MockedEventing {
        public static final String VALUE_ENTITY = "value-entity";
        public static final String EVENT_SOURCED_ENTITY = "event-sourced-entity";
        public static final String STREAM = "stream";
        public static final String TOPIC = "topic";
        private final Map<String, Set<String>> mockedIncomingEvents;
        private final Map<String, Set<String>> mockedOutgoingEvents;
        public static MockedEventing EMPTY = new MockedEventing();

        private MockedEventing() {
            this(new HashMap(), new HashMap());
        }

        private MockedEventing(Map<String, Set<String>> map, Map<String, Set<String>> map2) {
            this.mockedIncomingEvents = map;
            this.mockedOutgoingEvents = map2;
        }

        public MockedEventing withValueEntityIncomingMessages(String str) {
            HashMap hashMap = new HashMap(this.mockedIncomingEvents);
            hashMap.compute(VALUE_ENTITY, updateValues(str));
            return new MockedEventing(hashMap, new HashMap(this.mockedOutgoingEvents));
        }

        public MockedEventing withEventSourcedIncomingMessages(String str) {
            HashMap hashMap = new HashMap(this.mockedIncomingEvents);
            hashMap.compute(EVENT_SOURCED_ENTITY, updateValues(str));
            return new MockedEventing(hashMap, new HashMap(this.mockedOutgoingEvents));
        }

        public MockedEventing withStreamIncomingMessages(String str, String str2) {
            HashMap hashMap = new HashMap(this.mockedIncomingEvents);
            hashMap.compute(STREAM, updateValues(str + "/" + str2));
            return new MockedEventing(hashMap, new HashMap(this.mockedOutgoingEvents));
        }

        public MockedEventing withTopicIncomingMessages(String str) {
            HashMap hashMap = new HashMap(this.mockedIncomingEvents);
            hashMap.compute(TOPIC, updateValues(str));
            return new MockedEventing(hashMap, new HashMap(this.mockedOutgoingEvents));
        }

        public MockedEventing withTopicOutgoingMessages(String str) {
            HashMap hashMap = new HashMap(this.mockedOutgoingEvents);
            hashMap.compute(TOPIC, updateValues(str));
            return new MockedEventing(new HashMap(this.mockedIncomingEvents), hashMap);
        }

        @NotNull
        private BiFunction<String, Set<String>, Set<String>> updateValues(String str) {
            return (str2, set) -> {
                if (set != null) {
                    set.add(str);
                    return set;
                }
                LinkedHashSet linkedHashSet = new LinkedHashSet();
                linkedHashSet.add(str);
                return linkedHashSet;
            };
        }

        public String toString() {
            return "MockedEventing{mockedIncomingEvents=" + this.mockedIncomingEvents + ", mockedOutgoingEvents=" + this.mockedOutgoingEvents + "}";
        }

        public boolean hasIncomingConfig() {
            return !this.mockedIncomingEvents.isEmpty();
        }

        public boolean hasConfig() {
            return hasIncomingConfig() || hasOutgoingConfig();
        }

        public boolean hasOutgoingConfig() {
            return !this.mockedOutgoingEvents.isEmpty();
        }

        public String toIncomingFlowConfig() {
            return toConfig(this.mockedIncomingEvents);
        }

        public String toOutgoingFlowConfig() {
            return toConfig(this.mockedOutgoingEvents);
        }

        private String toConfig(Map<String, Set<String>> map) {
            return (String) map.entrySet().stream().sorted(Map.Entry.comparingByKey()).flatMap(entry -> {
                String str = (String) entry.getKey();
                return ((Set) entry.getValue()).stream().map(str2 -> {
                    return str + "," + str2;
                });
            }).collect(Collectors.joining(";"));
        }

        boolean hasValueEntitySubscription(String str) {
            return checkExistence(VALUE_ENTITY, str);
        }

        boolean hasEventSourcedEntitySubscription(String str) {
            return checkExistence(EVENT_SOURCED_ENTITY, str);
        }

        boolean hasStreamSubscription(String str, String str2) {
            return checkExistence(STREAM, str + "/" + str2);
        }

        boolean hasTopicSubscription(String str) {
            return checkExistence(TOPIC, str);
        }

        boolean hasTopicDestination(String str) {
            Set<String> set = this.mockedOutgoingEvents.get(TOPIC);
            return set != null && set.contains(str);
        }

        private boolean checkExistence(String str, String str2) {
            Set<String> set = this.mockedIncomingEvents.get(str);
            return set != null && set.contains(str2);
        }
    }

    /* loaded from: input_file:kalix/javasdk/testkit/KalixTestKit$Settings.class */
    public static class Settings {
        public static Duration DEFAULT_STOP_TIMEOUT = Duration.ofSeconds(10);
        public static Settings DEFAULT = new Settings(DEFAULT_STOP_TIMEOUT);
        public final Duration stopTimeout;
        public final String serviceName;
        public final boolean aclEnabled;
        public final boolean advancedViews;
        public final Optional<Duration> workflowTickInterval;
        public final Map<String, String> servicePortMappings;
        public final EventingSupport eventingSupport;
        public final MockedEventing mockedEventing;

        /* loaded from: input_file:kalix/javasdk/testkit/KalixTestKit$Settings$EventingSupport.class */
        public enum EventingSupport {
            TEST_BROKER,
            GOOGLE_PUBSUB,
            KAFKA
        }

        @Deprecated
        public Settings(Duration duration) {
            this(duration, "self", false, false, Optional.empty(), Collections.emptyMap(), EventingSupport.TEST_BROKER, MockedEventing.EMPTY);
        }

        private Settings(Duration duration, String str, boolean z, boolean z2, Optional<Duration> optional, Map<String, String> map, EventingSupport eventingSupport, MockedEventing mockedEventing) {
            this.stopTimeout = duration;
            this.serviceName = str;
            this.aclEnabled = z;
            this.advancedViews = z2;
            this.workflowTickInterval = optional;
            this.servicePortMappings = map;
            this.eventingSupport = eventingSupport;
            this.mockedEventing = mockedEventing;
        }

        public Settings withStopTimeout(Duration duration) {
            return new Settings(duration, this.serviceName, this.aclEnabled, this.advancedViews, this.workflowTickInterval, this.servicePortMappings, this.eventingSupport, this.mockedEventing);
        }

        public Settings withServiceName(String str) {
            return new Settings(this.stopTimeout, str, this.aclEnabled, this.advancedViews, this.workflowTickInterval, this.servicePortMappings, this.eventingSupport, this.mockedEventing);
        }

        public Settings withAclDisabled() {
            return new Settings(this.stopTimeout, this.serviceName, false, this.advancedViews, this.workflowTickInterval, this.servicePortMappings, this.eventingSupport, this.mockedEventing);
        }

        public Settings withAclEnabled() {
            return new Settings(this.stopTimeout, this.serviceName, true, this.advancedViews, this.workflowTickInterval, this.servicePortMappings, this.eventingSupport, this.mockedEventing);
        }

        public Settings withAdvancedViews() {
            return new Settings(this.stopTimeout, this.serviceName, this.aclEnabled, true, this.workflowTickInterval, this.servicePortMappings, this.eventingSupport, this.mockedEventing);
        }

        public Settings withWorkflowTickInterval(Duration duration) {
            return new Settings(this.stopTimeout, this.serviceName, this.aclEnabled, true, Optional.of(duration), this.servicePortMappings, this.eventingSupport, this.mockedEventing);
        }

        public Settings withValueEntityIncomingMessages(String str) {
            return new Settings(this.stopTimeout, this.serviceName, this.aclEnabled, true, this.workflowTickInterval, this.servicePortMappings, this.eventingSupport, this.mockedEventing.withValueEntityIncomingMessages(str));
        }

        public Settings withEventSourcedEntityIncomingMessages(String str) {
            return new Settings(this.stopTimeout, this.serviceName, this.aclEnabled, true, this.workflowTickInterval, this.servicePortMappings, this.eventingSupport, this.mockedEventing.withEventSourcedIncomingMessages(str));
        }

        public Settings withStreamIncomingMessages(String str, String str2) {
            return new Settings(this.stopTimeout, this.serviceName, this.aclEnabled, true, this.workflowTickInterval, this.servicePortMappings, this.eventingSupport, this.mockedEventing.withStreamIncomingMessages(str, str2));
        }

        public Settings withTopicIncomingMessages(String str) {
            return new Settings(this.stopTimeout, this.serviceName, this.aclEnabled, true, this.workflowTickInterval, this.servicePortMappings, this.eventingSupport, this.mockedEventing.withTopicIncomingMessages(str));
        }

        public Settings withTopicOutgoingMessages(String str) {
            return new Settings(this.stopTimeout, this.serviceName, this.aclEnabled, true, this.workflowTickInterval, this.servicePortMappings, this.eventingSupport, this.mockedEventing.withTopicOutgoingMessages(str));
        }

        public Settings withServicePortMapping(String str, String str2, int i) {
            HashMap hashMap = new HashMap(this.servicePortMappings);
            hashMap.put(str, str2 + ":" + i);
            return new Settings(this.stopTimeout, str, this.aclEnabled, this.advancedViews, this.workflowTickInterval, Map.copyOf(hashMap), this.eventingSupport, this.mockedEventing);
        }

        public Settings withEventingSupport(EventingSupport eventingSupport) {
            return new Settings(this.stopTimeout, this.serviceName, this.aclEnabled, this.advancedViews, this.workflowTickInterval, this.servicePortMappings, eventingSupport, this.mockedEventing);
        }

        public String toString() {
            return "Settings(stopTimeout=" + this.stopTimeout + ", serviceName='" + this.serviceName + "', aclEnabled=" + this.aclEnabled + ", advancedViews=" + this.advancedViews + ", workflowTickInterval=" + this.workflowTickInterval + ", servicePortMappings=[" + String.join(", ", (List) this.servicePortMappings.entrySet().stream().map(entry -> {
                return ((String) entry.getKey()) + "=" + ((String) entry.getValue());
            }).collect(Collectors.toList())) + "], eventingSupport=" + this.eventingSupport + ", mockedEventing=" + this.mockedEventing + ")";
        }
    }

    public KalixTestKit(Kalix kalix2) {
        this(kalix2, kalix2.getMessageCodec(), Settings.DEFAULT);
    }

    public KalixTestKit(Kalix kalix2, Settings settings) {
        this(kalix2, kalix2.getMessageCodec(), settings);
    }

    public KalixTestKit(Kalix kalix2, MessageCodec messageCodec, Settings settings) {
        this.started = false;
        this.proxyContainer = Optional.empty();
        this.f0kalix = kalix2;
        this.messageCodec = messageCodec;
        this.messageBuilder = new EventingTestKit.MessageBuilder(messageCodec);
        this.settings = settings;
    }

    public KalixTestKit start() {
        return start(ConfigFactory.load());
    }

    public KalixTestKit start(Config config) {
        if (this.started) {
            throw new IllegalStateException("KalixTestkit already started");
        }
        Boolean bool = (Boolean) Optional.ofNullable(System.getenv("KALIX_TESTKIT_USE_TEST_CONTAINERS")).map(Boolean::valueOf).orElse(true);
        int userFunctionPort = userFunctionPort(bool);
        HashMap hashMap = new HashMap();
        hashMap.put("kalix.user-function-port", Integer.valueOf(userFunctionPort));
        hashMap.put("kalix.system.akka.coordinated-shutdown.exit-jvm", "off");
        hashMap.put("kalix.dev-mode.docker-compose-file", "none");
        this.runner = this.f0kalix.createRunner(ConfigFactory.parseMap(hashMap).withFallback(config));
        this.runner.run();
        this.testSystem = ActorSystem.create("KalixTestkit", ConfigFactory.parseString("akka.http.server.preview.enable-http2 = true"));
        runProxy(bool, userFunctionPort, startEventingTestkit(bool));
        this.started = true;
        if (log.isDebugEnabled()) {
            log.debug("TestKit using [{}:{}] for calls to proxy from service", this.proxyHost, Integer.valueOf(this.proxyPort));
        }
        return this;
    }

    private int startEventingTestkit(Boolean bool) {
        int eventingTestKitPort = eventingTestKitPort(bool);
        if (this.settings.eventingSupport == Settings.EventingSupport.TEST_BROKER || this.settings.mockedEventing.hasConfig()) {
            log.info("Eventing TestKit booting up on port: " + eventingTestKitPort);
            this.eventingTestKit = EventingTestKit.start(this.testSystem, "0.0.0.0", eventingTestKitPort, this.messageCodec);
        }
        return eventingTestKitPort;
    }

    private int eventingTestKitPort(Boolean bool) {
        if (bool.booleanValue()) {
            return availableLocalPort();
        }
        return 8999;
    }

    private void runProxy(Boolean bool, int i, int i2) {
        if (bool.booleanValue()) {
            KalixProxyContainer kalixProxyContainer = new KalixProxyContainer(this.settings.eventingSupport, i, i2);
            this.proxyContainer = Optional.of(kalixProxyContainer);
            kalixProxyContainer.addEnv("SERVICE_NAME", this.settings.serviceName);
            kalixProxyContainer.addEnv("ACL_ENABLED", Boolean.toString(this.settings.aclEnabled));
            kalixProxyContainer.addEnv("VIEW_FEATURES_ALL", Boolean.toString(this.settings.advancedViews));
            ArrayList arrayList = new ArrayList();
            arrayList.add("-Dlogback.configurationFile=logback-dev-mode.xml");
            if (this.settings.mockedEventing.hasConfig()) {
                arrayList.add("-Dkalix.proxy.eventing.grpc-backend.host=host.testcontainers.internal");
                arrayList.add("-Dkalix.proxy.eventing.grpc-backend.port=" + i2);
            }
            if (this.settings.eventingSupport == Settings.EventingSupport.TEST_BROKER) {
                arrayList.add("-Dkalix.proxy.eventing.support=grpc-backend");
            } else if (this.settings.eventingSupport == Settings.EventingSupport.KAFKA) {
                arrayList.add("-Dkalix.proxy.eventing.support=kafka");
                arrayList.add("-Dkalix.proxy.eventing.kafka.bootstrap-servers=host.testcontainers.internal:9092");
            } else if (this.settings.eventingSupport == Settings.EventingSupport.GOOGLE_PUBSUB) {
                arrayList.add("-Dkalix.proxy.eventing.support=google-pubsub-emulator");
                arrayList.add("-Dkalix.proxy.eventing.google-pubsub-emulator-defaults.host=host.testcontainers.internal");
                arrayList.add("-Dkalix.proxy.eventing.google-pubsub-emulator-defaults.port=8085");
            }
            if (this.settings.mockedEventing.hasIncomingConfig()) {
                arrayList.add("-Dkalix.proxy.eventing.override.sources=" + this.settings.mockedEventing.toIncomingFlowConfig());
            }
            if (this.settings.mockedEventing.hasOutgoingConfig()) {
                arrayList.add("-Dkalix.proxy.eventing.override.destinations=" + this.settings.mockedEventing.toOutgoingFlowConfig());
            }
            this.settings.servicePortMappings.forEach((str, str2) -> {
                arrayList.add("-Dkalix.dev-mode.service-port-mappings." + str + "=" + str2);
            });
            log.debug("Running container with javaOptions=" + arrayList);
            kalixProxyContainer.addEnv("JAVA_TOOL_OPTIONS", String.join(" ", arrayList));
            this.settings.workflowTickInterval.ifPresent(duration -> {
                kalixProxyContainer.addEnv("WORKFLOW_TICK_INTERVAL", duration.toMillis() + ".millis");
            });
            kalixProxyContainer.start();
            this.proxyPort = kalixProxyContainer.getProxyPort();
            this.proxyHost = kalixProxyContainer.getHost();
        } else {
            this.proxyPort = KalixProxyContainer.DEFAULT_PROXY_PORT;
            this.proxyHost = "localhost";
            Http http = Http.get(this.testSystem);
            log.info("Checking kalix-runtime status");
            try {
                Patterns.retry(() -> {
                    return http.singleRequest(HttpRequest.GET("http://localhost:8558/ready")).thenCompose(httpResponse -> {
                        int intValue = httpResponse.status().intValue();
                        if (intValue == 200) {
                            log.info("Kalix-runtime started");
                            return CompletableFuture.completedStage("Ok");
                        }
                        log.info("Waiting for kalix-runtime, current response code is {}", Integer.valueOf(intValue));
                        return CompletableFuture.failedFuture(new IllegalStateException("Kalix Runtime not started."));
                    });
                }, 10, Duration.ofSeconds(1L), this.testSystem).toCompletableFuture().get();
            } catch (InterruptedException | ExecutionException e) {
                log.error("Failed to connect to Kalix Runtime with:", e);
                throw new RuntimeException(e);
            }
        }
        ProxyInfoHolder proxyInfoHolder = ProxyInfoHolder.get(this.runner.system());
        proxyInfoHolder.overridePort(this.proxyPort);
        proxyInfoHolder.overrideProxyHost(this.proxyHost);
        proxyInfoHolder.overrideTracingCollectorEndpoint("");
    }

    private int userFunctionPort(Boolean bool) {
        return bool.booleanValue() ? availableLocalPort() : KalixProxyContainer.DEFAULT_USER_FUNCTION_PORT;
    }

    public String getHost() {
        if (this.started) {
            return this.proxyHost;
        }
        throw new IllegalStateException("Need to start KalixTestkit before accessing the host name");
    }

    public int getPort() {
        if (this.started) {
            return this.proxyPort;
        }
        throw new IllegalStateException("Need to start KalixTestkit before accessing the port");
    }

    public <T> T getGrpcClient(Class<T> cls) {
        return (T) GrpcClients.get(getActorSystem()).getGrpcClient(cls, getHost(), getPort());
    }

    public <T> T getGrpcClientForPrincipal(Class<T> cls, Principal principal) {
        String str = null;
        if (principal == Principal.SELF) {
            str = this.settings.serviceName;
        } else if (principal instanceof Principal.LocalService) {
            str = ((Principal.LocalService) principal).getName();
        }
        return str != null ? (T) GrpcClients.get(getActorSystem()).getGrpcClient(cls, getHost(), getPort(), str) : (T) GrpcClients.get(getActorSystem()).getGrpcClient(cls, getHost(), getPort());
    }

    public Materializer getMaterializer() {
        return SystemMaterializer.get(getActorSystem()).materializer();
    }

    public ActorSystem getActorSystem() {
        if (this.started) {
            return this.testSystem;
        }
        throw new IllegalStateException("Need to start KalixTestkit before accessing actor system");
    }

    @Deprecated(since = "0.8.1", forRemoval = true)
    public GrpcClientSettings getGrpcClientSettings() {
        if (this.started) {
            return GrpcClientSettings.connectToServiceAt(getHost(), getPort(), this.testSystem).withTls(false);
        }
        throw new IllegalStateException("Need to start KalixTestkit before accessing gRPC client settings");
    }

    @Deprecated
    public EventingTestKit.Topic getTopic(String str) {
        if (this.settings.eventingSupport.equals(Settings.EventingSupport.TEST_BROKER)) {
            return this.eventingTestKit.getTopic(str);
        }
        throw new IllegalStateException("Currently configured eventing support is (" + this.settings.eventingSupport + "). To use this API, configure it to be (" + Settings.EventingSupport.TEST_BROKER + ")");
    }

    public EventingTestKit.IncomingMessages getValueEntityIncomingMessages(String str) {
        if (!this.settings.mockedEventing.hasValueEntitySubscription(str)) {
            throwMissingConfigurationException("ValueEntity " + str);
        }
        return this.eventingTestKit.getValueEntityIncomingMessages(str);
    }

    public EventingTestKit.IncomingMessages getEventSourcedEntityIncomingMessages(String str) {
        if (!this.settings.mockedEventing.hasEventSourcedEntitySubscription(str)) {
            throwMissingConfigurationException("EventSourcedEntity " + str);
        }
        return this.eventingTestKit.getEventSourcedEntityIncomingMessages(str);
    }

    public EventingTestKit.IncomingMessages getStreamIncomingMessages(String str, String str2) {
        if (!this.settings.mockedEventing.hasStreamSubscription(str, str2)) {
            throwMissingConfigurationException("Stream " + str + "/" + str2);
        }
        return this.eventingTestKit.getStreamIncomingMessages(str, str2);
    }

    public EventingTestKit.IncomingMessages getTopicIncomingMessages(String str) {
        if (!this.settings.mockedEventing.hasTopicSubscription(str)) {
            throwMissingConfigurationException("Topic " + str);
        }
        return this.eventingTestKit.getTopicIncomingMessages(str);
    }

    public EventingTestKit.OutgoingMessages getTopicOutgoingMessages(String str) {
        if (!this.settings.mockedEventing.hasTopicDestination(str)) {
            throwMissingConfigurationException("Topic " + str);
        }
        return this.eventingTestKit.getTopicOutgoingMessages(str);
    }

    private void throwMissingConfigurationException(String str) {
        throw new IllegalStateException("Currently configured mocked eventing is [" + this.settings.mockedEventing + "]. To use the MockedEventing API, to configure mocking of " + str);
    }

    public void stop() {
        try {
            this.proxyContainer.ifPresent(kalixProxyContainer -> {
                kalixProxyContainer.stop();
            });
        } catch (Exception e) {
            log.error("KalixTestkit proxy container failed to stop", e);
        }
        try {
            this.testSystem.terminate();
            this.testSystem.getWhenTerminated().toCompletableFuture().get(this.settings.stopTimeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (Exception e2) {
            log.error("KalixTestkit ActorSystem failed to terminate", e2);
        }
        try {
            this.runner.terminate().toCompletableFuture().get(this.settings.stopTimeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (Exception e3) {
            log.error("KalixTestkit KalixRunner failed to terminate", e3);
        }
        this.started = false;
    }

    public static int availableLocalPort() {
        try {
            ServerSocket serverSocket = new ServerSocket(0);
            try {
                int localPort = serverSocket.getLocalPort();
                serverSocket.close();
                return localPort;
            } finally {
            }
        } catch (IOException e) {
            throw new RuntimeException("Couldn't get available local port", e);
        }
    }

    @InternalApi
    public MessageCodec getMessageCodec() {
        return this.messageCodec;
    }

    @InternalApi
    public KalixRunner getRunner() {
        return this.runner;
    }

    public EventingTestKit.MessageBuilder getMessageBuilder() {
        return this.messageBuilder;
    }
}
