package io.telicent.smart.cache.entity.resolver.server;

import ch.qos.logback.classic.Level;
import io.telicent.smart.cache.cli.commands.AbstractCommandTests;
import io.telicent.smart.cache.cli.commands.SmartCacheCommandTester;
import io.telicent.smart.cache.configuration.Configurator;
import io.telicent.smart.cache.configuration.sources.NullSource;
import io.telicent.smart.cache.live.LiveReporter;
import io.telicent.smart.cache.live.model.LiveHeartbeat;
import io.telicent.smart.cache.live.model.LiveStatus;
import io.telicent.smart.cache.live.serializers.LiveHeartbeatDeserializer;
import io.telicent.smart.cache.server.jaxrs.model.HealthStatus;
import io.telicent.smart.cache.sources.Event;
import io.telicent.smart.cache.sources.kafka.KafkaEventSource;
import io.telicent.smart.cache.sources.kafka.KafkaTestCluster;
import jakarta.ws.rs.client.Client;
import jakarta.ws.rs.client.ClientBuilder;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import java.time.Duration;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.utils.Bytes;
import org.slf4j.LoggerFactory;
import org.testcontainers.shaded.org.apache.commons.lang3.StringUtils;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:io/telicent/smart/cache/entity/resolver/server/DockerSearchCommandTests.class */
public class DockerSearchCommandTests extends AbstractCommandTests {
    private final KafkaTestCluster kafka = new KafkaTestCluster();
    private static final Client client = ClientBuilder.newClient();

    protected void enableSpecificLogging(Class cls, Level level) {
        LoggerFactory.getILoggerFactory().getLogger(cls).setLevel(level);
    }

    protected void disableSpecificLogging() {
        LoggerFactory.getILoggerFactory().reset();
    }

    @BeforeClass
    public void setup() {
        Configurator.setSingleSource(NullSource.INSTANCE);
        SmartCacheCommandTester.TEE_TO_ORIGINAL_STREAMS = true;
        super.setup();
        this.kafka.setup();
    }

    @AfterMethod
    public void testCleanup() throws InterruptedException {
        Configurator.reset();
        this.kafka.resetTestTopic();
        super.testCleanup();
    }

    @AfterClass
    public void teardown() {
        this.kafka.teardown();
        super.teardown();
    }

    @Test
    public void api_server_live_reporting_01() throws InterruptedException {
        verifyServerStartup(Executors.newSingleThreadExecutor().submit(() -> {
            EntityResolutionApiCommand.main(new String[]{"--host", AbstractEntityResolutionApiServerTests.API_HOST, "--port", Integer.toString(AbstractEntityResolutionApiServerTests.API_PORT), "--live-bootstrap-servers", this.kafka.getBootstrapServers(), "--live-reporter-topic", "tests", "--live-reporter-interval", "1"});
        }));
        verifyLiveReports(getLiveReportsSource("tests"));
    }

    @Test
    public void api_server_live_reporting_02() throws InterruptedException {
        verifyServerStartup(Executors.newSingleThreadExecutor().submit(() -> {
            EntityResolutionApiCommand.main(new String[]{"--host", AbstractEntityResolutionApiServerTests.API_HOST, "--port", Integer.toString(AbstractEntityResolutionApiServerTests.API_PORT), "--live-bootstrap-servers", this.kafka.getBootstrapServers(), "--live-reporter-topic", "tests"});
        }));
        verifyLiveReports(getLiveReportsSource("tests"));
    }

    @Test
    public void api_server_live_reporting_03() throws InterruptedException {
        verifyServerStartup(Executors.newSingleThreadExecutor().submit(() -> {
            EntityResolutionApiCommand.main(new String[]{"--host", AbstractEntityResolutionApiServerTests.API_HOST, "--port", Integer.toString(AbstractEntityResolutionApiServerTests.API_PORT), "--live-bootstrap-servers", this.kafka.getBootstrapServers(), "--live-reporter-interval", "1"});
        }));
        verifyLiveReports(getLiveReportsSource("provenance.live"));
        this.kafka.resetTopic("provenance.live");
    }

    @Test
    public void api_server_live_reporting_04() throws InterruptedException {
        enableSpecificLogging(LiveReporter.class, Level.WARN);
        verifyServerStartup(Executors.newSingleThreadExecutor().submit(() -> {
            EntityResolutionApiCommand.main(new String[]{"--host", AbstractEntityResolutionApiServerTests.API_HOST, "--port", Integer.toString(AbstractEntityResolutionApiServerTests.API_PORT), "--live-reporter-interval", "1"});
        }));
        KafkaEventSource<Bytes, LiveHeartbeat> liveReportsSource = getLiveReportsSource("tests");
        Assert.assertNull(liveReportsSource.remaining());
        Assert.assertNull(liveReportsSource.poll(Duration.ofSeconds(1L)));
        Assert.assertTrue(StringUtils.contains(SmartCacheCommandTester.getLastStdErr(), "No sink specified"));
        disableSpecificLogging();
    }

    private static void verifyServerStartup(Future<?> future) throws InterruptedException {
        Assert.assertEquals(SmartCacheCommandTester.getLastExitStatus(), Integer.MIN_VALUE);
        Thread.sleep(5000L);
        Assert.assertFalse(future.isDone());
        future.cancel(true);
        try {
            future.get();
            Assert.fail("Expected server to have been cancelled");
        } catch (CancellationException e) {
        } catch (Throwable th) {
            Assert.fail("Unexpected error " + th.getMessage());
        }
        Thread.sleep(250L);
        Assert.assertEquals(SmartCacheCommandTester.getLastExitStatus(), 0);
    }

    private static void verifyLiveReports(KafkaEventSource<Bytes, LiveHeartbeat> kafkaEventSource) {
        Assert.assertFalse(kafkaEventSource.isExhausted());
        Event poll = kafkaEventSource.poll(Duration.ofSeconds(3L));
        Assert.assertNotNull(poll);
        Assert.assertNotNull(poll.value());
        Assert.assertEquals(((LiveHeartbeat) poll.value()).getStatus(), LiveStatus.STARTED);
        while (kafkaEventSource.remaining().longValue() > 1) {
            Event poll2 = kafkaEventSource.poll(Duration.ofSeconds(3L));
            Assert.assertNotNull(poll2);
            Assert.assertNotNull(poll2.value());
            Assert.assertEquals(((LiveHeartbeat) poll2.value()).getStatus(), LiveStatus.RUNNING);
        }
        Event poll3 = kafkaEventSource.poll(Duration.ofSeconds(3L));
        Assert.assertNotNull(poll3);
        Assert.assertNotNull(poll3.value());
        Assert.assertEquals(((LiveHeartbeat) poll3.value()).getStatus(), LiveStatus.COMPLETED);
        kafkaEventSource.close();
    }

    private KafkaEventSource<Bytes, LiveHeartbeat> getLiveReportsSource(String str) {
        return KafkaEventSource.create().bootstrapServers(this.kafka.getBootstrapServers()).topic(str).consumerGroup("test-live-reports").keyDeserializer(BytesDeserializer.class).valueDeserializer(LiveHeartbeatDeserializer.class).fromBeginning().build();
    }

    @Test
    public void api_server_base_path_01() throws InterruptedException {
        verifyServerHealthy(Executors.newSingleThreadExecutor().submit(() -> {
            EntityResolutionApiCommand.main(new String[]{"--host", AbstractEntityResolutionApiServerTests.API_HOST, "--port", Integer.toString(AbstractEntityResolutionApiServerTests.API_PORT)});
        }), "");
    }

    @Test
    public void api_server_base_path_02() throws InterruptedException {
        verifyServerHealthy(Executors.newSingleThreadExecutor().submit(() -> {
            EntityResolutionApiCommand.main(new String[]{"--host", AbstractEntityResolutionApiServerTests.API_HOST, "--port", Integer.toString(AbstractEntityResolutionApiServerTests.API_PORT), "--base-path", "/api/search"});
        }), "/api/search");
    }

    @Test
    public void api_server_base_path_03() throws InterruptedException {
        verifyServerHealthy(Executors.newSingleThreadExecutor().submit(() -> {
            EntityResolutionApiCommand.main(new String[]{"--host", AbstractEntityResolutionApiServerTests.API_HOST, "--port", Integer.toString(AbstractEntityResolutionApiServerTests.API_PORT), "--base-path", "/a/b/c/d"});
        }), "/a/b/c/d");
    }

    private static void verifyServerHealthy(Future<?> future, String str) throws InterruptedException {
        Assert.assertEquals(SmartCacheCommandTester.getLastExitStatus(), Integer.MIN_VALUE);
        Thread.sleep(5000L);
        Assert.assertFalse(future.isDone());
        try {
            Response response = client.target("http://localhost:18081" + str + "/healthz").request(new MediaType[]{MediaType.APPLICATION_JSON_TYPE}).get();
            try {
                Assert.assertNotNull((HealthStatus) response.readEntity(HealthStatus.class));
                if (response != null) {
                    response.close();
                }
            } finally {
            }
        } finally {
            future.cancel(true);
            try {
                future.get();
                Assert.fail("Expected server to have been cancelled");
            } catch (CancellationException e) {
            } catch (Throwable th) {
                Assert.fail("Unexpected error " + th.getMessage());
            }
            Thread.sleep(250L);
            Assert.assertEquals(SmartCacheCommandTester.getLastExitStatus(), 0);
        }
    }
}
