/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarTestClient;
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.ProtocolVersion;
import org.apache.pulsar.common.protocol.ByteBufPair;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.tests.EnumValuesDataProvider;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups={"broker-impl"})
public class MessageChecksumTest
extends BrokerTestBase {
    private static final Logger log = LoggerFactory.getLogger(MessageChecksumTest.class);

    @Override
    @BeforeMethod
    public void setup() throws Exception {
        this.baseSetup();
    }

    @Override
    @AfterMethod(alwaysRun=true)
    protected void cleanup() throws Exception {
        this.internalCleanup();
    }

    @Override
    protected void customizeNewPulsarClientBuilder(ClientBuilder clientBuilder) {
        clientBuilder.connectionsPerBroker(0);
    }

    @Override
    protected PulsarClient createNewPulsarClient(ClientBuilder clientBuilder) throws PulsarClientException {
        return PulsarTestClient.create(clientBuilder);
    }

    @Test(dataProviderClass=EnumValuesDataProvider.class, dataProvider="values")
    public void testChecksumCompatibilityInMixedVersionBrokerCluster(MixedVersionScenario mixedVersionScenario) throws Exception {
        String topicName = "persistent://prop/use/ns-abc/testChecksumBackwardsCompatibilityWithOldBrokerWithoutChecksumHandling";
        if (mixedVersionScenario == MixedVersionScenario.CONNECTED_TO_OLD_THEN_NEW_VERSION) {
            this.makeClientAssumeThatItsConnectedToBrokerWithoutChecksumSupport();
        }
        PulsarTestClient pulsarTestClient = (PulsarTestClient)this.pulsarClient;
        ProducerImpl producer = (ProducerImpl)this.pulsarClient.newProducer().topic("persistent://prop/use/ns-abc/testChecksumBackwardsCompatibilityWithOldBrokerWithoutChecksumHandling").enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://prop/use/ns-abc/testChecksumBackwardsCompatibilityWithOldBrokerWithoutChecksumHandling"}).subscriptionName("my-sub").subscribe();
        CountDownLatch messageSendingProcessedLatch = new CountDownLatch(2);
        producer.send((Object)"message-1".getBytes());
        pulsarTestClient.dropOpSendMessages();
        byte[] messageBytes = "message-2".getBytes();
        TypedMessageBuilder messageBuilder = producer.newMessage().value((Object)messageBytes);
        CompletableFuture tamperedMessageSendFuture = messageBuilder.sendAsync();
        pulsarTestClient.setPendingMessageCallback(null);
        pulsarTestClient.disconnectProducerAndRejectReconnecting(producer);
        ((TypedMessageBuilderImpl)messageBuilder).getContent().put(messageBytes.length - 1, (byte)51);
        if (mixedVersionScenario == MixedVersionScenario.CONNECTED_TO_NEW_THEN_OLD_VERSION) {
            this.makeClientAssumeThatItsConnectedToBrokerWithoutChecksumSupport();
        } else {
            this.resetOverridingConnectedBrokerVersion();
        }
        pulsarTestClient.allowReconnecting();
        try {
            tamperedMessageSendFuture.get(10L, TimeUnit.SECONDS);
        }
        catch (Exception e) {
            Assert.fail((String)"Broker shouldn't verify checksum for corrupted message and it shouldn't fail", (Throwable)e);
        }
        Message msg = consumer.receive(1, TimeUnit.SECONDS);
        Assert.assertEquals((String)new String(msg.getData()), (String)"message-1");
        msg = consumer.receive(1, TimeUnit.SECONDS);
        Assert.assertEquals((String)new String(msg.getData()), (String)"message-3");
    }

    private void makeClientAssumeThatItsConnectedToBrokerWithoutChecksumSupport() {
        ((PulsarTestClient)this.pulsarClient).setOverrideRemoteEndpointProtocolVersion(ProtocolVersion.v5.getValue());
    }

    private void resetOverridingConnectedBrokerVersion() {
        ((PulsarTestClient)this.pulsarClient).setOverrideRemoteEndpointProtocolVersion(0);
    }

    private void waitUntilMessageIsPendingWithCalculatedChecksum(ProducerImpl<?> producer) {
        Awaitility.await().untilAsserted(() -> Assert.assertEquals((int)producer.getPendingQueueSize(), (int)1));
    }

    @Test
    public void testTamperingMessageIsDetected() throws Exception {
        ProducerImpl producer = (ProducerImpl)this.pulsarClient.newProducer().topic("persistent://prop/use/ns-abc/testTamperingMessageIsDetected").enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        TypedMessageBuilderImpl msgBuilder = (TypedMessageBuilderImpl)producer.newMessage().value((Object)"a message".getBytes());
        MessageMetadata msgMetadata = msgBuilder.getMetadataBuilder().setProducerName("test").setSequenceId(1L).setPublishTime(10L);
        ByteBuf payload = Unpooled.wrappedBuffer((ByteBuffer)msgBuilder.getContent());
        ByteBufPair cmd = Commands.newSend((long)1L, (long)1L, (int)1, (Commands.ChecksumType)Commands.ChecksumType.Crc32c, (MessageMetadata)msgMetadata, (ByteBuf)payload);
        ProducerImpl.OpSendMsg op = ProducerImpl.OpSendMsg.create((MessageImpl)((MessageImpl)msgBuilder.getMessage()), (ByteBufPair)cmd, (long)1L, null);
        Assert.assertTrue((boolean)producer.verifyLocalBufferIsNotCorrupted(op));
        msgBuilder.getContent().put(0, (byte)98);
        Assert.assertFalse((boolean)producer.verifyLocalBufferIsNotCorrupted(op));
    }

    static enum MixedVersionScenario {
        CONNECTED_TO_NEW_THEN_OLD_VERSION,
        CONNECTED_TO_OLD_THEN_NEW_VERSION;

    }
}

