package com.geneea.celery;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.geneea.celery.backends.rabbit.RabbitBackend;
import com.geneea.celery.spi.Backend;
import com.google.common.base.Splitter;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.MoreCollectors;
import com.google.common.collect.Streams;
import com.google.common.primitives.Primitives;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.text.MessageFormat;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;

/* loaded from: input_file:com/geneea/celery/CeleryWorker.class */
public class CeleryWorker extends DefaultConsumer {
    private final ObjectMapper jsonMapper;
    private final Lock taskRunning;
    private final Backend backend;
    private static final Logger LOG = Logger.getLogger(CeleryWorker.class.getName());

    /* loaded from: input_file:com/geneea/celery/CeleryWorker$Args.class */
    private static class Args {

        @Parameter(names = {"--queue"}, description = "Celery queue to watch")
        private String queue;

        @Parameter(names = {"--concurrency"}, description = "Number of concurrent tasks to process")
        private int numWorkers;

        @Parameter(names = {"--broker"}, description = "Broker URL, e. g. amqp://localhost//")
        private String broker;

        private Args() {
            this.queue = "celery";
            this.numWorkers = 2;
            this.broker = "amqp://localhost/%2F";
        }
    }

    public CeleryWorker(Channel channel, Backend backend) {
        super(channel);
        this.taskRunning = new ReentrantLock();
        this.backend = backend;
        this.jsonMapper = new ObjectMapper();
    }

    public void handleDelivery(String str, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) throws IOException {
        String obj = basicProperties.getHeaders().get("id").toString();
        this.taskRunning.lock();
        try {
            try {
                try {
                    try {
                        try {
                            Stopwatch createStarted = Stopwatch.createStarted();
                            JsonNode readTree = this.jsonMapper.readTree(new String(bArr, basicProperties.getContentEncoding()));
                            String obj2 = basicProperties.getHeaders().get("task").toString();
                            Object processTask = processTask(obj2, (ArrayNode) readTree.get(0), (ObjectNode) readTree.get(1));
                            LOG.info(String.format("CeleryTask %s[%s] succeeded in %s. Result was: %s", obj2, obj, createStarted, processTask));
                            this.backend.reportResult(obj, basicProperties.getReplyTo(), basicProperties.getCorrelationId(), processTask);
                            getChannel().basicAck(envelope.getDeliveryTag(), false);
                            this.taskRunning.unlock();
                        } catch (RuntimeException e) {
                            LOG.log(Level.SEVERE, String.format("CeleryTask %s - %s", obj, e), (Throwable) e);
                            this.backend.reportException(obj, basicProperties.getReplyTo(), basicProperties.getCorrelationId(), e.getCause() != null ? e.getCause() : e);
                            getChannel().basicNack(envelope.getDeliveryTag(), false, false);
                            this.taskRunning.unlock();
                        }
                    } catch (InvocationTargetException e2) {
                        LOG.log(Level.WARNING, String.format("CeleryTask %s error", obj), e2.getCause());
                        this.backend.reportException(obj, basicProperties.getReplyTo(), basicProperties.getCorrelationId(), e2.getCause());
                        getChannel().basicAck(envelope.getDeliveryTag(), false);
                        this.taskRunning.unlock();
                    }
                } catch (JsonProcessingException e3) {
                    LOG.log(Level.SEVERE, String.format("CeleryTask %s - %s", obj, e3), e3.getCause());
                    this.backend.reportException(obj, basicProperties.getReplyTo(), basicProperties.getCorrelationId(), e3);
                    getChannel().basicNack(envelope.getDeliveryTag(), false, false);
                    this.taskRunning.unlock();
                }
            } catch (DispatchException e4) {
                LOG.log(Level.SEVERE, String.format("CeleryTask %s dispatch error", obj), e4.getCause());
                this.backend.reportException(obj, basicProperties.getReplyTo(), basicProperties.getCorrelationId(), e4);
                getChannel().basicAck(envelope.getDeliveryTag(), false);
                this.taskRunning.unlock();
            }
        } catch (Throwable th) {
            this.taskRunning.unlock();
            throw th;
        }
    }

    private Object processTask(String str, ArrayNode arrayNode, ObjectNode objectNode) throws DispatchException, InvocationTargetException {
        ImmutableList copyOf = ImmutableList.copyOf(Splitter.on("#").split(str).iterator());
        if (copyOf.size() != 2) {
            throw new DispatchException(MessageFormat.format("This worker can only process tasks with name in form package.ClassName#method, got {}", str));
        }
        Object task = TaskRegistry.getTask((String) copyOf.get(0));
        if (task == null) {
            throw new DispatchException(String.format("CeleryTask %s not registered.", str));
        }
        Method method = (Method) Arrays.stream(task.getClass().getDeclaredMethods()).filter(method2 -> {
            return method2.getName().equals(copyOf.get(1));
        }).collect(MoreCollectors.onlyElement());
        try {
            return method.invoke(task, ((List) Streams.mapWithIndex(Arrays.stream(method.getParameterTypes()), (cls, j) -> {
                return this.jsonMapper.convertValue(arrayNode.get((int) j), cls);
            }).collect(Collectors.toList())).toArray());
        } catch (IllegalAccessException e) {
            throw new DispatchException(String.format("Error calling %s", method), e);
        } catch (IllegalArgumentException e2) {
            throw new AssertionError(String.format("Error calling %s", method), e2);
        }
    }

    public void close() throws IOException {
        getChannel().abort();
        this.backend.close();
    }

    public void join() {
        this.taskRunning.lock();
        this.taskRunning.unlock();
    }

    private static Optional<Method> findRunMethod(Class<?> cls, List<Class<?>> list) {
        return Arrays.stream(cls.getDeclaredMethods()).filter(method -> {
            return method.getName().equals("run");
        }).filter(method2 -> {
            Class<?>[] parameterTypes = method2.getParameterTypes();
            if (parameterTypes.length != list.size()) {
                return false;
            }
            for (int i = 0; i < list.size(); i++) {
                if (!Primitives.wrap(parameterTypes[i]).isAssignableFrom((Class) list.get(i))) {
                    return false;
                }
            }
            return true;
        }).findAny();
    }

    public static CeleryWorker create(String str, Connection connection) throws IOException {
        Channel createChannel = connection.createChannel();
        createChannel.basicQos(2);
        createChannel.queueDeclare(str, true, false, false, (Map) null);
        CeleryWorker celeryWorker = new CeleryWorker(createChannel, new RabbitBackend(createChannel));
        createChannel.basicConsume(str, false, "", true, false, (Map) null, celeryWorker);
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                celeryWorker.close();
                celeryWorker.join();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }));
        return celeryWorker;
    }

    public static void main(String[] strArr) throws Exception {
        Args args = new Args();
        JCommander.newBuilder().addObject(args).build().parse(strArr);
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUri(args.broker);
        Connection newConnection = connectionFactory.newConnection(Executors.newCachedThreadPool());
        for (int i = 0; i < args.numWorkers; i++) {
            create(args.queue, newConnection);
        }
        System.out.println(String.format("Started consuming tasks from queue %s.", args.queue));
        System.out.println("Known tasks:");
        for (String str : TaskRegistry.getRegisteredTaskNames()) {
            System.out.print("  - ");
            System.out.println(str);
        }
    }
}
