/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.master.api.akka.route.v1;

import akka.http.javadsl.marshalling.sse.EventStreamMarshalling;
import akka.http.javadsl.model.StatusCodes;
import akka.http.javadsl.server.PathMatcher0;
import akka.http.javadsl.server.PathMatchers;
import akka.http.javadsl.server.Route;
import akka.http.javadsl.unmarshalling.StringUnmarshallers;
import akka.japi.function.Function;
import akka.stream.javadsl.Source;
import io.mantisrx.master.api.akka.route.handlers.JobDiscoveryRouteHandler;
import io.mantisrx.master.api.akka.route.proto.JobClusterInfo;
import io.mantisrx.master.api.akka.route.proto.JobDiscoveryRouteProto;
import io.mantisrx.master.api.akka.route.utils.StreamingUtils;
import io.mantisrx.master.api.akka.route.v1.BaseRoute;
import io.mantisrx.master.api.akka.route.v1.HttpRequestMetrics;
import io.mantisrx.master.api.akka.route.v1.ParamName;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto;
import java.io.Serializable;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.RxReactiveStreams;

public class LastSubmittedJobIdStreamRoute
extends BaseRoute {
    private static final Logger logger = LoggerFactory.getLogger(LastSubmittedJobIdStreamRoute.class);
    private final JobDiscoveryRouteHandler jobDiscoveryRouteHandler;
    private static final PathMatcher0 JOBDISCOVERY_API_PREFIX = PathMatchers.segment((String)"api").slash("v1");

    public LastSubmittedJobIdStreamRoute(JobDiscoveryRouteHandler jobDiscoveryRouteHandler) {
        this.jobDiscoveryRouteHandler = jobDiscoveryRouteHandler;
    }

    @Override
    protected Route constructRoutes() {
        return this.pathPrefix(JOBDISCOVERY_API_PREFIX, () -> this.concat(this.path(PathMatchers.segment((String)"lastSubmittedJobIdStream").slash(PathMatchers.segment()), clusterName -> this.pathEndOrSingleSlash(() -> this.get(() -> this.getLastSubmittedJobIdStreamRoute((String)clusterName)))), new Route[0]));
    }

    @Override
    public Route createRoute(java.util.function.Function<Route, Route> routeFilter) {
        logger.info("creating /api/v1/jobDiscoveryStream routes");
        return super.createRoute(routeFilter);
    }

    private Route getLastSubmittedJobIdStreamRoute(String clusterName) {
        return this.parameterOptional(StringUnmarshallers.BOOLEAN, ParamName.SEND_HEARTBEAT, sendHeartbeats -> {
            logger.info("GET /api/v1/lastSubmittedJobIdStream/{} called", (Object)clusterName);
            CompletionStage<JobDiscoveryRouteProto.JobClusterInfoResponse> jobClusterInfoRespCS = this.jobDiscoveryRouteHandler.lastSubmittedJobIdStream(new JobClusterManagerProto.GetLastSubmittedJobIdStreamRequest(clusterName), sendHeartbeats.orElse(false));
            return this.completeAsync(jobClusterInfoRespCS, resp -> {
                Optional<Observable<JobClusterInfo>> jobClusterInfoO = resp.getJobClusterInfoObs();
                if (jobClusterInfoO.isPresent()) {
                    Observable<JobClusterInfo> jciStream = jobClusterInfoO.get();
                    Source source = Source.fromPublisher((Publisher)RxReactiveStreams.toPublisher(jciStream)).map((Function & Serializable)j -> StreamingUtils.from(j).orElse(null)).filter(Objects::nonNull);
                    return this.completeOK(source, EventStreamMarshalling.toEventStream());
                }
                logger.warn("Failed to get last submitted jobId stream for {}", (Object)clusterName);
                return this.complete(StatusCodes.INTERNAL_SERVER_ERROR, "Failed to get last submitted jobId stream for " + clusterName);
            }, "api.v1.lastSubmittedJobIdStream.instance", HttpRequestMetrics.HttpVerb.GET);
        });
    }
}

