/*
 * Decompiled with CFR 0.152.
 */
package org.enodeframework.kafka;

import java.util.concurrent.CompletableFuture;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.enodeframework.commanding.CommandResult;
import org.enodeframework.commanding.CommandReturnType;
import org.enodeframework.commanding.ICommand;
import org.enodeframework.common.utilities.Ensure;
import org.enodeframework.kafka.KafkaTool;
import org.enodeframework.kafka.SendMessageService;
import org.enodeframework.queue.QueueMessage;
import org.enodeframework.queue.command.AbstractCommandService;
import org.springframework.kafka.core.KafkaTemplate;

public class KafkaCommandService
extends AbstractCommandService {
    private KafkaTemplate<String, String> producer;

    public KafkaTemplate<String, String> getProducer() {
        return this.producer;
    }

    public void setProducer(KafkaTemplate<String, String> producer) {
        this.producer = producer;
    }

    public CompletableFuture<Void> sendAsync(ICommand command) {
        return SendMessageService.sendMessageAsync(this.producer, this.buildKafkaMessage(command, false));
    }

    public CompletableFuture<CommandResult> executeAsync(ICommand command) {
        return this.executeAsync(command, CommandReturnType.CommandExecuted);
    }

    public CompletableFuture<CommandResult> executeAsync(ICommand command, CommandReturnType commandReturnType) {
        CompletableFuture<CommandResult> taskCompletionSource = new CompletableFuture<CommandResult>();
        try {
            Ensure.notNull((Object)this.commandResultProcessor, (String)"commandResultProcessor");
            this.commandResultProcessor.registerProcessingCommand(command, commandReturnType, taskCompletionSource);
            CompletableFuture<Void> sendMessageAsync = SendMessageService.sendMessageAsync(this.producer, this.buildKafkaMessage(command, true));
            ((CompletableFuture)sendMessageAsync.thenAccept(sendResult -> {})).exceptionally(ex -> {
                this.commandResultProcessor.processFailedSendingCommand(command);
                taskCompletionSource.completeExceptionally((Throwable)ex);
                return null;
            });
        }
        catch (Exception ex2) {
            taskCompletionSource.completeExceptionally(ex2);
        }
        return taskCompletionSource;
    }

    protected ProducerRecord<String, String> buildKafkaMessage(ICommand command, boolean needReply) {
        QueueMessage queueMessage = this.buildCommandMessage(command, needReply);
        return KafkaTool.covertToProducerRecord(queueMessage);
    }
}

