package ai.yda.framework.rag.generator.shared;

import com.azure.ai.openai.assistants.AssistantsClient;
import com.azure.ai.openai.assistants.AssistantsClientBuilder;
import com.azure.ai.openai.assistants.models.AssistantThread;
import com.azure.ai.openai.assistants.models.AssistantThreadCreationOptions;
import com.azure.ai.openai.assistants.models.CreateRunOptions;
import com.azure.ai.openai.assistants.models.MessageDeltaChunk;
import com.azure.ai.openai.assistants.models.MessageDeltaTextContentObject;
import com.azure.ai.openai.assistants.models.MessageRole;
import com.azure.ai.openai.assistants.models.MessageTextContent;
import com.azure.ai.openai.assistants.models.RunStatus;
import com.azure.ai.openai.assistants.models.StreamMessageUpdate;
import com.azure.ai.openai.assistants.models.ThreadMessage;
import com.azure.ai.openai.assistants.models.ThreadMessageOptions;
import com.azure.ai.openai.assistants.models.ThreadRun;
import com.azure.core.credential.KeyCredential;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:ai/yda/framework/rag/generator/shared/AzureOpenAiAssistantService.class */
public class AzureOpenAiAssistantService {
    private final AssistantsClient assistantsClient;

    public AzureOpenAiAssistantService(String str) {
        this.assistantsClient = new AssistantsClientBuilder().credential(new KeyCredential(str)).buildClient();
    }

    public AssistantThread createThread(String str) {
        return this.assistantsClient.createThread(new AssistantThreadCreationOptions().setMessages(List.of(new ThreadMessageOptions(MessageRole.USER, str))));
    }

    public void addMessageToThread(String str, String str2) {
        this.assistantsClient.createMessage(str, new ThreadMessageOptions(MessageRole.USER, str2));
    }

    public String createRunAndWaitForResponse(String str, String str2, String str3) {
        return getLastMessage(waitForRunToFinish(this.assistantsClient.createRun(str, new CreateRunOptions(str2).setAdditionalInstructions(str3))).getThreadId());
    }

    public Flux<String> createRunStream(String str, String str2, String str3) {
        return Flux.fromIterable(this.assistantsClient.createRunStream(str, new CreateRunOptions(str2).setAdditionalInstructions(str3))).subscribeOn(Schedulers.boundedElastic()).filter(streamUpdate -> {
            return streamUpdate instanceof StreamMessageUpdate;
        }).map(streamUpdate2 -> {
            return extractDeltaContent(((StreamMessageUpdate) streamUpdate2).getMessage());
        });
    }

    private ThreadRun waitForRunToFinish(ThreadRun threadRun) {
        AtomicReference atomicReference = new AtomicReference(threadRun);
        try {
            ScheduledExecutorService newSingleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor();
            newSingleThreadScheduledExecutor.scheduleAtFixedRate(() -> {
                ThreadRun run = this.assistantsClient.getRun(threadRun.getThreadId(), threadRun.getId());
                RunStatus status = run.getStatus();
                if (status == RunStatus.QUEUED || status == RunStatus.IN_PROGRESS) {
                    return;
                }
                atomicReference.set(run);
                newSingleThreadScheduledExecutor.shutdown();
            }, 1L, 1L, TimeUnit.SECONDS).get();
            newSingleThreadScheduledExecutor.shutdown();
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(String.format("Error while waiting for thread run: threadId - %s, runId - %s", threadRun.getThreadId(), threadRun.getId()), e);
        } catch (CancellationException e2) {
        }
        return (ThreadRun) atomicReference.get();
    }

    private String getLastMessage(String str) {
        return (String) ((ThreadMessage) this.assistantsClient.listMessages(str).getData().get(0)).getContent().stream().map(messageContent -> {
            return ((MessageTextContent) messageContent).getText().getValue();
        }).collect(Collectors.joining(". "));
    }

    private String extractDeltaContent(MessageDeltaChunk messageDeltaChunk) {
        return (String) messageDeltaChunk.getDelta().getContent().parallelStream().map(messageDeltaContent -> {
            return ((MessageDeltaTextContentObject) messageDeltaContent).getText().getValue();
        }).collect(Collectors.joining(". "));
    }
}
