package io.kubemq.sdk.queues;

import io.grpc.stub.StreamObserver;
import io.kubemq.sdk.client.KubeMQClient;
import java.util.Iterator;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import kubemq.Kubemq;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/kubemq/sdk/queues/QueueDownstreamHandler.class */
public class QueueDownstreamHandler {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(QueueDownstreamHandler.class);
    private final KubeMQClient kubeMQClient;
    private final AtomicBoolean isConnected = new AtomicBoolean(false);
    private final Object sendRequestLock = new Object();
    private volatile StreamObserver<Kubemq.QueuesDownstreamRequest> requestsObserver = null;
    private volatile StreamObserver<Kubemq.QueuesDownstreamResponse> responsesObserver = null;
    private final Map<String, CompletableFuture<QueuesPollResponse>> pendingResponses = new ConcurrentHashMap();
    private final Map<String, QueuesPollRequest> pendingRequests = new ConcurrentHashMap();

    public QueueDownstreamHandler(KubeMQClient kubeMQClient) {
        this.kubeMQClient = kubeMQClient;
    }

    public void connect() {
        if (this.isConnected.get()) {
            return;
        }
        synchronized (this) {
            if (this.isConnected.get()) {
                return;
            }
            try {
                this.responsesObserver = new StreamObserver<Kubemq.QueuesDownstreamResponse>() { // from class: io.kubemq.sdk.queues.QueueDownstreamHandler.1
                    public void onNext(Kubemq.QueuesDownstreamResponse queuesDownstreamResponse) {
                        String refRequestId = queuesDownstreamResponse.getRefRequestId();
                        CompletableFuture completableFuture = (CompletableFuture) QueueDownstreamHandler.this.pendingResponses.remove(refRequestId);
                        QueuesPollRequest queuesPollRequest = (QueuesPollRequest) QueueDownstreamHandler.this.pendingRequests.remove(refRequestId);
                        if (completableFuture == null || queuesPollRequest == null) {
                            return;
                        }
                        QueuesPollResponse build = QueuesPollResponse.builder().refRequestId(refRequestId).activeOffsets(queuesDownstreamResponse.getActiveOffsetsList()).receiverClientId(queuesDownstreamResponse.getTransactionId()).isTransactionCompleted(queuesDownstreamResponse.getTransactionComplete()).transactionId(queuesDownstreamResponse.getTransactionId()).error(queuesDownstreamResponse.getError()).isError(queuesDownstreamResponse.getIsError()).build();
                        Iterator<Kubemq.QueueMessage> it = queuesDownstreamResponse.getMessagesList().iterator();
                        while (it.hasNext()) {
                            build.getMessages().add(new QueueMessageReceived().decode(it.next(), build.getTransactionId(), build.isTransactionCompleted(), build.getReceiverClientId(), queuesPollRequest.getVisibilitySeconds(), queuesPollRequest.isAutoAckMessages(), build));
                        }
                        completableFuture.complete(build);
                    }

                    public void onError(Throwable th) {
                        QueueDownstreamHandler.log.error("Error in QueuesDownstreamResponse StreamObserver: ", th);
                        QueueDownstreamHandler.this.closeStreamWithError(th.getMessage(), th);
                    }

                    public void onCompleted() {
                        QueueDownstreamHandler.log.info("QueuesDownstreamResponse onCompleted.");
                        QueueDownstreamHandler.this.closeStreamWithError("Stream completed", null);
                    }
                };
                this.requestsObserver = this.kubeMQClient.getAsyncClient().queuesDownstream(this.responsesObserver);
                this.isConnected.set(true);
            } catch (Exception e) {
                log.error("Error in QueuesDownstreamResponse StreamObserver: ", e);
                this.isConnected.set(false);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void closeStreamWithError(String str, Throwable th) {
        this.isConnected.set(false);
        this.pendingResponses.forEach((str2, completableFuture) -> {
            completableFuture.complete(QueuesPollResponse.builder().error(str).isError(true).build());
        });
        this.pendingRequests.clear();
        this.pendingResponses.clear();
    }

    private CompletableFuture<QueuesPollResponse> receiveQueuesMessagesAsync(QueuesPollRequest queuesPollRequest) {
        String generateRequestId = generateRequestId();
        CompletableFuture<QueuesPollResponse> completableFuture = new CompletableFuture<>();
        this.pendingResponses.put(generateRequestId, completableFuture);
        this.pendingRequests.put(generateRequestId, queuesPollRequest);
        try {
            sendRequest(queuesPollRequest.encode(this.kubeMQClient.getClientId()).toBuilder().setRequestID(generateRequestId).build());
        } catch (Exception e) {
            this.pendingRequests.remove(generateRequestId);
            this.pendingResponses.remove(generateRequestId);
            log.error("Error polling message: ", e);
            completableFuture.completeExceptionally(e);
        }
        return completableFuture;
    }

    public QueuesPollResponse receiveQueuesMessages(QueuesPollRequest queuesPollRequest) {
        try {
            QueuesPollResponse queuesPollResponse = receiveQueuesMessagesAsync(queuesPollRequest).get();
            queuesPollResponse.complete(this::sendRequest);
            return queuesPollResponse;
        } catch (Exception e) {
            log.error("Error waiting for Queue Message response: ", e);
            throw new RuntimeException("Failed to get response", e);
        }
    }

    private void sendRequest(Kubemq.QueuesDownstreamRequest queuesDownstreamRequest) {
        if (!this.isConnected.get()) {
            connect();
        }
        Kubemq.QueuesDownstreamRequest build = queuesDownstreamRequest.toBuilder().setClientID(this.kubeMQClient.getClientId()).build();
        synchronized (this.sendRequestLock) {
            if (this.requestsObserver != null) {
                this.requestsObserver.onNext(build);
            } else {
                log.warn("RequestsObserver is null; unable to send request.");
            }
        }
    }

    private String generateRequestId() {
        return UUID.randomUUID().toString();
    }
}
