/*
 * Decompiled with CFR 0.152.
 */
package com.networknt.tram.producer;

import com.networknt.config.Config;
import com.networknt.tram.producer.KafkaProducerConfig;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.internals.TransactionalRequestResult;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link Producer} that forwards every call to a wrapped {@link KafkaProducer} and, in
 * addition, exposes parts of the wrapped producer's transaction state (producer id, epoch,
 * transaction coordinator) and allows a transaction to be resumed.
 *
 * <p>The additional operations read and modify private members of {@code KafkaProducer} and its
 * {@code transactionManager} through reflection. When an expected private field, method or class
 * cannot be found, a {@link RuntimeException} with the message
 * {@code "Incompatible KafkaProducer version"} is thrown.
 *
 * @param <K> type of the record keys
 * @param <V> type of the record values
 */
public class TramKafkaProducer<K, V> implements Producer<K, V> {
    private static final Logger LOG = LoggerFactory.getLogger(TramKafkaProducer.class);

    /** The producer every call is delegated to. */
    private final KafkaProducer<K, V> kafkaProducer;

    /** Value of the {@code transactional.id} property, or {@code null} when it is not set as a string. */
    private final String transactionalId;

    /**
     * Creates a producer from explicit Kafka properties.
     *
     * @param properties configuration handed unchanged to {@link KafkaProducer}; its
     *                   {@code transactional.id} entry (if any) is remembered as the transactional id
     */
    public TramKafkaProducer(Properties properties) {
        this.transactionalId = properties.getProperty("transactional.id");
        this.kafkaProducer = new KafkaProducer<>(properties);
    }

    /**
     * Creates a producer from the {@code kafka-producer} configuration loaded through
     * {@link Config}.
     */
    public TramKafkaProducer() {
        this(loadConfiguredProperties());
    }

    /**
     * Builds the Kafka properties (bootstrap servers, transactional id, key and value serializers)
     * from the {@code kafka-producer} configuration object.
     */
    private static Properties loadConfiguredProperties() {
        Properties properties = new Properties();
        KafkaProducerConfig config = (KafkaProducerConfig) Config.getInstance()
                .getJsonObjectConfig("kafka-producer", KafkaProducerConfig.class);
        properties.setProperty("bootstrap.servers", config.getBootstrapServers());
        properties.put("transactional.id", config.getTransactionId());
        properties.put("key.serializer", config.getKeySerializer());
        properties.put("value.serializer", config.getValueSerializer());
        return properties;
    }

    /** Delegates to {@link KafkaProducer#initTransactions()}. */
    @Override
    public void initTransactions() {
        kafkaProducer.initTransactions();
    }

    /** Delegates to {@link KafkaProducer#beginTransaction()}. */
    @Override
    public void beginTransaction() throws ProducerFencedException {
        kafkaProducer.beginTransaction();
    }

    /** Delegates to {@link KafkaProducer#commitTransaction()}. */
    @Override
    public void commitTransaction() throws ProducerFencedException {
        kafkaProducer.commitTransaction();
    }

    /** Delegates to {@link KafkaProducer#abortTransaction()}. */
    @Override
    public void abortTransaction() throws ProducerFencedException {
        kafkaProducer.abortTransaction();
    }

    /**
     * Delegates to the wrapped producer's {@code sendOffsetsToTransaction}.
     *
     * @param offsets         offsets to send, keyed by topic partition
     * @param consumerGroupId id of the consumer group the offsets belong to
     */
    @Override
    public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId)
            throws ProducerFencedException {
        kafkaProducer.sendOffsetsToTransaction(offsets, consumerGroupId);
    }

    /**
     * Delegates to the wrapped producer's {@code send(record)}.
     *
     * @param record the record to send
     * @return the future returned by the wrapped producer
     */
    @Override
    public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
        return kafkaProducer.send(record);
    }

    /**
     * Delegates to the wrapped producer's {@code send(record, callback)}.
     *
     * @param record   the record to send
     * @param callback callback passed through to the wrapped producer
     * @return the future returned by the wrapped producer
     */
    @Override
    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        return kafkaProducer.send(record, callback);
    }

    /**
     * Delegates to the wrapped producer's {@code partitionsFor(topic)}.
     *
     * @param topic the topic to look up
     * @return the partition list returned by the wrapped producer
     */
    @Override
    public List<PartitionInfo> partitionsFor(String topic) {
        return kafkaProducer.partitionsFor(topic);
    }

    /**
     * Delegates to the wrapped producer's {@code metrics()}.
     *
     * @return the metrics map returned by the wrapped producer
     */
    @Override
    public Map<MetricName, ? extends Metric> metrics() {
        return kafkaProducer.metrics();
    }

    /** Delegates to the wrapped producer's {@code close()}. */
    @Override
    public void close() {
        kafkaProducer.close();
    }

    /**
     * Delegates to the wrapped producer's {@code close(timeout, unit)}.
     *
     * @param timeout the timeout value passed through
     * @param unit    the unit of {@code timeout}
     */
    @Override
    public void close(long timeout, TimeUnit unit) {
        kafkaProducer.close(timeout, unit);
    }

    /**
     * Flushes the wrapped producer and, when a transactional id is configured, additionally
     * enqueues an add-partitions-to-transaction request and waits for its result.
     */
    @Override
    public void flush() {
        kafkaProducer.flush();
        if (transactionalId != null) {
            flushNewPartitions();
        }
    }

    /**
     * Forces the wrapped producer's transaction manager into the {@code IN_TRANSACTION} state
     * using the given producer id and epoch.
     *
     * <p>While holding the transaction manager's monitor this transitions it to
     * {@code INITIALIZING}, clears its {@code nextSequence}, overwrites the {@code producerId} and
     * {@code epoch} of its {@code producerIdAndEpoch}, transitions it to {@code READY} and then
     * {@code IN_TRANSACTION}, and finally sets {@code transactionStarted} to {@code true}.
     *
     * @param producerId producer id to install
     * @param epoch      producer epoch to install
     * @throws RuntimeException if the reflective access fails
     */
    public void resumeTransaction(long producerId, short epoch) {
        LOG.info("Attempting to resume transaction {} with producerId {} and epoch {}", transactionalId, producerId, epoch);
        Object transactionManager = getValue(kafkaProducer, "transactionManager");
        synchronized (transactionManager) {
            // Read before the state transition, exactly as the statement order has always been.
            Object nextSequence = getValue(transactionManager, "nextSequence");
            invoke(transactionManager, "transitionTo", getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.INITIALIZING"));
            invoke(nextSequence, "clear");
            Object producerIdAndEpoch = getValue(transactionManager, "producerIdAndEpoch");
            setValue(producerIdAndEpoch, "producerId", producerId);
            setValue(producerIdAndEpoch, "epoch", epoch);
            invoke(transactionManager, "transitionTo", getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.READY"));
            invoke(transactionManager, "transitionTo", getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.IN_TRANSACTION"));
            setValue(transactionManager, "transactionStarted", true);
        }
    }

    /**
     * @return the {@code transactional.id} this producer was created with, or {@code null} if none
     */
    public String getTransactionalId() {
        return transactionalId;
    }

    /**
     * @return the {@code producerId} currently stored in the transaction manager's
     *         {@code producerIdAndEpoch}
     * @throws RuntimeException if the reflective access fails
     */
    public long getProducerId() {
        Object transactionManager = getValue(kafkaProducer, "transactionManager");
        Object producerIdAndEpoch = getValue(transactionManager, "producerIdAndEpoch");
        return (Long) getValue(producerIdAndEpoch, "producerId");
    }

    /**
     * @return the {@code epoch} currently stored in the transaction manager's
     *         {@code producerIdAndEpoch}
     * @throws RuntimeException if the reflective access fails
     */
    public short getEpoch() {
        Object transactionManager = getValue(kafkaProducer, "transactionManager");
        Object producerIdAndEpoch = getValue(transactionManager, "producerIdAndEpoch");
        return (Short) getValue(producerIdAndEpoch, "epoch");
    }

    /**
     * @return the id of the node the transaction manager reports as the {@code TRANSACTION}
     *         coordinator
     * @throws RuntimeException if the reflective access fails
     */
    public int getTransactionCoordinatorId() {
        Object transactionManager = getValue(kafkaProducer, "transactionManager");
        // NOTE(review): if the transaction manager can report no coordinator (null), this
        // dereference fails with a NullPointerException - confirm against the Kafka version in use.
        Node node = (Node) invoke(transactionManager, "coordinator", FindCoordinatorRequest.CoordinatorType.TRANSACTION);
        return node.id();
    }

    /**
     * Enqueues an add-partitions-to-transaction request, wakes up the producer's sender and
     * blocks until the request's result is available.
     */
    private void flushNewPartitions() {
        LOG.info("Flushing new partitions");
        TransactionalRequestResult result = enqueueNewPartitions();
        Object sender = getValue(kafkaProducer, "sender");
        invoke(sender, "wakeup");
        result.await();
    }

    /**
     * Obtains the transaction manager's add-partitions-to-transaction handler, enqueues it and
     * returns the handler's {@code result}, all while holding the transaction manager's monitor.
     *
     * @return the result object of the enqueued request
     */
    private TransactionalRequestResult enqueueNewPartitions() {
        Object transactionManager = getValue(kafkaProducer, "transactionManager");
        synchronized (transactionManager) {
            Object txnRequestHandler = invoke(transactionManager, "addPartitionsToTransactionHandler");
            // Both "enqueueRequest" and the "result" field are looked up against the handler's
            // superclass rather than its concrete class.
            Class<?> handlerSuperclass = txnRequestHandler.getClass().getSuperclass();
            invoke(transactionManager, "enqueueRequest", new Class<?>[]{handlerSuperclass}, new Object[]{txnRequestHandler});
            return (TransactionalRequestResult) getValue(txnRequestHandler, handlerSuperclass, "result");
        }
    }

    /**
     * Resolves an enum constant from a string of the form
     * {@code <binary class name>.<constant name>}; the last dot separates the two parts.
     *
     * @param enumFullName class name and constant name joined by a dot
     * @return the resolved constant, never {@code null}
     * @throws IllegalArgumentException if {@code enumFullName} has no non-empty constant name
     *                                  after its last dot, or the class has no such constant
     * @throws RuntimeException         if the enum class cannot be loaded
     */
    @SuppressWarnings({"unchecked", "rawtypes"}) // the class is only known by name at runtime
    private static Enum<?> getEnum(String enumFullName) {
        int separator = enumFullName.lastIndexOf('.');
        if (separator < 0 || separator == enumFullName.length() - 1) {
            // Previously this case returned null and failed later with an unexplained NPE.
            throw new IllegalArgumentException("Not a '<class>.<constant>' enum name: " + enumFullName);
        }
        String enumClassName = enumFullName.substring(0, separator);
        String enumName = enumFullName.substring(separator + 1);
        try {
            Class<Enum> enumClass = (Class<Enum>) Class.forName(enumClassName);
            return Enum.valueOf(enumClass, enumName);
        } catch (ClassNotFoundException e) {
            throw new RuntimeException("Incompatible KafkaProducer version", e);
        }
    }

    /**
     * Reflectively invokes a method, deriving each parameter type from the runtime class of the
     * corresponding argument.
     *
     * @param object     target instance
     * @param methodName name of a method declared on the target's runtime class
     * @param args       arguments; none may be {@code null} because its type could not be derived
     * @return the invoked method's return value
     * @throws IllegalArgumentException if an argument is {@code null}
     * @throws RuntimeException         if the method cannot be found, accessed or completes abnormally
     */
    private static Object invoke(Object object, String methodName, Object... args) {
        Class<?>[] argTypes = new Class<?>[args.length];
        for (int i = 0; i < args.length; i++) {
            if (args[i] == null) {
                throw new IllegalArgumentException(
                        "Cannot infer the parameter type of null argument " + i + " for method " + methodName);
            }
            argTypes[i] = args[i].getClass();
        }
        return invoke(object, methodName, argTypes, args);
    }

    /**
     * Reflectively invokes a method declared on the target's runtime class, regardless of its
     * access modifier.
     *
     * @param object     target instance
     * @param methodName name of the declared method
     * @param argTypes   exact declared parameter types
     * @param args       arguments to pass
     * @return the invoked method's return value
     * @throws RuntimeException if the method cannot be found, accessed or completes abnormally
     */
    private static Object invoke(Object object, String methodName, Class<?>[] argTypes, Object[] args) {
        try {
            Method method = object.getClass().getDeclaredMethod(methodName, argTypes);
            method.setAccessible(true);
            return method.invoke(object, args);
        } catch (IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
            // An exception thrown by the invoked method itself (InvocationTargetException) is
            // reported under the same message; the real failure is available via the cause chain.
            throw new RuntimeException("Incompatible KafkaProducer version", e);
        }
    }

    /**
     * Reads a field declared on the target's runtime class.
     *
     * @param object    target instance
     * @param fieldName name of the declared field
     * @return the field's current value
     * @throws RuntimeException if the field cannot be found or accessed
     */
    private static Object getValue(Object object, String fieldName) {
        return getValue(object, object.getClass(), fieldName);
    }

    /**
     * Reads a field declared on the given class, regardless of its access modifier.
     *
     * @param object    target instance
     * @param clazz     class that declares the field
     * @param fieldName name of the declared field
     * @return the field's current value
     * @throws RuntimeException if the field cannot be found or accessed
     */
    private static Object getValue(Object object, Class<?> clazz, String fieldName) {
        try {
            Field field = clazz.getDeclaredField(fieldName);
            field.setAccessible(true);
            return field.get(object);
        } catch (IllegalAccessException | NoSuchFieldException e) {
            throw new RuntimeException("Incompatible KafkaProducer version", e);
        }
    }

    /**
     * Writes a field declared on the target's runtime class, regardless of its access modifier.
     *
     * @param object    target instance
     * @param fieldName name of the declared field
     * @param value     value to store
     * @throws RuntimeException if the field cannot be found or accessed
     */
    private static void setValue(Object object, String fieldName, Object value) {
        try {
            Field field = object.getClass().getDeclaredField(fieldName);
            field.setAccessible(true);
            field.set(object, value);
        } catch (IllegalAccessException | NoSuchFieldException e) {
            throw new RuntimeException("Incompatible KafkaProducer version", e);
        }
    }
}

