package io.atomix.core.queue.impl;

import com.google.common.util.concurrent.MoreExecutors;
import io.atomix.core.queue.AsyncWorkQueue;
import io.atomix.core.utils.EventLog;
import io.atomix.core.utils.EventManager;
import io.atomix.primitive.resource.PrimitiveResource;
import java.util.function.Consumer;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Produces;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/atomix/core/queue/impl/WorkQueueResource.class */
/**
 * JAX-RS resource exposing an {@link AsyncWorkQueue} of strings over HTTP.
 *
 * <p>{@code POST} adds a single item to the queue; {@code GET} completes with the next
 * item delivered to the {@link EventLog} associated with the queue. Both endpoints are
 * asynchronous: the request is suspended and resumed from the completion callback of the
 * underlying queue or event-log future.
 */
public class WorkQueueResource implements PrimitiveResource {
    private static final Logger LOGGER = LoggerFactory.getLogger(WorkQueueResource.class);

    /** The queue backing this resource. */
    private final AsyncWorkQueue<String> workQueue;

    /**
     * Creates a resource backed by the given queue.
     *
     * @param asyncWorkQueue the work queue that requests are forwarded to
     */
    public WorkQueueResource(AsyncWorkQueue<String> asyncWorkQueue) {
        this.workQueue = asyncWorkQueue;
    }

    /**
     * Adds one item to the work queue.
     *
     * <p>Resumes the request with {@code 200 OK} once the item has been added, or with a
     * {@code 500} server error (after logging the cause) if the add fails.
     *
     * @param str the plain-text request body to enqueue
     * @param asyncResponse the suspended response to resume with the outcome
     */
    @POST
    @Consumes({"text/plain"})
    public void add(String str, @Suspended AsyncResponse asyncResponse) {
        this.workQueue.addOne(str).whenComplete((ignored, error) -> {
            if (error == null) {
                asyncResponse.resume(Response.ok().build());
            } else {
                // A Throwable argument selects warn(String, Throwable), so a bare "{}" format
                // would be logged literally; use a descriptive message instead.
                LOGGER.warn("Failed to add item to work queue {}", this.workQueue.name(), error);
                asyncResponse.resume(Response.serverError().build());
            }
        });
    }

    /**
     * Takes the next item delivered for this work queue.
     *
     * <p>Looks up (or creates) the event log keyed by {@link AsyncWorkQueue} and the queue name;
     * the log's listener appends each received item to the log. When {@code open()} returns
     * {@code true}, the listener is first registered as the queue's task processor and the
     * next event is awaited only after registration succeeds; otherwise the next event is
     * awaited directly. A failed registration is logged and answered with a {@code 500}.
     *
     * @param eventManager the manager supplying the event log for this queue
     * @param asyncResponse the suspended response to resume with the outcome
     */
    @GET
    @Produces({"application/json"})
    public void take(@Context EventManager eventManager, @Suspended AsyncResponse asyncResponse) {
        EventLog<Consumer<String>, String> taskLog = eventManager.getOrCreateEventLog(
                AsyncWorkQueue.class, this.workQueue.name(), log -> item -> log.addEvent(item));

        // NOTE(review): open() presumably returns true only for the call that actually opens
        // the log, so the processor is registered once - confirm against EventLog.
        if (!taskLog.open()) {
            takeTask(taskLog, asyncResponse);
            return;
        }

        // NOTE(review): the literal 1 is presumably the processor parallelism - confirm
        // against AsyncWorkQueue#registerTaskProcessor.
        this.workQueue.registerTaskProcessor(taskLog.listener(), 1, MoreExecutors.directExecutor())
                .whenComplete((ignored, error) -> {
                    if (error == null) {
                        takeTask(taskLog, asyncResponse);
                    } else {
                        LOGGER.warn("Failed to register task processor for work queue {}",
                                this.workQueue.name(), error);
                        asyncResponse.resume(Response.serverError().build());
                    }
                });
    }

    /**
     * Resumes the response with the next event from the given log.
     *
     * <p>Answers {@code 200 OK} with the event as the entity, or {@code 404 Not Found} if the
     * event future completes exceptionally.
     *
     * @param eventLog the log to read the next event from
     * @param asyncResponse the suspended response to resume with the outcome
     */
    private void takeTask(EventLog<Consumer<String>, String> eventLog, AsyncResponse asyncResponse) {
        eventLog.nextEvent().whenComplete((task, error) -> {
            if (error == null) {
                asyncResponse.resume(Response.ok(task).build());
            } else {
                // Keep the cause visible for diagnostics instead of discarding it silently.
                LOGGER.debug("Failed to read next event for work queue {}", this.workQueue.name(), error);
                asyncResponse.resume(Response.status(Response.Status.NOT_FOUND).build());
            }
        });
    }
}
