package io.axual.client.producer.generic;

import io.axual.client.config.BaseProducerConfig;
import io.axual.client.config.DeliveryStrategy;
import io.axual.client.config.OrderingStrategy;
import io.axual.client.exception.BufferFullException;
import io.axual.client.exception.ProducerWorkerCancelledException;
import io.axual.client.producer.ProducedMessage;
import io.axual.client.proxy.axual.producer.AxualProducer;
import io.axual.client.proxy.generic.producer.ProducerProxy;
import io.axual.common.config.ClientConfig;
import io.axual.common.tools.KafkaUtil;
import io.axual.common.tools.SleepUtil;
import java.time.Duration;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/axual/client/producer/generic/ProducerWorker.class */
/**
 * Background thread that takes {@link ProduceJob}s from a bounded in-memory buffer and executes
 * them against a single {@link ProducerProxy}.
 *
 * <p>Callers hand work to the worker through {@link #queueJob(ProduceJob)}; the worker thread
 * itself runs {@link #run()} until {@link #cancel()} is called, the thread is interrupted, or an
 * unexpected exception escapes job handling. When the worker stops it flushes or fails every job
 * still in the buffer and closes the producer exactly once.
 *
 * @param <K> key type of the produced messages
 * @param <V> value type of the produced messages
 */
public class ProducerWorker<K, V> extends Thread {
    private static final Logger LOG = LoggerFactory.getLogger(ProducerWorker.class);
    private static final AtomicInteger WORKERCOUNT = new AtomicInteger(0);
    private static final String THREE_MINUTES = "180000";

    /** Value used for the Kafka {@code retries} setting and as the delivery-timeout multiplier. */
    private static final long RETRIES = 2L;
    /** Value used for the Kafka {@code request.timeout.ms} setting. */
    private static final long REQUEST_TIMEOUT_MS = 30000L;
    /** How long a single poll on the buffer waits before the cancel flag is re-checked. */
    private static final long POLL_TIMEOUT_MS = 100L;
    /** Pause between completion checks when jobs must be produced strictly in order. */
    private static final long ORDER_WAIT_MS = 25L;

    private final AtomicBoolean isCancelled = new AtomicBoolean(false);
    private final AtomicBoolean isRunning = new AtomicBoolean(false);
    private final ClientConfig clientConfig;
    private final BaseProducerConfig producerConfig;
    private final boolean blockedMessageInsert;
    private final int messageBufferSize;
    private final long messageBufferWaitTimeout;
    private final ArrayBlockingQueue<ProduceJob<K, V>> messageBuffer;
    private final ProducerProxy<K, V> producer;

    /**
     * Creates the worker, its bounded job buffer and the underlying producer. The thread is not
     * started here; call {@link #start()}.
     *
     * @param clientConfig       client configuration used to derive the base Kafka settings
     * @param baseProducerConfig producer settings (buffer size, wait timeout, blocking mode,
     *                           serializers, delivery and ordering strategy, ...)
     */
    public ProducerWorker(ClientConfig clientConfig, BaseProducerConfig baseProducerConfig) {
        LOG.info("Creating new ProducerWorker");
        this.clientConfig = clientConfig;
        this.producerConfig = baseProducerConfig;
        this.messageBufferSize = baseProducerConfig.getMessageBufferSize();
        this.messageBufferWaitTimeout = baseProducerConfig.getMessageBufferWaitTimeout();
        this.blockedMessageInsert = baseProducerConfig.isBlocking();
        this.messageBuffer = new ArrayBlockingQueue<>(this.messageBufferSize);
        // Must come after clientConfig/producerConfig are assigned: the factory reads both.
        this.producer = createNewKafkaProducer();
    }

    /**
     * Builds the producer configuration from the client and producer configs and creates the
     * {@link AxualProducer}.
     *
     * <p>{@code delivery.timeout.ms} is derived as {@code RETRIES * (REQUEST_TIMEOUT_MS + lingerMs)}.
     *
     * @return the newly created producer
     */
    @SuppressWarnings({"rawtypes", "unchecked"}) // KafkaUtil/AxualProducer are used untyped here
    private ProducerProxy<K, V> createNewKafkaProducer() {
        Map kafkaConfigs = KafkaUtil.getKafkaConfigs(this.clientConfig);
        long lingerMs = this.producerConfig.getLingerMs();
        long deliveryTimeoutMs = RETRIES * (REQUEST_TIMEOUT_MS + lingerMs);

        boolean atLeastOnce = this.producerConfig.getDeliveryStrategy() == DeliveryStrategy.AT_LEAST_ONCE;
        boolean mayLoseOrder = this.producerConfig.getOrderingStrategy() == OrderingStrategy.LOSING_ORDER;

        kafkaConfigs.put("key.serializer", this.producerConfig.getKeySerializer());
        kafkaConfigs.put("value.serializer", this.producerConfig.getValueSerializer());
        // Kafka: "-1" waits for all in-sync replicas, "0" does not wait for any acknowledgement.
        kafkaConfigs.put("acks", atLeastOnce ? "-1" : "0");
        kafkaConfigs.put("retries", Long.toString(RETRIES));
        kafkaConfigs.put("reconnect.backoff.ms", "1000");
        kafkaConfigs.put("retry.backoff.ms", "1000");
        // A single in-flight request is used whenever ordering must be kept.
        kafkaConfigs.put("max.in.flight.requests.per.connection", mayLoseOrder ? "10" : "1");
        kafkaConfigs.put("linger.ms", Long.valueOf(lingerMs));
        kafkaConfigs.put("batch.size", this.producerConfig.getBatchSize());
        kafkaConfigs.put("connections.max.idle.ms", THREE_MINUTES);
        kafkaConfigs.put("metadata.max.age.ms", THREE_MINUTES);
        kafkaConfigs.put("metadata.max.idle.ms", THREE_MINUTES);
        kafkaConfigs.put("max.request.size", "1000000");
        kafkaConfigs.put("request.timeout.ms", Long.toString(REQUEST_TIMEOUT_MS));
        kafkaConfigs.put("delivery.timeout.ms", Long.toString(deliveryTimeoutMs));
        kafkaConfigs.put("axualproducer.chain", this.producerConfig.getProxyChain());

        LOG.info("Creating a new Axual producer with properties: {}", kafkaConfigs);
        AxualProducer axualProducer = new AxualProducer(kafkaConfigs);
        LOG.info("Created a new Axual producer");
        return axualProducer;
    }

    /**
     * Marks the worker as running and starts the thread. The flag is raised before the thread is
     * started so that {@link #queueJob(ProduceJob)} accepts jobs immediately and so that the
     * worker thread clearing the flag on exit can never be overwritten by this method.
     */
    @Override // java.lang.Thread
    public synchronized void start() {
        this.isRunning.set(true);
        super.start();
    }

    /**
     * Worker loop: executes buffered jobs until the worker is cancelled, then flushes what is
     * left in the buffer.
     *
     * <p>Whatever way the loop ends (cancel, interrupt, exception or error), the running flag is
     * cleared, jobs that could not be executed are completed exceptionally, and the producer is
     * closed exactly once.
     */
    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        Thread.currentThread().setName(ProducerWorker.class.getSimpleName() + WORKERCOUNT.getAndIncrement());
        LOG.info("Starting ProducerWorker thread {}", Thread.currentThread().getName());
        try {
            while (!this.isCancelled.get()) {
                takeJob(this.producer);
            }
            flushBuffer();
        } catch (Exception e) {
            // A failure ends the worker; it must not keep polling with a producer that is
            // about to be closed.
            LOG.error("ProducerWorker caught exception", e);
        } finally {
            // Clear the flag first so queueJob() stops accepting work before the final drain.
            this.isRunning.set(false);
            try {
                failPendingJobs();
            } finally {
                this.producer.close();
            }
        }
        LOG.info("Exiting ProducerWorker thread {}", Thread.currentThread().getName());
    }

    /**
     * Requests the worker to stop. The worker thread notices the request within one poll
     * interval, flushes the buffer and exits; jobs queued after this call are rejected.
     */
    public void cancel() {
        LOG.info("ProducerWorker cancel requested");
        this.isCancelled.set(true);
    }

    /**
     * Executes every job currently in the buffer without waiting for new ones. Jobs are removed
     * one at a time, so a job enqueued while the flush is in progress is executed too instead of
     * being silently discarded. A job whose execution throws is completed exceptionally and the
     * flush continues with the next job.
     */
    private void flushBuffer() {
        LOG.info("ProducerWorker cancelled, flushing {} messages in buffer", this.messageBuffer.size());
        ProduceJob<K, V> job;
        while ((job = this.messageBuffer.poll()) != null) {
            try {
                job.execute(this.producer);
            } catch (Exception e) {
                LOG.error("Produce message failed", e);
                job.completeProduce("Produce message failed", e);
            }
        }
    }

    /**
     * Completes every job still in the buffer exceptionally. Called when the worker stops so that
     * no caller is left waiting on a future that will never be completed.
     */
    private void failPendingJobs() {
        int failed = 0;
        ProduceJob<K, V> job;
        while ((job = this.messageBuffer.poll()) != null) {
            job.completeProduce((Throwable) new ProducerWorkerCancelledException("Cannot produce messages on stopped ProducerWorker"));
            failed++;
        }
        if (failed > 0) {
            LOG.warn("ProducerWorker stopped, failed {} messages left in buffer", failed);
        }
    }

    /**
     * Waits up to {@link #POLL_TIMEOUT_MS} for a job and executes it if one arrives.
     *
     * <p>An interrupt is treated as a request to stop: the interrupt flag is restored and the
     * worker is marked cancelled. Without that, every following timed poll would throw
     * immediately because the flag is still set, and the worker loop would spin.
     *
     * @param producerProxy producer to execute the job against
     */
    private void takeJob(ProducerProxy<K, V> producerProxy) {
        LOG.debug("Taking new job from queue");
        try {
            ProduceJob<K, V> job = this.messageBuffer.poll(POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
            if (job != null) {
                executeJob(producerProxy, job);
            }
        } catch (InterruptedException e) {
            if (this.isCancelled.get()) {
                LOG.info("The ProducerWorker got cancelled and the take from messageBuffer was interrupted");
            } else {
                LOG.warn("Take from messageBuffer was interrupted while the ProducerWorker is not cancelled", e);
            }
            Thread.currentThread().interrupt();
            this.isCancelled.set(true);
        }
    }

    /**
     * Executes one job. When the ordering strategy is {@code KEEPING_ORDER} the method does not
     * return until the job's future is done, so the next job is only started afterwards. Any
     * exception is logged and reported through the job itself rather than propagated.
     *
     * @param producerProxy producer to execute the job against
     * @param produceJob    job to execute
     */
    private void executeJob(ProducerProxy<K, V> producerProxy, ProduceJob<K, V> produceJob) {
        try {
            LOG.debug("Producing message");
            ProduceFuture<K, V> pending = produceJob.execute(producerProxy);
            if (this.producerConfig.getOrderingStrategy() == OrderingStrategy.KEEPING_ORDER) {
                while (!pending.isDone()) {
                    SleepUtil.sleep(Duration.ofMillis(ORDER_WAIT_MS));
                }
            }
        } catch (Exception e) {
            LOG.error("Produce message failed", e);
            produceJob.completeProduce("Produce message failed", e);
        }
    }

    /**
     * Hands a job to the worker and returns the job's future.
     *
     * <p>The job is rejected (its future completed exceptionally) when the worker is cancelled or
     * not running. Otherwise it is added to the buffer according to the configured mode:
     * blocking until space is available, failing immediately when the buffer is full (wait
     * timeout below 1), or waiting at most the configured timeout for space.
     *
     * <p>NOTE(review): in blocking mode a caller already waiting for buffer space is not woken
     * up when the worker stops — confirm whether callers rely on interrupts for that case.
     *
     * @param produceJob job to queue
     * @return the future of the job; completed exceptionally if the job could not be queued
     */
    public Future<ProducedMessage<K, V>> queueJob(ProduceJob<K, V> produceJob) {
        if (this.isCancelled.get()) {
            LOG.error("Trying to produce while the ProducerWorker is cancelled");
            produceJob.completeProduce((Throwable) new ProducerWorkerCancelledException("Cannot produce messages on cancelled ProducerWorker"));
            return produceJob.getFuture();
        }
        if (!this.isRunning.get()) {
            LOG.error("Trying to produce while the ProducerWorker is stopped or failed");
            produceJob.completeProduce((Throwable) new ProducerWorkerCancelledException("Cannot produce messages on stopped ProducerWorker"));
            return produceJob.getFuture();
        }
        try {
            if (this.blockedMessageInsert) {
                this.messageBuffer.put(produceJob);
            } else if (this.messageBufferWaitTimeout < 1) {
                if (!this.messageBuffer.offer(produceJob)) {
                    produceJob.completeProduce((Throwable) new BufferFullException(String.format("Could not add message to buffer. Buffer maximum size is %d", this.messageBufferSize)));
                }
            } else if (!this.messageBuffer.offer(produceJob, this.messageBufferWaitTimeout, TimeUnit.MILLISECONDS)) {
                produceJob.completeProduce((Throwable) new BufferFullException(String.format("Could not add message to buffer. Buffer wait timeout is %d. Buffer maximum size is %d", this.messageBufferWaitTimeout, this.messageBufferSize)));
            }
        } catch (InterruptedException e) {
            LOG.warn("Put message into buffer was interrupted");
            produceJob.completeProduce("Put message into buffer was interrupted", e);
            Thread.currentThread().interrupt();
        }
        return produceJob.getFuture();
    }
}
