package io.debezium.pipeline;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.doc.FixFor;
import io.debezium.embedded.async.AbstractAsyncEngineConnectorTest;
import io.debezium.util.Testing;
import java.lang.management.ManagementFactory;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import javax.management.InstanceNotFoundException;
import javax.management.MBeanException;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.ReflectionException;
import javax.management.openmbean.TabularDataSupport;
import org.apache.kafka.connect.source.SourceConnector;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:io/debezium/pipeline/AbstractMetricsTest.class */
public abstract class AbstractMetricsTest<T extends SourceConnector> extends AbstractAsyncEngineConnectorTest {
    protected abstract Class<T> getConnectorClass();

    protected abstract String connector();

    protected abstract String server();

    protected abstract Configuration.Builder config();

    protected abstract Configuration.Builder noSnapshot(Configuration.Builder builder);

    protected abstract void executeInsertStatements() throws Exception;

    protected abstract String tableName();

    protected abstract long expectedEvents();

    protected abstract boolean snapshotCompleted();

    protected String task() {
        return null;
    }

    protected String database() {
        return null;
    }

    protected void start() {
        start(getConnectorClass(), config().build(), loggingCompletion(), null);
    }

    protected void start(Function<Configuration.Builder, Configuration.Builder> function) {
        start(getConnectorClass(), function.apply(config()).build(), loggingCompletion(), null);
    }

    @Test
    public void testLifecycle() throws Exception {
        start();
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted(connector(), server(), task(), database());
        waitForStreamingRunning(connector(), server(), getStreamingNamespace(), task());
        stopConnector();
        MBeanServer platformMBeanServer = ManagementFactory.getPlatformMBeanServer();
        try {
            platformMBeanServer.getMBeanInfo(getSnapshotMetricsObjectName());
            Assert.fail("Expected Snapshot Metrics no longer to exist");
        } catch (InstanceNotFoundException e) {
        }
        try {
            platformMBeanServer.getMBeanInfo(getStreamingMetricsObjectName());
            Assert.fail("Expected Streaming Metrics no longer to exist");
        } catch (InstanceNotFoundException e2) {
        }
    }

    @Test
    public void testSnapshotOnlyMetrics() throws Exception {
        executeInsertStatements();
        start();
        assertSnapshotMetrics();
    }

    @Test
    public void testSnapshotAndStreamingMetrics() throws Exception {
        executeInsertStatements();
        start();
        assertConnectorIsRunning();
        assertSnapshotMetrics();
        consumeRecords(2);
        assertStreamingMetrics(false, expectedEvents());
    }

    @Test
    @FixFor({"DBZ-6603"})
    public void testSnapshotAndStreamingWithCustomMetrics() throws Exception {
        executeInsertStatements();
        Map<String, String> of = Map.of("env", "test", "bu", "bigdata");
        start(builder -> {
            return builder.with(CommonConnectorConfig.CUSTOM_METRIC_TAGS, "env=test,bu=bigdata");
        });
        assertSnapshotWithCustomMetrics(of);
        consumeRecords(2);
        assertStreamingWithCustomMetrics(of, expectedEvents());
    }

    @Test
    public void testStreamingOnlyMetrics() throws Exception {
        start(this::noSnapshot);
        waitForStreamingRunning(connector(), server(), getStreamingNamespace(), task());
        assertSnapshotNotExecutedMetrics();
        assertStreamingMetrics(false, expectedEvents());
    }

    @Test
    public void testAdvancedStreamingMetrics() throws Exception {
        start(builder -> {
            return noSnapshot(builder).with(CommonConnectorConfig.ADVANCED_METRICS_ENABLE, Boolean.TRUE);
        });
        waitForStreamingRunning(connector(), server(), getStreamingNamespace(), task());
        assertSnapshotNotExecutedMetrics();
        assertStreamingMetrics(true, expectedEvents());
    }

    @Test
    public void testPauseAndResumeAdvancedStreamingMetrics() throws Exception {
        start(builder -> {
            return noSnapshot(builder).with(CommonConnectorConfig.ADVANCED_METRICS_ENABLE, Boolean.TRUE);
        });
        waitForStreamingRunning(connector(), server(), getStreamingNamespace(), task());
        assertSnapshotNotExecutedMetrics();
        assertStreamingMetrics(true, expectedEvents());
        invokeOperation(getMultiplePartitionStreamingMetricsObjectName(), "pause");
        insertRecords();
        assertAdvancedMetrics(2L);
        invokeOperation(getMultiplePartitionStreamingMetricsObjectName(), "resume");
        insertRecords();
        consumeRecords(4);
        assertAdvancedMetrics(4L);
    }

    private void insertRecords() throws Exception {
        executeInsertStatements();
        waitForAvailableRecords(30L, TimeUnit.SECONDS);
        Thread.sleep(Duration.ofSeconds(2L).toMillis());
    }

    protected void assertSnapshotMetrics() throws Exception {
        MBeanServer platformMBeanServer = ManagementFactory.getPlatformMBeanServer();
        waitForSnapshotToBeCompleted(connector(), server(), task(), database());
        Assertions.assertThat(platformMBeanServer.getAttribute(getSnapshotMetricsObjectName(), "TotalTableCount")).isEqualTo(1);
        Assertions.assertThat(platformMBeanServer.getAttribute(getSnapshotMetricsObjectName(), "CapturedTables")).isEqualTo(new String[]{tableName()});
        Assertions.assertThat(platformMBeanServer.getAttribute(getSnapshotMetricsObjectName(), "TotalNumberOfEventsSeen")).isEqualTo(2L);
        Assertions.assertThat(platformMBeanServer.getAttribute(getSnapshotMetricsObjectName(), "RemainingTableCount")).isEqualTo(0);
        Assertions.assertThat(platformMBeanServer.getAttribute(getSnapshotMetricsObjectName(), "SnapshotRunning")).isEqualTo(false);
        Assertions.assertThat(platformMBeanServer.getAttribute(getSnapshotMetricsObjectName(), "SnapshotAborted")).isEqualTo(false);
        Assertions.assertThat(platformMBeanServer.getAttribute(getSnapshotMetricsObjectName(), "SnapshotCompleted")).isEqualTo(true);
        Assertions.assertThat(platformMBeanServer.getAttribute(getSnapshotMetricsObjectName(), "SnapshotPaused")).isEqualTo(false);
        Assertions.assertThat(platformMBeanServer.getAttribute(getSnapshotMetricsObjectName(), "SnapshotPausedDurationInSeconds")).isEqualTo(0L);
    }

    protected void assertSnapshotWithCustomMetrics(Map<String, String> map) throws Exception {
        MBeanServer platformMBeanServer = ManagementFactory.getPlatformMBeanServer();
        ObjectName snapshotMetricsObjectName = getSnapshotMetricsObjectName(connector(), server(), task(), database(), map);
        waitForSnapshotWithCustomMetricsToBeCompleted(connector(), server(), task(), database(), map);
        Assertions.assertThat(platformMBeanServer.getAttribute(snapshotMetricsObjectName, "TotalTableCount")).isEqualTo(1);
        Assertions.assertThat(platformMBeanServer.getAttribute(snapshotMetricsObjectName, "CapturedTables")).isEqualTo(new String[]{tableName()});
        Assertions.assertThat(platformMBeanServer.getAttribute(snapshotMetricsObjectName, "TotalNumberOfEventsSeen")).isEqualTo(2L);
        Assertions.assertThat(platformMBeanServer.getAttribute(snapshotMetricsObjectName, "RemainingTableCount")).isEqualTo(0);
        Assertions.assertThat(platformMBeanServer.getAttribute(snapshotMetricsObjectName, "SnapshotRunning")).isEqualTo(false);
        Assertions.assertThat(platformMBeanServer.getAttribute(snapshotMetricsObjectName, "SnapshotAborted")).isEqualTo(false);
        Assertions.assertThat(platformMBeanServer.getAttribute(snapshotMetricsObjectName, "SnapshotCompleted")).isEqualTo(true);
        Assertions.assertThat(platformMBeanServer.getAttribute(snapshotMetricsObjectName, "SnapshotPaused")).isEqualTo(false);
        Assertions.assertThat(platformMBeanServer.getAttribute(snapshotMetricsObjectName, "SnapshotPausedDurationInSeconds")).isEqualTo(0L);
    }

    private void assertSnapshotNotExecutedMetrics() throws Exception {
        MBeanServer platformMBeanServer = ManagementFactory.getPlatformMBeanServer();
        Awaitility.await("Waiting for snapshot metrics to appear").atMost(waitTimeForRecords(), TimeUnit.SECONDS).until(() -> {
            try {
                platformMBeanServer.getObjectInstance(getSnapshotMetricsObjectName());
                return true;
            } catch (InstanceNotFoundException e) {
                return false;
            }
        });
        Assertions.assertThat(platformMBeanServer.getAttribute(getSnapshotMetricsObjectName(), "TotalNumberOfEventsSeen")).isEqualTo(0L);
        Assertions.assertThat(platformMBeanServer.getAttribute(getSnapshotMetricsObjectName(), "SnapshotCompleted")).isEqualTo(Boolean.valueOf(snapshotCompleted()));
    }

    protected void assertStreamingMetrics(boolean z, long j) throws Exception {
        MBeanServer platformMBeanServer = ManagementFactory.getPlatformMBeanServer();
        waitForStreamingRunning(connector(), server(), getStreamingNamespace(), task());
        executeInsertStatements();
        consumeRecordsByTopic((int) j);
        Thread.sleep(Duration.ofSeconds(2L).toMillis());
        Testing.print("****ASSERTIONS****");
        Assertions.assertThat(platformMBeanServer.getAttribute(getStreamingMetricsObjectName(), "Connected")).isEqualTo(true);
        Assertions.assertThat(platformMBeanServer.getAttribute(getMultiplePartitionStreamingMetricsObjectName(), "TotalNumberOfCreateEventsSeen")).isEqualTo(Long.valueOf(j));
        if (z) {
            assertAdvancedMetrics(2L);
        }
    }

    public void assertAdvancedMetrics(long j) throws Exception {
        Assertions.assertThat(((TabularDataSupport) ManagementFactory.getPlatformMBeanServer().getAttribute(getStreamingMetricsObjectName(connector(), server(), getStreamingNamespace(), task(), database()), "NumberOfCreateEventsSeen")).values().stream().limit(1L).toList().get(0).toString()).isEqualTo("javax.management.openmbean.CompositeDataSupport(compositeType=javax.management.openmbean.CompositeType(name=java.util.Map<java.lang.String, java.lang.Long>,items=((itemName=key,itemType=javax.management.openmbean.SimpleType(name=java.lang.String)),(itemName=value,itemType=javax.management.openmbean.SimpleType(name=java.lang.Long)))),contents={key=" + tableName() + ", value=" + j + "})");
    }

    private void invokeOperation(ObjectName objectName, String str) throws ReflectionException, InstanceNotFoundException, MBeanException {
        ManagementFactory.getPlatformMBeanServer().invoke(objectName, str, new Object[0], new String[0]);
    }

    protected void assertStreamingWithCustomMetrics(Map<String, String> map, long j) throws Exception {
        MBeanServer platformMBeanServer = ManagementFactory.getPlatformMBeanServer();
        waitForStreamingWithCustomMetricsToStart(connector(), server(), task(), database(), map);
        executeInsertStatements();
        waitForAvailableRecords(30L, TimeUnit.SECONDS);
        consumeRecords((int) j);
        Thread.sleep(Duration.ofSeconds(2L).toMillis());
        Testing.print("****ASSERTIONS****");
        Assertions.assertThat(platformMBeanServer.getAttribute(getStreamingMetricsObjectName(connector(), server(), task(), (String) null, map), "Connected")).isEqualTo(true);
        Assertions.assertThat(platformMBeanServer.getAttribute(getMultiplePartitionStreamingMetricsObjectNameCustomTags(map), "TotalNumberOfCreateEventsSeen")).isEqualTo(Long.valueOf(j));
    }

    protected ObjectName getSnapshotMetricsObjectName() throws MalformedObjectNameException {
        return getSnapshotMetricsObjectName(connector(), server());
    }

    protected ObjectName getStreamingMetricsObjectName() throws MalformedObjectNameException {
        return getStreamingMetricsObjectName(connector(), server());
    }

    protected ObjectName getMultiplePartitionStreamingMetricsObjectName() throws MalformedObjectNameException {
        return getStreamingMetricsObjectName(connector(), server());
    }

    protected ObjectName getMultiplePartitionStreamingMetricsObjectNameCustomTags(Map<String, String> map) throws MalformedObjectNameException {
        return getStreamingMetricsObjectName(connector(), server(), map);
    }
}
