package korolev.server.internal.services;

import korolev.Qsid;
import korolev.data.Bytes;
import korolev.data.Bytes$;
import korolev.effect.Effect;
import korolev.effect.Effect$;
import korolev.effect.Queue;
import korolev.effect.Queue$;
import korolev.effect.Reporter;
import korolev.effect.Stream;
import korolev.effect.Stream$;
import korolev.effect.syntax$;
import korolev.internal.ApplicationInstance;
import korolev.internal.Frontend$;
import korolev.web.Request;
import korolev.web.Response;
import korolev.web.Response$;
import korolev.web.Response$Status$;
import scala.MatchError;
import scala.None$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Some$;
import scala.Tuple2;
import scala.collection.concurrent.TrieMap;
import scala.collection.concurrent.TrieMap$;
import scala.collection.immutable.Seq;
import scala.package$;
import scala.runtime.BoxesRunTime;
import scala.runtime.ScalaRunTime$;

/* compiled from: MessagingService.scala */
/* loaded from: input_file:korolev/server/internal/services/MessagingService.class */
public final class MessagingService<F> {
    private final SessionsService<F, ?, ?> sessionsService;
    private final Effect<F> evidence$1;
    private final TrieMap<Qsid, Queue<F, String>> longPollingTopics = TrieMap$.MODULE$.empty();
    private final Seq<Tuple2<String, String>> commonResponseHeaders = package$.MODULE$.Seq().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension((String) Predef$.MODULE$.ArrowAssoc("cache-control"), "no-cache"), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension((String) Predef$.MODULE$.ArrowAssoc("content-type"), "application/json")}));
    private final Response<Stream<F, Bytes>> commonOkResponse;
    private final Response<Stream<F, Bytes>> commonGoneResponse;

    public static <F> Object SomeReloadMessageF(Effect<F> effect) {
        return MessagingService$.MODULE$.SomeReloadMessageF(effect);
    }

    public MessagingService(Reporter reporter, CommonService<F> commonService, SessionsService<F, ?, ?> sessionsService, Effect<F> effect) {
        this.sessionsService = sessionsService;
        this.evidence$1 = effect;
        this.commonOkResponse = Response$.MODULE$.apply(Response$Status$.MODULE$.Ok(), Stream$.MODULE$.empty(effect), this.commonResponseHeaders, Some$.MODULE$.apply(BoxesRunTime.boxToLong(0L)));
        this.commonGoneResponse = Response$.MODULE$.apply(Response$Status$.MODULE$.Gone(), Stream$.MODULE$.empty(effect), this.commonResponseHeaders, Some$.MODULE$.apply(BoxesRunTime.boxToLong(0L)));
    }

    public F longPollingSubscribe(Qsid qsid, Request.Head head) {
        return (F) syntax$.MODULE$.EffectOps(this.sessionsService.createAppIfNeeded(qsid, head, createTopic(qsid)), this.evidence$1).flatMap(boxedUnit -> {
            return syntax$.MODULE$.EffectOps(this.sessionsService.getApp(qsid), this.evidence$1).flatMap(option -> {
                return syntax$.MODULE$.EffectOps(option.fold(this::longPollingSubscribe$$anonfun$1$$anonfun$1$$anonfun$1, applicationInstance -> {
                    return applicationInstance.frontend().outgoingMessages().pull();
                }), this.evidence$1).flatMap(option -> {
                    Object HttpResponse;
                    syntax$ syntax_ = syntax$.MODULE$;
                    if (None$.MODULE$.equals(option)) {
                        HttpResponse = Effect$.MODULE$.apply(this.evidence$1).pure(this.commonGoneResponse);
                    } else {
                        if (!(option instanceof Some)) {
                            throw new MatchError(option);
                        }
                        HttpResponse = korolev.server.internal.package$.MODULE$.HttpResponse(Response$Status$.MODULE$.Ok(), (String) ((Some) option).value(), this.commonResponseHeaders, this.evidence$1);
                    }
                    return syntax_.EffectOps(HttpResponse, this.evidence$1).map(response -> {
                        return response;
                    });
                });
            });
        });
    }

    public F longPollingPublish(Qsid qsid, Stream<F, Bytes> stream) {
        return (F) syntax$.MODULE$.EffectOps(takeTopic(qsid), this.evidence$1).flatMap(queue -> {
            return syntax$.MODULE$.EffectOps(syntax$.MODULE$.EffectOps(stream.fold(Bytes$.MODULE$.empty(), (bytes, bytes2) -> {
                return bytes.$plus$plus(bytes2);
            }), this.evidence$1).map(bytes3 -> {
                return bytes3.asUtf8String();
            }), this.evidence$1).flatMap(str -> {
                return syntax$.MODULE$.EffectOps(queue.enqueue(str), this.evidence$1).map(boxedUnit -> {
                    return this.commonOkResponse;
                });
            });
        });
    }

    public F webSocketMessaging(Qsid qsid, Request.Head head, Stream<F, String> stream) {
        return (F) syntax$.MODULE$.EffectOps(this.sessionsService.createAppIfNeeded(qsid, head, stream), this.evidence$1).flatMap(boxedUnit -> {
            return syntax$.MODULE$.EffectOps(this.sessionsService.getApp(qsid), this.evidence$1).flatMap(option -> {
                if (option instanceof Some) {
                    return Effect$.MODULE$.apply(this.evidence$1).pure(Response$.MODULE$.apply(Response$Status$.MODULE$.Ok(), ((ApplicationInstance) ((Some) option).value()).frontend().outgoingMessages(), package$.MODULE$.Nil(), None$.MODULE$));
                }
                if (None$.MODULE$.equals(option)) {
                    return syntax$.MODULE$.EffectOps(Stream$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{Frontend$.MODULE$.ReloadMessage()})).mat(this.evidence$1), this.evidence$1).map(stream2 -> {
                        return Response$.MODULE$.apply(Response$Status$.MODULE$.Ok(), stream2, package$.MODULE$.Nil(), None$.MODULE$);
                    });
                }
                throw new MatchError(option);
            });
        });
    }

    private F takeTopic(Qsid qsid) {
        return (F) Effect$.MODULE$.apply(this.evidence$1).delay(() -> {
            return r1.takeTopic$$anonfun$1(r2);
        });
    }

    private Stream<F, String> createTopic(Qsid qsid) {
        Queue apply = Queue$.MODULE$.apply(Queue$.MODULE$.apply$default$1(), this.evidence$1);
        this.longPollingTopics.putIfAbsent(qsid, apply);
        return apply.stream();
    }

    private final Object longPollingSubscribe$$anonfun$1$$anonfun$1$$anonfun$1() {
        return MessagingService$.MODULE$.SomeReloadMessageF(this.evidence$1);
    }

    private final Queue takeTopic$$anonfun$1(Qsid qsid) {
        if (this.longPollingTopics.contains(qsid)) {
            return (Queue) this.longPollingTopics.apply(qsid);
        }
        throw new Exception(new StringBuilder(40).append("There is no long-polling topic matching ").append(qsid).toString());
    }
}
