/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.master.api.akka.route.v0;

import akka.http.javadsl.marshalling.sse.EventStreamMarshalling;
import akka.http.javadsl.model.HttpHeader;
import akka.http.javadsl.model.StatusCodes;
import akka.http.javadsl.server.ExceptionHandler;
import akka.http.javadsl.server.PathMatchers;
import akka.http.javadsl.server.Route;
import akka.http.javadsl.unmarshalling.StringUnmarshallers;
import akka.japi.function.Function;
import akka.japi.function.Predicate;
import akka.stream.javadsl.Source;
import com.netflix.spectator.api.Tag;
import io.mantisrx.common.metrics.Counter;
import io.mantisrx.common.metrics.Metrics;
import io.mantisrx.common.metrics.MetricsRegistry;
import io.mantisrx.master.api.akka.route.handlers.JobDiscoveryRouteHandler;
import io.mantisrx.master.api.akka.route.proto.JobClusterInfo;
import io.mantisrx.master.api.akka.route.proto.JobDiscoveryRouteProto;
import io.mantisrx.master.api.akka.route.utils.StreamingUtils;
import io.mantisrx.master.api.akka.route.v0.BaseRoute;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto;
import io.mantisrx.server.core.JobSchedulingInfo;
import io.mantisrx.server.master.domain.JobId;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.RxReactiveStreams;

public class JobDiscoveryRoute
extends BaseRoute {
    private static final Logger logger = LoggerFactory.getLogger(JobDiscoveryRoute.class);
    private final JobDiscoveryRouteHandler jobDiscoveryRouteHandler;
    private final Metrics metrics;
    private final Counter schedulingInfoStreamGET;
    private final Counter jobClusterInfoStreamGET;
    private static final HttpHeader ACCESS_CONTROL_ALLOW_ORIGIN_HEADER = HttpHeader.parse((String)"Access-Control-Allow-Origin", (String)"*");
    private static final Iterable<HttpHeader> DEFAULT_RESPONSE_HEADERS = Arrays.asList(ACCESS_CONTROL_ALLOW_ORIGIN_HEADER);

    public JobDiscoveryRoute(JobDiscoveryRouteHandler jobDiscoveryRouteHandler) {
        this.jobDiscoveryRouteHandler = jobDiscoveryRouteHandler;
        Metrics m = new Metrics.Builder().id("JobDiscoveryRoute", new Tag[0]).addCounter("schedulingInfoStreamGET").addCounter("jobClusterInfoStreamGET").build();
        this.metrics = MetricsRegistry.getInstance().registerAndGet(m);
        this.schedulingInfoStreamGET = this.metrics.getCounter("schedulingInfoStreamGET");
        this.jobClusterInfoStreamGET = this.metrics.getCounter("jobClusterInfoStreamGET");
    }

    private Route getJobDiscoveryRoutes() {
        return this.route(new Route[]{this.get(() -> this.route(new Route[]{this.path(PathMatchers.segment((String)"assignmentresults").slash(PathMatchers.segment()), jobId -> this.parameterOptional(StringUnmarshallers.BOOLEAN, "sendHB", sendHeartbeats -> {
            logger.debug("/assignmentresults/{} called", jobId);
            this.schedulingInfoStreamGET.increment();
            JobClusterManagerProto.GetJobSchedInfoRequest req = new JobClusterManagerProto.GetJobSchedInfoRequest(JobId.fromId(jobId).get());
            CompletionStage<JobDiscoveryRouteProto.SchedInfoResponse> schedulingInfoRespCS = this.jobDiscoveryRouteHandler.schedulingInfoStream(req, sendHeartbeats.orElse(false));
            return this.completeAsync(schedulingInfoRespCS, r -> {
                Optional<Observable<JobSchedulingInfo>> schedInfoStreamO = r.getSchedInfoStream();
                if (schedInfoStreamO.isPresent()) {
                    Observable<JobSchedulingInfo> schedulingInfoObs = schedInfoStreamO.get();
                    Source schedInfoSource = Source.fromPublisher((Publisher)RxReactiveStreams.toPublisher(schedulingInfoObs)).map((Function & Serializable)j -> StreamingUtils.from(j).orElse(null)).filter((Predicate & Serializable)sse -> sse != null);
                    return this.completeOK(schedInfoSource, EventStreamMarshalling.toEventStream());
                }
                logger.warn("Failed to get sched info stream for job {}", jobId);
                return this.complete(StatusCodes.INTERNAL_SERVER_ERROR, "Failed to get sched info stream for job " + jobId);
            });
        })), this.path(PathMatchers.segment((String)"namedjobs").slash(PathMatchers.segment()), jobCluster -> this.parameterOptional(StringUnmarshallers.BOOLEAN, "sendHB", sendHeartbeats -> {
            logger.debug("/namedjobs/{} called", jobCluster);
            this.jobClusterInfoStreamGET.increment();
            JobClusterManagerProto.GetLastSubmittedJobIdStreamRequest req = new JobClusterManagerProto.GetLastSubmittedJobIdStreamRequest((String)jobCluster);
            CompletionStage<JobDiscoveryRouteProto.JobClusterInfoResponse> jobClusterInfoRespCS = this.jobDiscoveryRouteHandler.lastSubmittedJobIdStream(req, sendHeartbeats.orElse(false));
            return this.completeAsync(jobClusterInfoRespCS, r -> {
                Optional<Observable<JobClusterInfo>> jobClusterInfoO = r.getJobClusterInfoObs();
                if (jobClusterInfoO.isPresent()) {
                    Observable<JobClusterInfo> jobClusterInfoObs = jobClusterInfoO.get();
                    Source source = Source.fromPublisher((Publisher)RxReactiveStreams.toPublisher(jobClusterInfoObs)).map((Function & Serializable)j -> StreamingUtils.from(j).orElse(null)).filter((Predicate & Serializable)sse -> sse != null);
                    return this.completeOK(source, EventStreamMarshalling.toEventStream());
                }
                logger.warn("Failed to get last submitted jobId stream for {}", jobCluster);
                return this.complete(StatusCodes.INTERNAL_SERVER_ERROR, "Failed to get last submitted jobId stream for " + jobCluster);
            });
        }))}))});
    }

    public Route createRoute(java.util.function.Function<Route, Route> routeFilter) {
        logger.info("creating routes");
        ExceptionHandler jsonExceptionHandler = ExceptionHandler.newBuilder().match(Exception.class, x -> {
            logger.error("got exception", (Throwable)x);
            return this.complete(StatusCodes.INTERNAL_SERVER_ERROR, "{\"error\": \"" + x.getMessage() + "\"}");
        }).build();
        return this.respondWithHeaders(DEFAULT_RESPONSE_HEADERS, () -> this.handleExceptions(jsonExceptionHandler, () -> (Route)routeFilter.apply(this.getJobDiscoveryRoutes())));
    }
}

