/*
 * Decompiled with CFR 0.152.
 */
package io.axual.client.producer.generic;

import io.axual.client.exception.ProduceFailedException;
import io.axual.client.producer.ProduceCallback;
import io.axual.client.producer.ProducedMessage;
import io.axual.client.producer.ProducerMessage;
import io.axual.client.producer.generic.GenericProducedMessage;
import io.axual.client.producer.generic.ProduceFuture;
import io.axual.client.proxy.generic.producer.ProducerProxy;
import java.util.concurrent.ExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProduceJob<K, V> {
    private static final Logger LOG = LoggerFactory.getLogger(ProduceJob.class);
    private final ProducerMessage<K, V> message;
    private boolean executed = false;
    private final ProduceFuture<K, V> produceFuture = new ProduceFuture();
    private final ProduceCallback<K, V> produceCallback;

    public ProduceJob(ProducerMessage<K, V> message, ProduceCallback<K, V> produceCallback) {
        this.message = message;
        this.produceCallback = produceCallback;
    }

    public ProduceFuture<K, V> execute(ProducerProxy<K, V> producer) {
        if (this.executed) {
            throw new ProduceFailedException(this.message.getKey(), this.message.getValue(), System.currentTimeMillis(), null);
        }
        this.executed = true;
        try {
            long produceTime = System.currentTimeMillis();
            producer.send(this.message.getProducerRecord(), (metadata, exception) -> {
                if (exception != null) {
                    if (LOG.isTraceEnabled()) {
                        LOG.warn("Failed to send record with messageId {}: key={}, value={}, ", new Object[]{this.message.getMessageId(), this.message.getKey(), this.message.getValue()});
                    } else {
                        LOG.warn("Failed to send record with messageId {}", (Object)this.message.getMessageId());
                    }
                    this.completeProduce((Throwable)((Object)new ProduceFailedException(this.message.getKey(), this.message.getValue(), produceTime, exception)));
                } else {
                    LOG.trace("Successfully sent record: key={}, value={}, stream={}, partition={}, offset={}", new Object[]{this.message.getKey(), this.message.getValue(), metadata.topic(), metadata.partition(), metadata.offset()});
                    this.completeProduce(new GenericProducedMessage<K, V>(this.message, metadata));
                }
            });
        }
        catch (Exception e) {
            if (LOG.isTraceEnabled()) {
                LOG.error("Error during produce to stream {}:\n  Key: {} \n  Value: {} ", new Object[]{this.message.getStream(), this.message.getKey(), this.message.getValue()});
            } else {
                LOG.error("Error during produce to stream {}: ", (Object)this.message.getStream(), (Object)e);
            }
            this.completeProduce(e);
        }
        return this.produceFuture;
    }

    public ProduceFuture<K, V> getFuture() {
        return this.produceFuture;
    }

    public ProducerMessage<K, V> getMessage() {
        return this.message;
    }

    public void completeProduce(Throwable cause) {
        this.completeProduce(cause.getMessage(), cause);
    }

    public void completeProduce(String message, Throwable cause) {
        ExecutionException executionException = new ExecutionException(message, cause);
        if (this.produceCallback != null) {
            this.produceCallback.onError(this.message, executionException);
        }
        this.produceFuture.complete(executionException);
    }

    private void completeProduce(ProducedMessage<K, V> producedMessage) {
        if (this.produceCallback != null) {
            this.produceCallback.onComplete(producedMessage);
        }
        this.produceFuture.complete(producedMessage);
    }
}

