package com.networknt.kafka.common;

import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.internals.TransactionalRequestResult;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/networknt/kafka/common/FlinkKafkaProducer.class */
public class FlinkKafkaProducer<K, V> implements Producer<K, V> {
    private static final Logger logger = LoggerFactory.getLogger(FlinkKafkaProducer.class);
    private final KafkaProducer<K, V> kafkaProducer;
    private final String transactionalId;

    public FlinkKafkaProducer(Properties properties) {
        this.transactionalId = properties.getProperty("transactional.id");
        this.kafkaProducer = new KafkaProducer<>(properties);
    }

    public void initTransactions() {
        this.kafkaProducer.initTransactions();
    }

    public void beginTransaction() throws ProducerFencedException {
        this.kafkaProducer.beginTransaction();
    }

    public void commitTransaction() throws ProducerFencedException {
        this.kafkaProducer.commitTransaction();
    }

    public void abortTransaction() throws ProducerFencedException {
        this.kafkaProducer.abortTransaction();
    }

    public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> map, String str) throws ProducerFencedException {
        this.kafkaProducer.sendOffsetsToTransaction(map, str);
    }

    public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> map, ConsumerGroupMetadata consumerGroupMetadata) throws ProducerFencedException {
        this.kafkaProducer.sendOffsetsToTransaction(map, consumerGroupMetadata);
    }

    public Future<RecordMetadata> send(ProducerRecord<K, V> producerRecord) {
        return this.kafkaProducer.send(producerRecord);
    }

    public Future<RecordMetadata> send(ProducerRecord<K, V> producerRecord, Callback callback) {
        return this.kafkaProducer.send(producerRecord, callback);
    }

    public List<PartitionInfo> partitionsFor(String str) {
        return this.kafkaProducer.partitionsFor(str);
    }

    public Map<MetricName, ? extends Metric> metrics() {
        return this.kafkaProducer.metrics();
    }

    public void close() {
        this.kafkaProducer.close();
    }

    public void close(Duration duration) {
        this.kafkaProducer.close(duration);
    }

    public void flush() {
        this.kafkaProducer.flush();
        if (this.transactionalId != null) {
            flushNewPartitions();
        }
    }

    public void resumeTransaction(long j, short s) {
        logger.info("Attempting to resume transaction {} with producerId {} and epoch {}", new Object[]{this.transactionalId, Long.valueOf(j), Short.valueOf(s)});
        Object value = getValue(this.kafkaProducer, "transactionManager");
        synchronized (value) {
            Object value2 = getValue(value, "sequenceNumbers");
            invoke(value, "transitionTo", getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.INITIALIZING"));
            invoke(value2, "clear", new Object[0]);
            Object value3 = getValue(value, "producerIdAndEpoch");
            setValue(value3, "producerId", Long.valueOf(j));
            setValue(value3, "epoch", Short.valueOf(s));
            invoke(value, "transitionTo", getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.READY"));
            invoke(value, "transitionTo", getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.IN_TRANSACTION"));
            setValue(value, "transactionStarted", true);
        }
    }

    public String getTransactionalId() {
        return this.transactionalId;
    }

    public long getProducerId() {
        return ((Long) getValue(getValue(getValue(this.kafkaProducer, "transactionManager"), "producerIdAndEpoch"), "producerId")).longValue();
    }

    public short getEpoch() {
        return ((Short) getValue(getValue(getValue(this.kafkaProducer, "transactionManager"), "producerIdAndEpoch"), "epoch")).shortValue();
    }

    public int getTransactionCoordinatorId() {
        return ((Node) invoke(getValue(this.kafkaProducer, "transactionManager"), "coordinator", FindCoordinatorRequest.CoordinatorType.TRANSACTION)).id();
    }

    private void flushNewPartitions() {
        logger.info("Flushing new partitions");
        TransactionalRequestResult enqueueNewPartitions = enqueueNewPartitions();
        invoke(getValue(this.kafkaProducer, "sender"), "wakeup", new Object[0]);
        enqueueNewPartitions.await();
    }

    private TransactionalRequestResult enqueueNewPartitions() {
        TransactionalRequestResult transactionalRequestResult;
        Object value = getValue(this.kafkaProducer, "transactionManager");
        synchronized (value) {
            Object invoke = invoke(value, "addPartitionsToTransactionHandler", new Object[0]);
            invoke(value, "enqueueRequest", new Class[]{invoke.getClass().getSuperclass()}, new Object[]{invoke});
            transactionalRequestResult = (TransactionalRequestResult) getValue(invoke, invoke.getClass().getSuperclass(), "result");
        }
        return transactionalRequestResult;
    }

    private static Enum<?> getEnum(String str) {
        String[] split = str.split("\\.(?=[^\\.]+$)");
        if (split.length != 2) {
            return null;
        }
        String str2 = split[0];
        try {
            return Enum.valueOf(Class.forName(str2), split[1]);
        } catch (ClassNotFoundException e) {
            throw new RuntimeException("Incompatible KafkaProducer version", e);
        }
    }

    private static Object invoke(Object obj, String str, Object... objArr) {
        Class[] clsArr = new Class[objArr.length];
        for (int i = 0; i < objArr.length; i++) {
            clsArr[i] = objArr[i].getClass();
        }
        return invoke(obj, str, clsArr, objArr);
    }

    private static Object invoke(Object obj, String str, Class<?>[] clsArr, Object[] objArr) {
        try {
            Method declaredMethod = obj.getClass().getDeclaredMethod(str, clsArr);
            declaredMethod.setAccessible(true);
            return declaredMethod.invoke(obj, objArr);
        } catch (IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
            throw new RuntimeException("Incompatible KafkaProducer version", e);
        }
    }

    private static Object getValue(Object obj, String str) {
        return getValue(obj, obj.getClass(), str);
    }

    private static Object getValue(Object obj, Class<?> cls, String str) {
        try {
            Field declaredField = cls.getDeclaredField(str);
            declaredField.setAccessible(true);
            return declaredField.get(obj);
        } catch (IllegalAccessException | NoSuchFieldException e) {
            throw new RuntimeException("Incompatible KafkaProducer version", e);
        }
    }

    private static void setValue(Object obj, String str, Object obj2) {
        try {
            Field declaredField = obj.getClass().getDeclaredField(str);
            declaredField.setAccessible(true);
            declaredField.set(obj, obj2);
        } catch (IllegalAccessException | NoSuchFieldException e) {
            throw new RuntimeException("Incompatible KafkaProducer version", e);
        }
    }
}
