package io.kubemq.sdk.queues;

import io.grpc.stub.StreamObserver;
import io.kubemq.sdk.client.KubeMQClient;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import kubemq.Kubemq;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/kubemq/sdk/queues/QueueStreamHelper.class */
public class QueueStreamHelper {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(QueueStreamHelper.class);
    private StreamObserver<Kubemq.QueuesUpstreamRequest> queuesUpStreamHandler = null;
    private StreamObserver<Kubemq.QueuesDownstreamRequest> queuesDownstreamHandler = null;
    private CompletableFuture<QueueSendResult> futureResponse = new CompletableFuture<>();

    public QueueSendResult sendMessage(KubeMQClient kubeMQClient, Kubemq.QueuesUpstreamRequest queuesUpstreamRequest) {
        if (this.queuesUpStreamHandler == null) {
            this.queuesUpStreamHandler = kubeMQClient.getAsyncClient().queuesUpstream(new StreamObserver<Kubemq.QueuesUpstreamResponse>() { // from class: io.kubemq.sdk.queues.QueueStreamHelper.1
                public void onNext(Kubemq.QueuesUpstreamResponse queuesUpstreamResponse) {
                    QueueStreamHelper.log.debug("QueuesUpstreamResponse Received Message send result: '{}'", queuesUpstreamResponse);
                    QueueSendResult queueSendResult = new QueueSendResult();
                    if (queuesUpstreamResponse.getIsError()) {
                        queueSendResult.setIsError(true);
                        queueSendResult.setError(queuesUpstreamResponse.getError());
                    } else {
                        queueSendResult.decode(queuesUpstreamResponse.getResults(0));
                    }
                    QueueStreamHelper.this.futureResponse.complete(queueSendResult);
                }

                public void onError(Throwable th) {
                    QueueStreamHelper.log.error("Error in QueuesUpstreamResponse Error message sending: ", th);
                    QueueStreamHelper.this.futureResponse.complete(QueueSendResult.builder().error(th.getMessage()).isError(true).build());
                }

                public void onCompleted() {
                    QueueStreamHelper.log.debug("QueuesUpstreamResponse onCompleted.");
                }
            });
        }
        synchronized (this) {
            try {
                log.debug("Sending message");
                this.queuesUpStreamHandler.onNext(queuesUpstreamRequest);
            } catch (Exception e) {
                log.error("Error sending message: ", e);
                throw new RuntimeException("Failed to send message", e);
            }
        }
        try {
            log.debug("Retrieving response from futureResponse.get()");
            return this.futureResponse.get();
        } catch (Exception e2) {
            log.error("Error waiting for response: ", e2);
            throw new RuntimeException("Failed to get response", e2);
        }
    }

    public QueuesPollResponse receiveMessage(KubeMQClient kubeMQClient, QueuesPollRequest queuesPollRequest) {
        final CompletableFuture completableFuture = new CompletableFuture();
        if (this.queuesDownstreamHandler == null) {
            this.queuesDownstreamHandler = kubeMQClient.getAsyncClient().queuesDownstream(new StreamObserver<Kubemq.QueuesDownstreamResponse>() { // from class: io.kubemq.sdk.queues.QueueStreamHelper.2
                public void onNext(Kubemq.QueuesDownstreamResponse queuesDownstreamResponse) {
                    QueueStreamHelper.log.debug("QueuesDownstreamResponse Received Metadata: '{}'", queuesDownstreamResponse);
                    QueuesPollResponse build = QueuesPollResponse.builder().refRequestId(queuesDownstreamResponse.getRefRequestId()).activeOffsets(queuesDownstreamResponse.getActiveOffsetsList()).receiverClientId(queuesDownstreamResponse.getTransactionId()).isTransactionCompleted(queuesDownstreamResponse.getTransactionComplete()).transactionId(queuesDownstreamResponse.getTransactionId()).error(queuesDownstreamResponse.getError()).isError(queuesDownstreamResponse.getIsError()).build();
                    Iterator<Kubemq.QueueMessage> it = queuesDownstreamResponse.getMessagesList().iterator();
                    while (it.hasNext()) {
                        build.getMessages().add(QueueMessageReceived.decode(it.next(), build.getTransactionId(), build.isTransactionCompleted(), build.getReceiverClientId(), QueueStreamHelper.this.queuesDownstreamHandler));
                    }
                    completableFuture.complete(build);
                }

                public void onError(Throwable th) {
                    QueueStreamHelper.log.error("Error in QueuesDownstreamResponse StreamObserver: ", th);
                    completableFuture.complete(QueuesPollResponse.builder().error(th.getMessage()).isError(true).build());
                }

                public void onCompleted() {
                    QueueStreamHelper.log.debug("QueuesDownstreamResponse StreamObserver completed.");
                }
            });
        }
        synchronized (this) {
            try {
                this.queuesDownstreamHandler.onNext(queuesPollRequest.encode(kubeMQClient.getClientId()));
            } catch (Exception e) {
                log.error("Error polling message: ", e);
                throw new RuntimeException("Failed to polling message", e);
            }
        }
        try {
            return (QueuesPollResponse) completableFuture.get();
        } catch (Exception e2) {
            log.error("Error waiting for Queue Message response: ", e2);
            throw new RuntimeException("Failed to get response", e2);
        }
    }
}
