package io.nosqlbench.driver.pulsar.ops;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Timer;
import io.nosqlbench.driver.pulsar.PulsarActivity;
import io.nosqlbench.driver.pulsar.util.AvroUtil;
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.transaction.Transaction;

/* loaded from: input_file:io/nosqlbench/driver/pulsar/ops/PulsarProducerOp.class */
/**
 * A {@link PulsarOp} that publishes one message through a Pulsar {@link Producer}.
 *
 * <p>The message is built from a key and a string payload. When the configured schema is
 * reported as an Avro type by {@link PulsarActivityUtil#isAvroSchemaTypeStr(String)}, the payload
 * is converted to a generic record via {@link AvroUtil}; otherwise it is sent as UTF-8 bytes.
 * The send can be synchronous or asynchronous and can optionally be wrapped in a
 * {@link Transaction} that is committed after the send.
 */
public class PulsarProducerOp implements PulsarOp {
    private static final Logger logger = LogManager.getLogger(PulsarProducerOp.class);
    private final Producer<?> producer;
    private final Schema<?> pulsarSchema;
    private final String msgKey;
    private final String msgPayload;
    private final boolean asyncPulsarOp;
    private final Counter bytesCounter;
    private final Histogram messagesizeHistogram;
    private final PulsarActivity pulsarActivity;
    private final boolean useTransaction;
    private final Supplier<Transaction> transactionSupplier;

    /**
     * Creates a producer operation.
     *
     * @param producer            producer used to create and send the message
     * @param schema              schema used for non-transactional messages and to decide
     *                            between the Avro and the raw-bytes payload encoding
     * @param asyncPulsarOp       {@code true} to send with {@code sendAsync()}, {@code false} to
     *                            block on {@code send()}
     * @param useTransaction      {@code true} to create the message inside a transaction obtained
     *                            from {@code transactionSupplier} and commit it after the send
     * @param transactionSupplier source of the transaction; only consulted when
     *                            {@code useTransaction} is {@code true}
     * @param msgKey              message key; not set on the message when {@code null} or empty
     * @param msgPayload          message payload; must be non-empty when {@link #run} is called
     * @param pulsarActivity      activity providing the byte counter, message size histogram,
     *                            commit timer and async failure callback
     */
    public PulsarProducerOp(Producer<?> producer, Schema<?> schema, boolean asyncPulsarOp, boolean useTransaction, Supplier<Transaction> transactionSupplier, String msgKey, String msgPayload, PulsarActivity pulsarActivity) {
        this.producer = producer;
        this.pulsarSchema = schema;
        this.msgKey = msgKey;
        this.msgPayload = msgPayload;
        this.asyncPulsarOp = asyncPulsarOp;
        this.pulsarActivity = pulsarActivity;
        this.bytesCounter = pulsarActivity.getBytesCounter();
        this.messagesizeHistogram = pulsarActivity.getMessagesizeHistogram();
        this.useTransaction = useTransaction;
        this.transactionSupplier = transactionSupplier;
    }

    /**
     * Builds the message, records its size in the activity metrics and sends it.
     *
     * <p>In asynchronous mode {@code runnable} is invoked when the send (and the commit, if a
     * transaction is used) completes, whether it succeeded or failed; failures are reported to
     * {@link PulsarActivity#asyncOperationFailed}. In synchronous mode {@code runnable} is
     * invoked only after a successful send (and commit).
     *
     * @param runnable callback run once the operation has finished as described above
     * @throws RuntimeException if the payload is {@code null} or empty, or if sending or
     *                          committing fails; the underlying exception is kept as the cause
     */
    @Override // io.nosqlbench.driver.pulsar.ops.PulsarOp
    @SuppressWarnings({"rawtypes", "unchecked"}) // value type of the builder depends on the runtime schema
    public void run(Runnable runnable) {
        if (this.msgPayload == null || this.msgPayload.isEmpty()) {
            throw new RuntimeException("Message payload (\"msg-value\") can't be empty!");
        }

        final Transaction transaction;
        TypedMessageBuilder messageBuilder;
        if (this.useTransaction) {
            transaction = this.transactionSupplier.get();
            messageBuilder = this.producer.newMessage(transaction);
        } else {
            transaction = null;
            messageBuilder = this.producer.newMessage(this.pulsarSchema);
        }
        if (this.msgKey != null && !this.msgKey.isEmpty()) {
            messageBuilder = messageBuilder.key(this.msgKey);
        }

        final int messageSize;
        if (PulsarActivityUtil.isAvroSchemaTypeStr(this.pulsarSchema.getSchemaInfo().getType().name())) {
            messageBuilder = messageBuilder.value(AvroUtil.GetGenericRecord_PulsarAvro(
                this.pulsarSchema, this.pulsarSchema.getSchemaInfo().getSchemaDefinition(), this.msgPayload));
            // NOTE(review): this is the payload's character count, not an encoded byte size,
            // while the non-Avro branch counts UTF-8 bytes - confirm this is intended.
            messageSize = this.msgPayload.length();
        } else {
            byte[] payloadBytes = this.msgPayload.getBytes(StandardCharsets.UTF_8);
            messageBuilder = messageBuilder.value(payloadBytes);
            messageSize = payloadBytes.length;
        }

        // Metrics are recorded before the send is attempted.
        this.messagesizeHistogram.update(messageSize);
        this.bytesCounter.inc(messageSize);

        if (this.asyncPulsarOp) {
            sendAsync(messageBuilder, transaction, runnable);
        } else {
            sendSync(messageBuilder, transaction, runnable);
        }
    }

    /** Sends the message without blocking, chaining the transaction commit when one is used. */
    @SuppressWarnings({"rawtypes", "unchecked"}) // raw builder yields a raw future
    private void sendAsync(TypedMessageBuilder messageBuilder, Transaction transaction, Runnable runnable) {
        try {
            CompletableFuture<Object> future = messageBuilder.sendAsync();
            if (this.useTransaction) {
                future = future.thenCompose(sendResult -> {
                    // Time only the commit; the context is closed whether the commit succeeds or fails.
                    Timer.Context commitTime = this.pulsarActivity.getCommitTransactionTimer().time();
                    return transaction.commit()
                        .whenComplete((committed, commitError) -> commitTime.close())
                        .thenApply(committed -> sendResult);
                });
            }
            future.whenComplete((result, error) -> runnable.run())
                .exceptionally(error -> {
                    // Pass the throwable so the failure cause is part of the log record.
                    logger.error("Producing message failed: key - " + this.msgKey + "; payload - " + this.msgPayload, error);
                    this.pulsarActivity.asyncOperationFailed(error);
                    return null;
                });
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /** Sends the message and, when a transaction is used, commits it, blocking until done. */
    @SuppressWarnings("rawtypes") // raw builder, see run()
    private void sendSync(TypedMessageBuilder messageBuilder, Transaction transaction, Runnable runnable) {
        try {
            logger.trace("sending message");
            messageBuilder.send();
            if (this.useTransaction) {
                // try-with-resources stops the commit timer even when the commit fails.
                try (Timer.Context commitTime = this.pulsarActivity.getCommitTransactionTimer().time()) {
                    transaction.commit().get();
                }
            }
            runnable.run();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can observe the interruption.
            Thread.currentThread().interrupt();
            logger.trace("failed sending message");
            throw new RuntimeException(e);
        } catch (PulsarClientException | ExecutionException e) {
            logger.trace("failed sending message");
            throw new RuntimeException(e);
        }
    }
}
