package org.apache.pulsar.broker.service;

import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
import org.apache.pulsar.client.api.schema.SchemaBuilder;
import org.apache.pulsar.client.impl.schema.KeyValueSchema;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
import org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.SchemaType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/**
 * Broker test that publishes a {@link KeyValue} message whose key and value are both
 * Avro {@link GenericRecord}s and reads it back through a key/value schema built from
 * two {@code AUTO_CONSUME} schemas.
 */
@Test(groups = {"broker"})
public class KeyValueTest extends BrokerTestBase {
    private static final Logger log = LoggerFactory.getLogger(KeyValueTest.class);

    /** Upper bound on how long the test waits for the published message before failing. */
    private static final int RECEIVE_TIMEOUT_SECONDS = 10;

    /**
     * Runs the base-class setup before every test method.
     *
     * @throws Exception if the base setup fails
     */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeMethod
    protected void setup() throws Exception {
        super.baseSetup();
    }

    /**
     * Runs the base-class cleanup after every test method, including failed ones
     * ({@code alwaysRun = true}).
     *
     * @throws Exception if the base cleanup fails
     */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterMethod(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Sends one key/value message (key record {@code test=foo}, value record
     * {@code test=bar}) and asserts that a consumer subscribed from the earliest
     * position with an auto-consume key/value schema sees the same field values.
     *
     * @throws Exception if topic creation, producing, or consuming fails
     */
    @Test
    public void keyValueAutoConsumeTest() throws Exception {
        final String topic = "persistent://prop/ns-abc/kv-record";
        this.admin.topics().createNonPartitionedTopic(topic);

        // A single-field Avro record schema, reused for both the key and the value.
        RecordSchemaBuilder recordSchemaBuilder = SchemaBuilder.record("test");
        recordSchemaBuilder.field("test").type(SchemaType.STRING);
        GenericSchemaImpl recordSchema = GenericAvroSchema.of(recordSchemaBuilder.build(SchemaType.AVRO));

        GenericRecord key = recordSchema.newRecordBuilder().set("test", "foo").build();
        GenericRecord value = recordSchema.newRecordBuilder().set("test", "bar").build();

        // try-with-resources closes the consumer first, then the producer, and keeps any
        // close() failure as a suppressed exception instead of masking an assertion error.
        try (Producer<KeyValue<GenericRecord, GenericRecord>> producer = this.pulsarClient
                .newProducer(KeyValueSchema.of(recordSchema, recordSchema))
                .topic(topic)
                .create()) {
            producer.newMessage().value(new KeyValue<>(key, value)).send();

            // The consumer is created only after the message was sent; it starts from Earliest.
            try (Consumer<KeyValue<GenericRecord, GenericRecord>> consumer = this.pulsarClient
                    .newConsumer(KeyValueSchema.of(Schema.AUTO_CONSUME(), Schema.AUTO_CONSUME()))
                    .topic(topic)
                    .subscriptionName("test")
                    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                    .subscribe()) {
                // Bounded wait so a lost message fails the test instead of hanging the suite.
                Message<KeyValue<GenericRecord, GenericRecord>> message =
                        consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
                Assert.assertNotNull(message,
                        "no message received within " + RECEIVE_TIMEOUT_SECONDS + " seconds");

                KeyValue<GenericRecord, GenericRecord> received = message.getValue();
                // TestNG argument order is (actual, expected).
                Assert.assertEquals(received.getKey().getField("test"), key.getField("test"));
                Assert.assertEquals(received.getValue().getField("test"), value.getField("test"));
            }
        }
    }
}
