/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.master.events;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.dispatch.BoundedMessageQueueSemantics;
import akka.dispatch.RequiresMessageQueue;
import io.mantisrx.master.api.akka.route.proto.JobStatus;
import io.mantisrx.master.events.LifecycleEventsProto;
import io.mantisrx.master.jobcluster.job.JobState;
import io.mantisrx.server.core.Status;
import io.mantisrx.shaded.com.google.common.collect.EvictingQueue;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StatusEventBrokerActor
extends AbstractActor
implements RequiresMessageQueue<BoundedMessageQueueSemantics> {
    private final Logger logger = LoggerFactory.getLogger(StatusEventBrokerActor.class);
    private final Map<String, Set<ActorRef>> jobIdToActorMap = new HashMap<String, Set<ActorRef>>();
    private final Map<ActorRef, String> actorToJobIdMap = new HashMap<ActorRef, String>();
    private final Map<String, EvictingQueue<Status>> jobIdToStatusEventsBuf = new HashMap<String, EvictingQueue<Status>>();
    public static final int MAX_STATUS_HISTORY_PER_JOB = 100;

    public static Props props() {
        return Props.create(StatusEventBrokerActor.class, (Object[])new Object[0]);
    }

    private void onJobStatusRequest(JobStatusRequest jsr) {
        this.logger.debug("got request {}", (Object)jsr);
        ActorRef sender = this.sender();
        this.jobIdToActorMap.computeIfAbsent(jsr.jobId, jobId -> new HashSet());
        this.jobIdToActorMap.get(jsr.jobId).add(sender);
        this.actorToJobIdMap.put(sender, jsr.jobId);
        this.getContext().watch(sender);
        EvictingQueue<Status> statusEventsBuf = this.jobIdToStatusEventsBuf.get(jsr.jobId);
        if (statusEventsBuf != null) {
            statusEventsBuf.forEach(se -> sender.tell((Object)new JobStatus((Status)se), ActorRef.noSender()));
        }
    }

    private void cleanupIfTerminalState(LifecycleEventsProto.StatusEvent se) {
        LifecycleEventsProto.JobStatusEvent jse;
        if (se instanceof LifecycleEventsProto.JobStatusEvent && JobState.isTerminalState((jse = (LifecycleEventsProto.JobStatusEvent)se).getJobState())) {
            this.jobIdToStatusEventsBuf.remove(jse.getJobId());
        }
    }

    private void onStatusEvent(LifecycleEventsProto.StatusEvent se) {
        Status status = LifecycleEventsProto.from(se);
        String jobId = status.getJobId();
        this.jobIdToStatusEventsBuf.computeIfAbsent(jobId, j -> EvictingQueue.create((int)100)).add((Object)status);
        this.cleanupIfTerminalState(se);
        Set<ActorRef> jobStatusActiveConnections = this.jobIdToActorMap.get(jobId);
        if (jobStatusActiveConnections != null && !jobStatusActiveConnections.isEmpty()) {
            this.logger.debug("Sending job status {}", (Object)se);
            jobStatusActiveConnections.forEach(connActor -> connActor.tell((Object)new JobStatus(status), this.self()));
        } else {
            this.logger.debug("Job status dropped, no active subscribers for {}", (Object)jobId);
        }
    }

    private void onTerminated(Terminated t) {
        this.logger.info("actor terminated {}", (Object)t);
        ActorRef terminatedActor = t.actor();
        String jobId = this.actorToJobIdMap.get(terminatedActor);
        if (jobId != null) {
            this.jobIdToActorMap.get(jobId).remove(terminatedActor);
        }
        this.actorToJobIdMap.remove(terminatedActor);
    }

    public AbstractActor.Receive createReceive() {
        return this.receiveBuilder().match(JobStatusRequest.class, jsr -> this.onJobStatusRequest((JobStatusRequest)jsr)).match(LifecycleEventsProto.StatusEvent.class, js -> this.onStatusEvent((LifecycleEventsProto.StatusEvent)js)).match(Terminated.class, t -> this.onTerminated((Terminated)t)).build();
    }

    public static class JobStatusRequest {
        private final String jobId;

        public JobStatusRequest(String jobId) {
            this.jobId = jobId;
        }

        public String getJobId() {
            return this.jobId;
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            JobStatusRequest that = (JobStatusRequest)o;
            return Objects.equals(this.jobId, that.jobId);
        }

        public int hashCode() {
            return Objects.hash(this.jobId);
        }

        public String toString() {
            return "JobStatusRequest{jobId='" + this.jobId + '\'' + '}';
        }
    }
}

