/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.master.api.akka.route.v1;

import akka.http.javadsl.marshalling.sse.EventStreamMarshalling;
import akka.http.javadsl.model.StatusCodes;
import akka.http.javadsl.server.PathMatcher0;
import akka.http.javadsl.server.PathMatchers;
import akka.http.javadsl.server.Route;
import akka.http.javadsl.unmarshalling.StringUnmarshallers;
import akka.japi.function.Function;
import akka.stream.javadsl.Source;
import io.mantisrx.master.api.akka.route.handlers.JobDiscoveryRouteHandler;
import io.mantisrx.master.api.akka.route.proto.JobDiscoveryRouteProto;
import io.mantisrx.master.api.akka.route.utils.StreamingUtils;
import io.mantisrx.master.api.akka.route.v1.BaseRoute;
import io.mantisrx.master.api.akka.route.v1.HttpRequestMetrics;
import io.mantisrx.master.api.akka.route.v1.ParamName;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto;
import io.mantisrx.server.core.JobSchedulingInfo;
import io.mantisrx.server.master.domain.JobId;
import java.io.Serializable;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.RxReactiveStreams;

public class JobDiscoveryStreamRoute
extends BaseRoute {
    private static final Logger logger = LoggerFactory.getLogger(JobDiscoveryStreamRoute.class);
    private final JobDiscoveryRouteHandler jobDiscoveryRouteHandler;
    private static final PathMatcher0 JOBDISCOVERY_API_PREFIX = PathMatchers.segment((String)"api").slash("v1");

    public JobDiscoveryStreamRoute(JobDiscoveryRouteHandler jobDiscoveryRouteHandler) {
        this.jobDiscoveryRouteHandler = jobDiscoveryRouteHandler;
    }

    @Override
    protected Route constructRoutes() {
        return this.pathPrefix(JOBDISCOVERY_API_PREFIX, () -> this.concat(this.path(PathMatchers.segment((String)"jobDiscoveryStream").slash(PathMatchers.segment()), jobId -> this.pathEndOrSingleSlash(() -> this.get(() -> this.getJobDiscoveryStreamRoute((String)jobId)))), new Route[0]));
    }

    @Override
    public Route createRoute(java.util.function.Function<Route, Route> routeFilter) {
        logger.info("creating /api/v1/jobDiscoveryStream routes");
        return super.createRoute(routeFilter);
    }

    private Route getJobDiscoveryStreamRoute(String jobId) {
        return this.parameterOptional(StringUnmarshallers.BOOLEAN, ParamName.SEND_HEARTBEAT, sendHeartbeats -> {
            logger.info("GET /api/v1/jobStatusStream/{} called", (Object)jobId);
            CompletionStage<JobDiscoveryRouteProto.SchedInfoResponse> schedulingInfoRespCS = this.jobDiscoveryRouteHandler.schedulingInfoStream(new JobClusterManagerProto.GetJobSchedInfoRequest(JobId.fromId(jobId).get()), sendHeartbeats.orElse(false));
            return this.completeAsync(schedulingInfoRespCS, resp -> {
                Optional<Observable<JobSchedulingInfo>> siStream = resp.getSchedInfoStream();
                if (siStream.isPresent()) {
                    Observable<JobSchedulingInfo> schedulingInfoObs = siStream.get();
                    Source schedInfoSource = Source.fromPublisher((Publisher)RxReactiveStreams.toPublisher(schedulingInfoObs)).map((Function & Serializable)j -> StreamingUtils.from(j).orElse(null)).filter(Objects::nonNull);
                    return this.completeOK(schedInfoSource, EventStreamMarshalling.toEventStream());
                }
                logger.warn("Failed to get sched info stream for job {}", (Object)jobId);
                return this.complete(StatusCodes.INTERNAL_SERVER_ERROR, "Failed to get sched info stream for job " + jobId);
            }, "api.v1.jobStatusStream.instance", HttpRequestMetrics.HttpVerb.GET);
        });
    }
}

