package io.debezium.converters;

import io.debezium.config.Configuration;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.transforms.outbox.EventRouter;
import java.util.LinkedHashMap;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.transforms.HeaderFrom;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:io/debezium/converters/AbstractCloudEventsConverterTest.class */
/**
 * Base class for connector-specific tests that verify a change event can be converted to a
 * CloudEvent after it has passed through the {@link HeaderFrom} and outbox {@link EventRouter}
 * transformations.
 *
 * <p>Subclasses supply the connector class, its configuration, the database connection and the
 * SQL needed to create and populate the table the connector captures.
 *
 * @param <T> the type of the source connector under test
 */
public abstract class AbstractCloudEventsConverterTest<T extends SourceConnector> extends AbstractConnectorTest {
    /** Outbox event router applied to the record after {@link #headerFrom}; created in {@link #beforeEach()}. */
    protected EventRouter<SourceRecord> outboxEventRouter;

    /** Transformation copying selected value fields into headers; created in {@link #beforeEach()}. */
    protected HeaderFrom<SourceRecord> headerFrom;

    /** @return the connector class that is passed to {@code start(...)} */
    protected abstract Class<T> getConnectorClass();

    /** @return the connector name handed to the shared CloudEvents assertions */
    protected abstract String getConnectorName();

    /** @return the server name handed to the shared CloudEvents assertions */
    protected abstract String getServerName();

    /** @return the JDBC connection used to execute the insert; it is closed in {@link #afterEach()} */
    protected abstract JdbcConnection databaseConnection();

    /** @return a builder whose built configuration is used to start the connector */
    protected abstract Configuration.Builder getConfigurationBuilder();

    /** @return the topic from which the captured record is read */
    protected abstract String topicName();

    /** @return the table name; not referenced by this base class, presumably used by subclasses */
    protected abstract String tableName();

    /**
     * Creates the table captured by the connector. Invoked before the connector is started.
     *
     * @throws Exception if the table cannot be created
     */
    protected abstract void createTable() throws Exception;

    /**
     * Builds the SQL statement that inserts one row into the captured table.
     *
     * <p>NOTE(review): the parameter names are decompiler-generated. Judging from the single call
     * in this class and its assertions, the arguments look like event id, event type, aggregate
     * type, aggregate id (asserted as the routed record key), JSON payload and an additional
     * fragment (passed as an empty string) - confirm against the implementations.
     *
     * @return the statement to execute against {@link #databaseConnection()}
     */
    protected abstract String createInsert(String str, String str2, String str3, String str4, String str5, String str6);

    /**
     * Called right after the connector is asserted to be running; expected to block until the
     * connector has started streaming.
     *
     * @throws InterruptedException if the waiting thread is interrupted
     */
    protected abstract void waitForStreamingStarted() throws InterruptedException;

    /**
     * Creates the table, configures both transformations and starts the connector.
     *
     * @throws Exception if the table cannot be created or the connector fails to start
     */
    @Before
    public void beforeEach() throws Exception {
        createTable();

        // Copy the listed value fields into headers of the same name.
        this.headerFrom = new HeaderFrom.Value<>();
        LinkedHashMap<String, String> headerFromConfig = new LinkedHashMap<>();
        headerFromConfig.put("fields", "source,op,transaction");
        headerFromConfig.put("headers", "source,op,transaction");
        headerFromConfig.put("operation", "copy");
        headerFromConfig.put("header.converter.schemas.enable", "true");
        this.headerFrom.configure(headerFromConfig);

        this.outboxEventRouter = new EventRouter<>();
        LinkedHashMap<String, String> eventRouterConfig = new LinkedHashMap<>();
        eventRouterConfig.put("table.expand.json.payload", "true");
        this.outboxEventRouter.configure(eventRouterConfig);

        startConnector();
    }

    /**
     * Stops the connector and releases the database connection and both transformations.
     *
     * <p>The resources are released in {@code finally} blocks so that a failure while stopping the
     * connector does not leak them. The transformations are null-checked because JUnit runs
     * {@code @After} methods even when {@link #beforeEach()} failed before creating them.
     *
     * @throws Exception if stopping the connector or closing the connection fails
     */
    @After
    public void afterEach() throws Exception {
        try {
            stopConnector();
            assertNoRecordsToConsume();
        } finally {
            try {
                databaseConnection().close();
            } finally {
                if (this.headerFrom != null) {
                    this.headerFrom.close();
                }
                if (this.outboxEventRouter != null) {
                    this.outboxEventRouter.close();
                }
            }
        }
    }

    /**
     * Inserts a single row, passes the captured record through {@link #headerFrom} and
     * {@link #outboxEventRouter}, and checks the routed record before delegating to the shared
     * CloudEvents assertions.
     *
     * @throws Exception if executing the insert or consuming the record fails
     */
    @Test
    @FixFor({"DBZ-3642"})
    public void shouldConvertToCloudEventsInJsonWithMetadataInHeadersAfterOutboxEventRouter() throws Exception {
        String insertStatement = createInsert(
                "59a42efd-b015-44a9-9dde-cb36d9002425",
                "UserCreated",
                "User",
                "10711fa5",
                "{\"someField1\": \"some value 1\",\"someField2\": 7005}",
                "");
        databaseConnection().execute(new String[]{insertStatement});

        AbstractConnectorTest.SourceRecords streamedRecords = consumeRecordsByTopic(1);
        Assertions.assertThat(streamedRecords.allRecordsInOrder()).hasSize(1);

        // Headers are populated first, then the outbox router reshapes the record.
        SourceRecord capturedRecord = streamedRecords.recordsForTopic(topicName()).get(0);
        SourceRecord routedRecord = this.outboxEventRouter.apply(this.headerFrom.apply(capturedRecord));

        Assertions.assertThat(routedRecord).isNotNull();
        Assertions.assertThat(routedRecord.topic()).isEqualTo("outbox.event.User");
        Assertions.assertThat(routedRecord.keySchema()).isEqualTo(Schema.STRING_SCHEMA);
        Assertions.assertThat(routedRecord.key()).isEqualTo("10711fa5");
        Assertions.assertThat(routedRecord.value()).isInstanceOf(Struct.class);

        CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithMetadataInHeaders(routedRecord, getConnectorName(), getServerName());
    }

    /**
     * Starts the connector, waits for streaming to begin and verifies that no records are
     * pending before the test body runs.
     *
     * @throws Exception if the connector cannot be started or waiting is interrupted
     */
    private void startConnector() throws Exception {
        start(getConnectorClass(), getConfigurationBuilder().build());
        assertConnectorIsRunning();
        waitForStreamingStarted();
        assertNoRecordsToConsume();
    }
}
