package fi.evolver.ai.spring.provider.perplexity;

import fi.evolver.ai.spring.ApiResponseException;
import fi.evolver.ai.spring.ContentSubscriber;
import fi.evolver.ai.spring.chat.ChatResponse;
import fi.evolver.ai.spring.chat.FunctionCall;
import fi.evolver.ai.spring.chat.prompt.ChatPrompt;
import fi.evolver.ai.spring.chat.prompt.Message;
import fi.evolver.ai.spring.provider.perplexity.response.chat.PChatResult;
import fi.evolver.ai.spring.util.RateLimitHeaders;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:fi/evolver/ai/spring/provider/perplexity/PerplexityStreamingChatResponse.class */
public class PerplexityStreamingChatResponse extends ChatResponse {
    private static final Logger LOG = LoggerFactory.getLogger(PerplexityStreamingChatResponse.class);
    private final Deque<PChatResult> results;
    private List<ContentSubscriber> subscribers;
    private final CountDownLatch responseCompleteLatch;
    private Optional<String> content;
    private Message message;
    private volatile Throwable responseException;

    public PerplexityStreamingChatResponse(ChatPrompt chatPrompt) {
        super(chatPrompt);
        this.results = new ConcurrentLinkedDeque();
        this.subscribers = new ArrayList();
        this.responseCompleteLatch = new CountDownLatch(1);
        this.content = Optional.empty();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void addResult(PChatResult pChatResult) {
        this.results.add(pChatResult);
        if (pChatResult.choices().isEmpty()) {
            return;
        }
        Optional<String> content = getContent(pChatResult);
        for (ContentSubscriber contentSubscriber : this.subscribers) {
            try {
                Objects.requireNonNull(contentSubscriber);
                content.ifPresent(contentSubscriber::onContent);
            } catch (RuntimeException e) {
                LOG.error("Subscriber failed handling content update", e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void handleError(Throwable th) {
        this.responseException = th;
        this.responseCompleteLatch.countDown();
        Iterator<ContentSubscriber> it = this.subscribers.iterator();
        while (it.hasNext()) {
            try {
                it.next().onError(th);
            } catch (RuntimeException e) {
                LOG.error("Subscriber failed handling stream error ({})", th.toString(), e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void handleStreamEnd() {
        String orElse = getFinishReason().orElse(null);
        if (orElse == null) {
            handleError(new IllegalStateException("Stream ended without finish reason"));
            return;
        }
        if (this.responseCompleteLatch.getCount() == 0) {
            return;
        }
        this.content = Optional.of((String) this.results.stream().map(PerplexityStreamingChatResponse::getContent).filter((v0) -> {
            return v0.isPresent();
        }).map((v0) -> {
            return v0.get();
        }).collect(Collectors.joining()));
        this.message = Message.assistant(this.content.orElseThrow());
        this.responseCompleteLatch.countDown();
        Iterator<ContentSubscriber> it = this.subscribers.iterator();
        while (it.hasNext()) {
            try {
                it.next().onComplete(orElse);
            } catch (RuntimeException e) {
                LOG.error("Subscriber failed handling stream completion", e);
            }
        }
    }

    private static Optional<String> getContent(PChatResult pChatResult) {
        return Optional.of((String) pChatResult.choices().stream().map((v0) -> {
            return v0.delta();
        }).map((v0) -> {
            return v0.content();
        }).filter((v0) -> {
            return Objects.nonNull(v0);
        }).collect(Collectors.joining())).filter(str -> {
            return !str.isEmpty();
        });
    }

    @Override // fi.evolver.ai.spring.chat.ChatResponse
    public synchronized void addSubscriber(ContentSubscriber contentSubscriber) {
        this.subscribers.add(contentSubscriber);
        Iterator<PChatResult> it = this.results.iterator();
        while (it.hasNext()) {
            Optional<String> content = getContent(it.next());
            Objects.requireNonNull(contentSubscriber);
            content.ifPresent(contentSubscriber::onContent);
        }
        if (this.responseException != null) {
            contentSubscriber.onError(this.responseException);
            return;
        }
        Optional<String> finishReason = getFinishReason();
        Objects.requireNonNull(contentSubscriber);
        finishReason.ifPresent(contentSubscriber::onComplete);
    }

    @Override // fi.evolver.ai.spring.chat.ChatResponse
    public String getResultState() {
        try {
            getResponseMessage();
            return getFinishReason().orElse("error");
        } catch (RuntimeException e) {
            return "error";
        }
    }

    @Override // fi.evolver.ai.spring.chat.ChatResponse
    public boolean isSuccess() {
        return PerplexityService.FINISH_REASONS_OK.contains(getResultState());
    }

    private Optional<String> getFinishReason() {
        return Optional.ofNullable(this.results.peekLast()).flatMap(PerplexityStreamingChatResponse::getFinishReason);
    }

    private static Optional<String> getFinishReason(PChatResult pChatResult) {
        return pChatResult.choices().stream().map((v0) -> {
            return v0.finishReason();
        }).filter((v0) -> {
            return Objects.nonNull(v0);
        }).findFirst();
    }

    @Override // fi.evolver.ai.spring.chat.ChatResponse
    public Message getResponseMessage() {
        try {
            this.responseCompleteLatch.await();
            if (this.responseException != null) {
                throw new ApiResponseException(this.responseException, "Reading message failed unexpectedly", new Object[0]);
            }
            return this.message;
        } catch (InterruptedException e) {
            throw new ApiResponseException(e, "Interrupted while waiting for response", new Object[0]);
        }
    }

    @Override // fi.evolver.ai.spring.chat.ChatResponse
    public Optional<String> getTextContent() {
        getResponseMessage();
        return this.content;
    }

    @Override // fi.evolver.ai.spring.chat.ChatResponse
    public RateLimitHeaders getRateLimitHeaders() {
        return RateLimitHeaders.EMPTY;
    }

    @Override // fi.evolver.ai.spring.chat.ChatResponse
    public Optional<FunctionCall> getFunctionCall() {
        return Optional.empty();
    }

    @Override // fi.evolver.ai.spring.chat.ChatResponse
    public List<FunctionCall> getFunctionCalls() {
        return List.of();
    }
}
