package io.hoplin.batch;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import io.hoplin.MessagePayload;
import io.hoplin.json.JsonCodec;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/hoplin/batch/BatchReplyConsumer.class */
public class BatchReplyConsumer extends DefaultConsumer {
    private static final Logger log = LoggerFactory.getLogger(BatchReplyConsumer.class);
    private final ConcurrentHashMap<UUID, CompletableFutureWrapperBatchContext> batches;
    private JsonCodec codec;
    private final Executor executor;

    public BatchReplyConsumer(Channel channel, ConcurrentHashMap<UUID, CompletableFutureWrapperBatchContext> concurrentHashMap, Executor executor) {
        super(channel);
        this.executor = (Executor) Objects.requireNonNull(executor);
        this.batches = (ConcurrentHashMap) Objects.requireNonNull(concurrentHashMap);
        this.codec = new JsonCodec();
    }

    public BatchReplyConsumer(Channel channel, ConcurrentHashMap<UUID, CompletableFutureWrapperBatchContext> concurrentHashMap) {
        this(channel, concurrentHashMap, Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
    }

    public void handleDelivery(String str, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) {
        log.info("received : {}", basicProperties);
        Map headers = basicProperties.getHeaders();
        UUID fromString = UUID.fromString(headers.getOrDefault("x-batch-id", "").toString());
        UUID fromString2 = UUID.fromString(headers.getOrDefault("x-batch-correlationId", "").toString());
        CompletableFutureWrapperBatchContext completableFutureWrapperBatchContext = this.batches.get(fromString);
        BatchContext context = completableFutureWrapperBatchContext.getContext();
        CompletableFuture<BatchContext> future = completableFutureWrapperBatchContext.getFuture();
        boolean z = false;
        Iterator<BatchContextTask> it = context.iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            if (it.next().getTaskId().equals(fromString2)) {
                long decrementAndGetTaskCount = context.decrementAndGetTaskCount();
                z = true;
                if (log.isDebugEnabled()) {
                    log.debug("Reminding task count[batch] : {} {}", fromString, Long.valueOf(decrementAndGetTaskCount));
                }
            }
        }
        if (!z) {
            throw new IllegalStateException("not found : " + fromString2);
        }
        if (context.isCompleted()) {
            future.complete(context);
        }
    }

    private void handleReply(byte[] bArr, CompletableFuture<Object> completableFuture) {
        CompletableFuture.runAsync(() -> {
            if (log.isDebugEnabled()) {
                log.debug("reply body : {}", new String(bArr));
            }
            try {
                completableFuture.complete(deserializeReplyPayload(bArr).getPayload());
            } catch (Exception e) {
                log.error("Unable to complete reply action", e);
                completableFuture.completeExceptionally(e);
            }
        }, this.executor);
    }

    private MessagePayload<?> deserializeReplyPayload(byte[] bArr) {
        return (MessagePayload) this.codec.deserialize(bArr, MessagePayload.class);
    }
}
