package io.axual.client.producer.generic;

import io.axual.client.exception.ProduceFailedException;
import io.axual.client.producer.ProduceCallback;
import io.axual.client.producer.ProducedMessage;
import io.axual.client.producer.ProducerMessage;
import io.axual.client.proxy.generic.producer.ProducerProxy;
import java.util.concurrent.ExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/axual/client/producer/generic/ProduceJob.class */
public class ProduceJob<K, V> {
    private static final Logger LOG = LoggerFactory.getLogger(ProduceJob.class);
    private final ProducerMessage<K, V> message;
    private boolean executed = false;
    private final ProduceFuture<K, V> produceFuture = new ProduceFuture<>();
    private final ProduceCallback<K, V> produceCallback;

    public ProduceJob(ProducerMessage<K, V> producerMessage, ProduceCallback<K, V> produceCallback) {
        this.message = producerMessage;
        this.produceCallback = produceCallback;
    }

    public ProduceFuture<K, V> execute(ProducerProxy<K, V> producerProxy) {
        if (this.executed) {
            throw new ProduceFailedException(this.message.getKey(), this.message.getValue(), System.currentTimeMillis(), null);
        }
        this.executed = true;
        try {
            long currentTimeMillis = System.currentTimeMillis();
            producerProxy.send(this.message.getProducerRecord(), (recordMetadata, exc) -> {
                if (exc == null) {
                    LOG.trace("Successfully sent record: key={}, value={}, stream={}, partition={}, offset={}", new Object[]{this.message.getKey(), this.message.getValue(), recordMetadata.topic(), Integer.valueOf(recordMetadata.partition()), Long.valueOf(recordMetadata.offset())});
                    completeProduce(new GenericProducedMessage(this.message, recordMetadata));
                } else {
                    if (LOG.isTraceEnabled()) {
                        LOG.warn("Failed to send record with messageId {}: key={}, value={}, ", new Object[]{this.message.getMessageId(), this.message.getKey(), this.message.getValue()});
                    } else {
                        LOG.warn("Failed to send record with messageId {}", this.message.getMessageId());
                    }
                    completeProduce((Throwable) new ProduceFailedException(this.message.getKey(), this.message.getValue(), currentTimeMillis, exc));
                }
            });
        } catch (Exception e) {
            if (LOG.isTraceEnabled()) {
                LOG.error("Error during produce to stream {}:\n  Key: {} \n  Value: {} ", new Object[]{this.message.getStream(), this.message.getKey(), this.message.getValue()});
            } else {
                LOG.error("Error during produce to stream {}: ", this.message.getStream(), e);
            }
            completeProduce(e);
        }
        return this.produceFuture;
    }

    public ProduceFuture<K, V> getFuture() {
        return this.produceFuture;
    }

    public ProducerMessage<K, V> getMessage() {
        return this.message;
    }

    public void completeProduce(Throwable th) {
        completeProduce(th.getMessage(), th);
    }

    public void completeProduce(String str, Throwable th) {
        ExecutionException executionException = new ExecutionException(str, th);
        if (this.produceCallback != null) {
            this.produceCallback.onError(this.message, executionException);
        }
        this.produceFuture.complete(executionException);
    }

    private void completeProduce(ProducedMessage<K, V> producedMessage) {
        if (this.produceCallback != null) {
            this.produceCallback.onComplete(producedMessage);
        }
        this.produceFuture.complete(producedMessage);
    }
}
