/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.master.api.akka.route.handlers;

import akka.actor.ActorRef;
import akka.pattern.PatternsCS;
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.netflix.spectator.api.Tag;
import io.mantisrx.common.metrics.Counter;
import io.mantisrx.common.metrics.Metrics;
import io.mantisrx.master.api.akka.route.handlers.JobDiscoveryRouteHandler;
import io.mantisrx.master.api.akka.route.proto.JobClusterInfo;
import io.mantisrx.master.api.akka.route.proto.JobDiscoveryRouteProto;
import io.mantisrx.master.api.akka.route.utils.JobDiscoveryHeartbeats;
import io.mantisrx.master.jobcluster.proto.BaseResponse;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto;
import io.mantisrx.server.core.JobSchedulingInfo;
import io.mantisrx.server.master.config.ConfigurationProvider;
import io.mantisrx.server.master.domain.JobId;
import java.time.Duration;
import java.util.HashMap;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.subjects.BehaviorSubject;

public class JobDiscoveryRouteHandlerAkkaImpl
implements JobDiscoveryRouteHandler {
    private static final Logger logger = LoggerFactory.getLogger(JobDiscoveryRouteHandlerAkkaImpl.class);
    private final ActorRef jobClustersManagerActor;
    private final Duration askTimeout;
    private final Duration serverIdleConnectionTimeout;
    private final Counter schedInfoStreamErrors;
    private final Counter lastSubmittedJobIdStreamErrors;
    private final AsyncLoadingCache<JobClusterManagerProto.GetJobSchedInfoRequest, JobClusterManagerProto.GetJobSchedInfoResponse> schedInfoCache;
    private final AsyncLoadingCache<JobClusterManagerProto.GetLastSubmittedJobIdStreamRequest, JobClusterManagerProto.GetLastSubmittedJobIdStreamResponse> lastSubmittedJobIdStreamRespCache;

    public JobDiscoveryRouteHandlerAkkaImpl(ActorRef jobClustersManagerActor, Duration serverIdleTimeout) {
        this.jobClustersManagerActor = jobClustersManagerActor;
        long timeoutMs = Optional.ofNullable(ConfigurationProvider.getConfig().getMasterApiAskTimeoutMs()).orElse(1000L);
        this.askTimeout = Duration.ofMillis(timeoutMs);
        this.serverIdleConnectionTimeout = serverIdleTimeout;
        this.schedInfoCache = Caffeine.newBuilder().expireAfterWrite(5L, TimeUnit.SECONDS).maximumSize(500L).buildAsync(this::jobSchedInfo);
        this.lastSubmittedJobIdStreamRespCache = Caffeine.newBuilder().expireAfterWrite(5L, TimeUnit.SECONDS).maximumSize(500L).buildAsync(this::lastSubmittedJobId);
        Metrics m = new Metrics.Builder().id("JobDiscoveryRouteHandlerAkkaImpl", new Tag[0]).addCounter("schedInfoStreamErrors").addCounter("lastSubmittedJobIdStreamErrors").build();
        this.schedInfoStreamErrors = m.getCounter("schedInfoStreamErrors");
        this.lastSubmittedJobIdStreamErrors = m.getCounter("lastSubmittedJobIdStreamErrors");
    }

    private CompletableFuture<JobClusterManagerProto.GetJobSchedInfoResponse> jobSchedInfo(JobClusterManagerProto.GetJobSchedInfoRequest request, Executor executor) {
        return PatternsCS.ask((ActorRef)this.jobClustersManagerActor, (Object)request, (Duration)this.askTimeout).thenApply(JobClusterManagerProto.GetJobSchedInfoResponse.class::cast).toCompletableFuture();
    }

    @Override
    public CompletionStage<JobDiscoveryRouteProto.SchedInfoResponse> schedulingInfoStream(JobClusterManagerProto.GetJobSchedInfoRequest request, boolean sendHeartbeats) {
        CompletableFuture response = this.schedInfoCache.get((Object)request);
        try {
            AtomicBoolean isJobCompleted = new AtomicBoolean(false);
            String jobId = request.getJobId().getId();
            JobSchedulingInfo completedJobSchedulingInfo = new JobSchedulingInfo(jobId, new HashMap());
            CompletionStage<JobDiscoveryRouteProto.SchedInfoResponse> jobSchedInfoObsCS = response.thenApply(getJobSchedInfoResp -> {
                Optional<BehaviorSubject<JobSchedulingInfo>> jobStatusSubjectO = getJobSchedInfoResp.getJobSchedInfoSubject();
                if (getJobSchedInfoResp.responseCode.equals((Object)BaseResponse.ResponseCode.SUCCESS) && jobStatusSubjectO.isPresent()) {
                    BehaviorSubject<JobSchedulingInfo> jobSchedulingInfoObs = jobStatusSubjectO.get();
                    Observable heartbeats = Observable.interval((long)5L, (long)(this.serverIdleConnectionTimeout.getSeconds() - 1L), (TimeUnit)TimeUnit.SECONDS).map(x -> {
                        if (!isJobCompleted.get()) {
                            return JobDiscoveryHeartbeats.SCHED_INFO_HB_INSTANCE;
                        }
                        return completedJobSchedulingInfo;
                    }).takeWhile(x -> sendHeartbeats);
                    Observable jobSchedulingInfoWithHBObs = Observable.merge((Observable)jobSchedulingInfoObs.doOnCompleted(() -> isJobCompleted.set(true)), (Observable)heartbeats);
                    return new JobDiscoveryRouteProto.SchedInfoResponse(getJobSchedInfoResp.requestId, getJobSchedInfoResp.responseCode, getJobSchedInfoResp.message, (Observable<JobSchedulingInfo>)jobSchedulingInfoWithHBObs);
                }
                logger.info("Failed to get Sched info stream for {}", (Object)request.getJobId().getId());
                this.schedInfoStreamErrors.increment();
                return new JobDiscoveryRouteProto.SchedInfoResponse(getJobSchedInfoResp.requestId, getJobSchedInfoResp.responseCode, getJobSchedInfoResp.message);
            });
            return jobSchedInfoObsCS;
        }
        catch (Exception e) {
            logger.error("caught exception fetching sched info stream for {}", (Object)request.getJobId().getId(), (Object)e);
            this.schedInfoStreamErrors.increment();
            return CompletableFuture.completedFuture(new JobDiscoveryRouteProto.SchedInfoResponse(0L, BaseResponse.ResponseCode.SERVER_ERROR, "Failed to get SchedulingInfo stream for jobId " + request.getJobId().getId() + " error: " + e.getMessage()));
        }
    }

    private CompletableFuture<JobClusterManagerProto.GetLastSubmittedJobIdStreamResponse> lastSubmittedJobId(JobClusterManagerProto.GetLastSubmittedJobIdStreamRequest request, Executor executor) {
        return PatternsCS.ask((ActorRef)this.jobClustersManagerActor, (Object)request, (Duration)this.askTimeout).thenApply(JobClusterManagerProto.GetLastSubmittedJobIdStreamResponse.class::cast).toCompletableFuture();
    }

    @Override
    public CompletionStage<JobDiscoveryRouteProto.JobClusterInfoResponse> lastSubmittedJobIdStream(JobClusterManagerProto.GetLastSubmittedJobIdStreamRequest request, boolean sendHeartbeats) {
        CompletableFuture response = this.lastSubmittedJobIdStreamRespCache.get((Object)request);
        try {
            return response.thenApply(lastSubmittedJobIdResp -> {
                Optional<BehaviorSubject<JobId>> jobIdSubjectO = lastSubmittedJobIdResp.getjobIdBehaviorSubject();
                if (lastSubmittedJobIdResp.responseCode.equals((Object)BaseResponse.ResponseCode.SUCCESS) && jobIdSubjectO.isPresent()) {
                    Observable jobClusterInfoObs = jobIdSubjectO.get().map(jobId -> new JobClusterInfo(jobId.getCluster(), jobId.getId()));
                    Observable heartbeats = Observable.interval((long)5L, (long)(this.serverIdleConnectionTimeout.getSeconds() - 1L), (TimeUnit)TimeUnit.SECONDS).map(x -> JobDiscoveryHeartbeats.JOB_CLUSTER_INFO_HB_INSTANCE).takeWhile(x -> sendHeartbeats);
                    Observable jobClusterInfoWithHB = Observable.merge((Observable)jobClusterInfoObs, (Observable)heartbeats);
                    return new JobDiscoveryRouteProto.JobClusterInfoResponse(lastSubmittedJobIdResp.requestId, lastSubmittedJobIdResp.responseCode, lastSubmittedJobIdResp.message, (Observable<JobClusterInfo>)jobClusterInfoWithHB);
                }
                logger.info("Failed to get lastSubmittedJobId stream for job cluster {}", (Object)request.getClusterName());
                this.lastSubmittedJobIdStreamErrors.increment();
                return new JobDiscoveryRouteProto.JobClusterInfoResponse(lastSubmittedJobIdResp.requestId, lastSubmittedJobIdResp.responseCode, lastSubmittedJobIdResp.message);
            });
        }
        catch (Exception e) {
            logger.error("caught exception fetching lastSubmittedJobId stream for {}", (Object)request.getClusterName(), (Object)e);
            this.lastSubmittedJobIdStreamErrors.increment();
            return CompletableFuture.completedFuture(new JobDiscoveryRouteProto.JobClusterInfoResponse(0L, BaseResponse.ResponseCode.SERVER_ERROR, "Failed to get last submitted jobId stream for " + request.getClusterName() + " error: " + e.getMessage()));
        }
    }
}

