package io.kubemq.sdk.queues;

import io.grpc.stub.StreamObserver;
import io.kubemq.sdk.client.KubeMQClient;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import kubemq.Kubemq;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/kubemq/sdk/queues/QueueUpstreamHandler.class */
public class QueueUpstreamHandler {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(QueueUpstreamHandler.class);
    private final KubeMQClient kubeMQClient;
    private volatile StreamObserver<Kubemq.QueuesUpstreamRequest> requestsObserver;
    private volatile StreamObserver<Kubemq.QueuesUpstreamResponse> responsesObserver;
    private final AtomicBoolean isConnected = new AtomicBoolean(false);
    private final Object sendRequestLock = new Object();
    private final Map<String, CompletableFuture<QueueSendResult>> pendingResponses = new ConcurrentHashMap();

    public QueueUpstreamHandler(KubeMQClient kubeMQClient) {
        this.kubeMQClient = kubeMQClient;
    }

    public void connect() {
        if (this.isConnected.get()) {
            return;
        }
        synchronized (this) {
            if (this.isConnected.get()) {
                return;
            }
            try {
                this.responsesObserver = new StreamObserver<Kubemq.QueuesUpstreamResponse>() { // from class: io.kubemq.sdk.queues.QueueUpstreamHandler.1
                    public void onNext(Kubemq.QueuesUpstreamResponse queuesUpstreamResponse) {
                        String refRequestID = queuesUpstreamResponse.getRefRequestID();
                        CompletableFuture completableFuture = (CompletableFuture) QueueUpstreamHandler.this.pendingResponses.remove(refRequestID);
                        if (completableFuture != null) {
                            if (queuesUpstreamResponse.getIsError()) {
                                completableFuture.complete(QueueSendResult.builder().id(refRequestID).isError(true).error(queuesUpstreamResponse.getError()).build());
                            } else if (queuesUpstreamResponse.getResultsCount() == 0) {
                                completableFuture.complete(QueueSendResult.builder().id(refRequestID).isError(true).error("no results").build());
                            } else {
                                completableFuture.complete(new QueueSendResult().decode(queuesUpstreamResponse.getResults(0)));
                            }
                        }
                    }

                    public void onError(Throwable th) {
                        QueueUpstreamHandler.log.error("Error in QueuesUpstreamResponse StreamObserver: ", th);
                        QueueUpstreamHandler.this.closeStreamWithError(th.getMessage());
                    }

                    public void onCompleted() {
                        QueueUpstreamHandler.log.info("QueuesUpstreamResponse onCompleted.");
                        QueueUpstreamHandler.this.closeStreamWithError("Stream completed");
                    }
                };
                this.requestsObserver = this.kubeMQClient.getAsyncClient().queuesUpstream(this.responsesObserver);
                this.isConnected.set(true);
            } catch (Exception e) {
                log.error("Error initializing QueuesUpstreamResponse StreamObserver: ", e);
                this.isConnected.set(false);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void closeStreamWithError(String str) {
        this.isConnected.set(false);
        this.pendingResponses.forEach((str2, completableFuture) -> {
            completableFuture.complete(QueueSendResult.builder().error(str).isError(true).build());
        });
        this.pendingResponses.clear();
    }

    private CompletableFuture<QueueSendResult> sendQueuesMessageAsync(QueueMessage queueMessage) {
        String generateRequestId = generateRequestId();
        CompletableFuture<QueueSendResult> completableFuture = new CompletableFuture<>();
        this.pendingResponses.put(generateRequestId, completableFuture);
        try {
            sendRequest(queueMessage.encode(this.kubeMQClient.getClientId()).toBuilder().setRequestID(generateRequestId).build());
        } catch (Exception e) {
            this.pendingResponses.remove(generateRequestId);
            log.error("Error sending queue message: ", e);
            completableFuture.completeExceptionally(e);
        }
        return completableFuture;
    }

    public QueueSendResult sendQueuesMessage(QueueMessage queueMessage) {
        try {
            return sendQueuesMessageAsync(queueMessage).get();
        } catch (Exception e) {
            log.error("Error waiting for send Queue Message response: ", e);
            throw new RuntimeException("Failed to get response", e);
        }
    }

    private void sendRequest(Kubemq.QueuesUpstreamRequest queuesUpstreamRequest) {
        if (!this.isConnected.get()) {
            connect();
        }
        synchronized (this.sendRequestLock) {
            if (this.requestsObserver != null) {
                this.requestsObserver.onNext(queuesUpstreamRequest);
            } else {
                log.warn("RequestsObserver is null; unable to send request.");
            }
        }
    }

    private String generateRequestId() {
        return UUID.randomUUID().toString();
    }
}
