package io.atomix.messaging.state;

import io.atomix.copycat.server.Commit;
import io.atomix.copycat.server.session.ServerSession;
import io.atomix.copycat.server.session.SessionListener;
import io.atomix.copycat.session.Session;
import io.atomix.messaging.DistributedTaskQueue;
import io.atomix.messaging.state.TaskQueueCommands;
import io.atomix.resource.ResourceStateMachine;
import io.atomix.resource.ResourceType;
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingDeque;

/* loaded from: input_file:io/atomix/messaging/state/TaskQueueState.class */
public class TaskQueueState extends ResourceStateMachine implements SessionListener {
    private final Map<Long, Commit<TaskQueueCommands.Subscribe>> workers;
    private final Queue<ServerSession> workerQueue;
    private final LinkedBlockingDeque<Commit<TaskQueueCommands.Submit>> taskQueue;
    private final Map<Long, Commit<TaskQueueCommands.Submit>> processing;

    public TaskQueueState() {
        super(new ResourceType(DistributedTaskQueue.class));
        this.workers = new HashMap();
        this.workerQueue = new ArrayDeque();
        this.taskQueue = new LinkedBlockingDeque<>();
        this.processing = new HashMap();
    }

    public void close(ServerSession serverSession) {
        Commit<TaskQueueCommands.Subscribe> remove = this.workers.remove(Long.valueOf(serverSession.id()));
        if (remove != null) {
            remove.close();
        }
        this.workerQueue.remove(serverSession);
        Commit<TaskQueueCommands.Submit> remove2 = this.processing.remove(Long.valueOf(serverSession.id()));
        if (remove2 != null) {
            ServerSession poll = this.workerQueue.poll();
            if (poll != null) {
                poll.publish("process", remove2.operation().task());
            } else {
                this.taskQueue.addFirst(remove2);
            }
        }
    }

    public void subscribe(Commit<TaskQueueCommands.Subscribe> commit) {
        this.workers.put(Long.valueOf(commit.session().id()), commit);
        Commit<TaskQueueCommands.Submit> poll = this.taskQueue.poll();
        if (poll == null) {
            this.workerQueue.add(commit.session());
        } else {
            this.processing.put(Long.valueOf(commit.session().id()), poll);
            commit.session().publish("process", poll.operation().task());
        }
    }

    public void unsubscribe(Commit<TaskQueueCommands.Unsubscribe> commit) {
        close(commit.session());
    }

    public void submit(Commit<TaskQueueCommands.Submit> commit) {
        try {
            ServerSession poll = this.workerQueue.poll();
            if (poll != null) {
                poll.publish("process", commit.operation().task());
                this.processing.put(Long.valueOf(poll.id()), commit);
            } else {
                this.taskQueue.add(commit);
            }
        } catch (Exception e) {
            commit.close();
            throw e;
        }
    }

    public Object ack(Commit<TaskQueueCommands.Ack> commit) {
        try {
            Commit<TaskQueueCommands.Submit> remove = this.processing.remove(Long.valueOf(commit.session().id()));
            if (remove == null) {
                throw new IllegalStateException("unknown task");
            }
            if (remove.operation().ack() && remove.session().state() == Session.State.OPEN) {
                remove.session().publish("ack", Long.valueOf(remove.operation().id()));
            }
            remove.close();
            Commit<TaskQueueCommands.Submit> poll = this.taskQueue.poll();
            if (poll == null) {
                this.workerQueue.add(commit.session());
                commit.close();
                return null;
            }
            this.processing.put(Long.valueOf(commit.session().id()), poll);
            Object task = poll.operation().task();
            commit.close();
            return task;
        } catch (Throwable th) {
            commit.close();
            throw th;
        }
    }
}
