/*
 * Decompiled with CFR 0.152.
 */
package com.networknt.kafka.producer;

import com.networknt.config.Config;
import com.networknt.httpstring.HttpStringConstants;
import com.networknt.kafka.common.FlinkKafkaProducer;
import com.networknt.kafka.common.KafkaProducerConfig;
import com.networknt.kafka.common.TransactionalKafkaException;
import com.networknt.kafka.producer.QueuedLightProducer;
import com.networknt.server.ServerConfig;
import io.undertow.server.HttpServerExchange;
import java.nio.charset.StandardCharsets;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.InvalidTxnStateException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.header.Headers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TransactionalProducer
implements Runnable,
QueuedLightProducer {
    private static final Logger logger;
    static String callerId;
    static final KafkaProducerConfig config;
    static final String topic;
    private BlockingQueue<ProducerRecord<byte[], byte[]>> txQueue = new LinkedBlockingQueue<ProducerRecord<byte[], byte[]>>();
    private final BlockingDeque<String> availableTransactionalIds = new LinkedBlockingDeque<String>();
    private KafkaTransactionState currentTransaction;
    private transient Callback callback;
    private volatile transient Exception asyncException;
    private volatile long transactionTimeout;
    private final AtomicLong pendingRecords = new AtomicLong();
    private final AtomicBoolean stopped = new AtomicBoolean(false);

    @Override
    public BlockingQueue<ProducerRecord<byte[], byte[]>> getTxQueue() {
        return this.txQueue;
    }

    public TransactionalProducer() {
        logger.info("The TransactionalProducer is created");
    }

    @Override
    public void run() {
        while (!this.stopped.get()) {
            try {
                this.currentTransaction = this.beginTransaction();
                ArrayList buffer = new ArrayList();
                while (!this.stopped.get()) {
                    int added = TransactionalProducer.drain(this.txQueue, buffer, 5000, 1000L, TimeUnit.MILLISECONDS);
                    if (logger.isTraceEnabled() && added > 0) {
                        logger.trace("drained transactions = " + added);
                    }
                    for (int j = 0; j < added; ++j) {
                        ProducerRecord record = (ProducerRecord)buffer.get(j);
                        this.invoke(this.currentTransaction, topic, (ProducerRecord<byte[], byte[]>)record);
                    }
                    if (added <= 0) continue;
                    break;
                }
                long producerId = this.currentTransaction.producer.getProducerId();
                short epoch = this.currentTransaction.producer.getEpoch();
                if (logger.isDebugEnabled()) {
                    logger.debug("producerId = " + producerId + " epoch = " + epoch);
                }
                this.commit(this.currentTransaction);
            }
            catch (InterruptedException e) {
                logger.error("InterruptedException", (Throwable)e);
            }
            catch (TransactionalKafkaException e) {
                logger.error("TransactionalKafkaException", (Throwable)e);
                this.abort(this.currentTransaction);
            }
            catch (AuthorizationException | OutOfOrderSequenceException | ProducerFencedException e) {
                logger.error("One of the three exceptions", e);
                try {
                    this.close();
                }
                catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            catch (KafkaException e) {
                logger.error("KafkaException", (Throwable)e);
                this.abort(this.currentTransaction);
                if (!(e instanceof ConfigException)) continue;
                throw new RuntimeException("Kafka is down!");
            }
        }
    }

    public static <E> int drain(BlockingQueue<E> q, Collection<? super E> buffer, int numElements, long timeout, TimeUnit unit) throws InterruptedException {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        int added = 0;
        while (added < numElements) {
            if ((added += q.drainTo(buffer, numElements - added)) >= numElements) continue;
            E e = q.poll(deadline - System.nanoTime(), TimeUnit.NANOSECONDS);
            if (e == null) break;
            buffer.add(e);
            ++added;
        }
        return added;
    }

    public void invoke(KafkaTransactionState transaction, String topic, ProducerRecord<byte[], byte[]> record) throws TransactionalKafkaException {
        this.pendingRecords.incrementAndGet();
        transaction.producer.send(record, this.callback);
    }

    @Override
    public void open() {
        this.callback = new Callback(){

            public void onCompletion(RecordMetadata metadata, Exception e) {
                if (e != null) {
                    logger.error("Error while sending record to Kafka: " + e.getMessage(), (Throwable)e);
                }
                TransactionalProducer.this.acknowledgeMessage();
            }
        };
    }

    protected void preCommit(KafkaTransactionState transaction) throws TransactionalKafkaException {
        this.flush(transaction);
    }

    protected void commit(KafkaTransactionState transaction) {
        transaction.producer.commitTransaction();
        this.recycleTransactionalProducer(transaction.producer);
    }

    protected void recoverAndCommit(KafkaTransactionState transaction) {
        try (FlinkKafkaProducer<byte[], byte[]> producer = this.initTransactionalProducer(transaction.transactionalId);){
            producer.resumeTransaction(transaction.producerId, transaction.epoch);
            producer.commitTransaction();
        }
        catch (InvalidTxnStateException | ProducerFencedException ex) {
            logger.warn("Encountered error {} while recovering transaction {}. Presumably this transaction has been already committed before", (Object)ex, (Object)transaction);
        }
    }

    protected void abort(KafkaTransactionState transaction) {
        if (transaction != null) {
            transaction.producer.abortTransaction();
            this.recycleTransactionalProducer(transaction.producer);
        }
    }

    private void recycleTransactionalProducer(FlinkKafkaProducer<byte[], byte[]> producer) {
        this.availableTransactionalIds.add(producer.getTransactionalId());
        producer.close();
    }

    protected void recoverAndAbort(KafkaTransactionState transaction) {
        try (FlinkKafkaProducer<byte[], byte[]> producer = this.initTransactionalProducer(transaction.transactionalId);){
            producer.initTransactions();
        }
    }

    private void acknowledgeMessage() {
        this.pendingRecords.decrementAndGet();
    }

    @Override
    public void close() {
        KafkaTransactionState currentTransaction = this.currentTransaction();
        if (currentTransaction != null) {
            this.flush(currentTransaction);
            this.commit(currentTransaction);
        }
        this.stopped.getAndSet(true);
    }

    private void flush(KafkaTransactionState transaction) {
        long pendingRecordsCount;
        if (transaction.producer != null) {
            transaction.producer.flush();
        }
        if ((pendingRecordsCount = this.pendingRecords.get()) != 0L) {
            throw new IllegalStateException("Pending record count must be zero at this point: " + pendingRecordsCount);
        }
    }

    public KafkaTransactionState currentTransaction() {
        return this.currentTransaction;
    }

    public KafkaTransactionState beginTransaction() throws TransactionalKafkaException {
        FlinkKafkaProducer<byte[], byte[]> producer = this.createTransactionalProducer();
        producer.beginTransaction();
        return new KafkaTransactionState(producer.getTransactionalId(), producer);
    }

    private FlinkKafkaProducer<byte[], byte[]> createTransactionalProducer() throws TransactionalKafkaException {
        FlinkKafkaProducer<byte[], byte[]> producer = this.initTransactionalProducer((String)config.getProperties().get("transactional.id"));
        producer.initTransactions();
        return producer;
    }

    private FlinkKafkaProducer<byte[], byte[]> initTransactionalProducer(String transactionalId) {
        config.getProperties().put("transactional.id", transactionalId);
        return this.initProducer();
    }

    private FlinkKafkaProducer<byte[], byte[]> initProducer() {
        FlinkKafkaProducer producer = new FlinkKafkaProducer(config.getProperties());
        logger.info("Starting FlinkKafkaProducer");
        return producer;
    }

    @Override
    public void propagateHeaders(ProducerRecord record, HttpServerExchange exchange) {
        Headers headers = record.headers();
        String token = exchange.getRequestHeaders().getFirst("authorization");
        headers.add("authorization", token.getBytes(StandardCharsets.UTF_8));
        String cid = exchange.getRequestHeaders().getFirst(HttpStringConstants.CORRELATION_ID);
        headers.add("X-Correlation-Id", cid.getBytes(StandardCharsets.UTF_8));
        String tid = exchange.getRequestHeaders().getFirst(HttpStringConstants.TRACEABILITY_ID);
        if (tid != null) {
            headers.add("X-Traceability-Id", tid.getBytes(StandardCharsets.UTF_8));
        }
        if (config.isInjectCallerId()) {
            headers.add("caller_id", callerId.getBytes(StandardCharsets.UTF_8));
        }
    }

    public static int addressToPartition(String address) {
        String bankId = address.substring(0, 4);
        return Integer.valueOf(bankId);
    }

    static {
        ServerConfig serverConfig;
        logger = LoggerFactory.getLogger(TransactionalProducer.class);
        callerId = "unknown";
        config = (KafkaProducerConfig)Config.getInstance().getJsonObjectConfig("kafka-producer", KafkaProducerConfig.class);
        if (config.isInjectCallerId() && (serverConfig = ServerConfig.getInstance()) != null) {
            callerId = serverConfig.getServiceId();
        }
        topic = config.getTopic();
    }

    public static final class TransactionHolder<KafkaTransactionState> {
        private final KafkaTransactionState handle;
        private final long transactionStartTime;

        public TransactionHolder(KafkaTransactionState handle, long transactionStartTime) {
            this.handle = handle;
            this.transactionStartTime = transactionStartTime;
        }

        long elapsedTime(Clock clock) {
            return clock.millis() - this.transactionStartTime;
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            TransactionHolder that = (TransactionHolder)o;
            if (this.transactionStartTime != that.transactionStartTime) {
                return false;
            }
            return this.handle != null ? this.handle.equals(that.handle) : that.handle == null;
        }

        public int hashCode() {
            int result = this.handle != null ? this.handle.hashCode() : 0;
            result = 31 * result + (int)(this.transactionStartTime ^ this.transactionStartTime >>> 32);
            return result;
        }

        public String toString() {
            return "TransactionHolder{handle=" + this.handle + ", transactionStartTime=" + this.transactionStartTime + "}";
        }
    }

    public static class NextTransactionalIdHint {
        public int lastParallelism = 0;
        public long nextFreeTransactionalId = 0L;

        public NextTransactionalIdHint() {
            this(0, 0L);
        }

        public NextTransactionalIdHint(int parallelism, long nextFreeTransactionalId) {
            this.lastParallelism = parallelism;
            this.nextFreeTransactionalId = nextFreeTransactionalId;
        }
    }

    static class KafkaTransactionState {
        private final transient FlinkKafkaProducer<byte[], byte[]> producer;
        final String transactionalId;
        final long producerId;
        final short epoch;

        KafkaTransactionState(String transactionalId, FlinkKafkaProducer<byte[], byte[]> producer) {
            this(transactionalId, producer.getProducerId(), producer.getEpoch(), producer);
        }

        KafkaTransactionState(FlinkKafkaProducer<byte[], byte[]> producer) {
            this(null, -1L, -1, producer);
        }

        KafkaTransactionState(String transactionalId, long producerId, short epoch, FlinkKafkaProducer<byte[], byte[]> producer) {
            this.transactionalId = transactionalId;
            this.producerId = producerId;
            this.epoch = epoch;
            this.producer = producer;
        }

        public String toString() {
            return String.format("%s [transactionalId=%s, producerId=%s, epoch=%s]", this.getClass().getSimpleName(), this.transactionalId, this.producerId, this.epoch);
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            KafkaTransactionState that = (KafkaTransactionState)o;
            if (this.producerId != that.producerId) {
                return false;
            }
            if (this.epoch != that.epoch) {
                return false;
            }
            return this.transactionalId != null ? this.transactionalId.equals(that.transactionalId) : that.transactionalId == null;
        }

        public int hashCode() {
            int result = this.transactionalId != null ? this.transactionalId.hashCode() : 0;
            result = 31 * result + (int)(this.producerId ^ this.producerId >>> 32);
            result = 31 * result + this.epoch;
            return result;
        }
    }
}

