package io.fluxcapacitor.javaclient.publishing;

import io.fluxcapacitor.common.MessageType;
import io.fluxcapacitor.common.api.SerializedMessage;
import io.fluxcapacitor.javaclient.common.ClientUtils;
import io.fluxcapacitor.javaclient.common.Message;
import io.fluxcapacitor.javaclient.common.serialization.Serializer;
import io.fluxcapacitor.javaclient.configuration.client.Client;
import io.fluxcapacitor.javaclient.tracking.ConsumerConfiguration;
import io.fluxcapacitor.javaclient.tracking.client.DefaultTracker;
import java.beans.ConstructorProperties;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/fluxcapacitor/javaclient/publishing/DefaultRequestHandler.class */
public class DefaultRequestHandler implements RequestHandler {
    private static final Logger log = LoggerFactory.getLogger(DefaultRequestHandler.class);
    private final Serializer serializer;
    private final Client client;
    private final Map<Integer, CompletableFuture<Message>> callbacks = new ConcurrentHashMap();
    private final AtomicInteger nextId = new AtomicInteger();
    private final AtomicBoolean started = new AtomicBoolean();

    @Override // io.fluxcapacitor.javaclient.publishing.RequestHandler
    public CompletableFuture<Message> sendRequest(SerializedMessage serializedMessage, Consumer<SerializedMessage> consumer) {
        if (this.started.compareAndSet(false, true)) {
            DefaultTracker.start((Consumer<List<SerializedMessage>>) this::handleMessages, ConsumerConfiguration.getDefault(MessageType.RESULT), this.client);
        }
        CompletableFuture<Message> completableFuture = new CompletableFuture<>();
        int andIncrement = this.nextId.getAndIncrement();
        this.callbacks.put(Integer.valueOf(andIncrement), completableFuture);
        serializedMessage.setRequestId(Integer.valueOf(andIncrement));
        serializedMessage.setSource(this.client.id());
        consumer.accept(serializedMessage);
        return completableFuture;
    }

    @Override // io.fluxcapacitor.javaclient.publishing.RequestHandler, java.lang.AutoCloseable
    public void close() {
        ClientUtils.waitForResults(Duration.ofSeconds(2L), this.callbacks.values());
    }

    protected void handleMessages(List<SerializedMessage> list) {
        list.forEach(serializedMessage -> {
            try {
                CompletableFuture<Message> completableFuture = this.callbacks.get(serializedMessage.getRequestId());
                if (completableFuture == null) {
                    log.warn("Received response with index {} for unknown request {}", serializedMessage.getIndex(), serializedMessage.getRequestId());
                    this.callbacks.remove(serializedMessage.getRequestId());
                    return;
                }
                try {
                    Object deserialize = this.serializer.deserialize(serializedMessage.getData());
                    try {
                        if (deserialize instanceof Throwable) {
                            completableFuture.completeExceptionally((Exception) deserialize);
                        } else {
                            completableFuture.complete(new Message(deserialize, serializedMessage.getMetadata()));
                        }
                    } catch (Exception e) {
                        log.error("Failed to complete request with id {}", serializedMessage.getRequestId(), e);
                    }
                    this.callbacks.remove(serializedMessage.getRequestId());
                } catch (Exception e2) {
                    log.error("Failed to deserialize result with id {}. Continuing with next result", serializedMessage.getRequestId(), e2);
                    completableFuture.completeExceptionally(e2);
                    this.callbacks.remove(serializedMessage.getRequestId());
                }
            } catch (Throwable th) {
                this.callbacks.remove(serializedMessage.getRequestId());
                throw th;
            }
        });
    }

    @ConstructorProperties({"serializer", "client"})
    public DefaultRequestHandler(Serializer serializer, Client client) {
        this.serializer = serializer;
        this.client = client;
    }
}
