package org.swisspush.gateleen.queue.queuing;

import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.Message;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.json.JsonObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.swisspush.gateleen.core.http.HttpRequest;
import org.swisspush.gateleen.core.http.RequestLoggerFactory;
import org.swisspush.gateleen.core.util.Address;
import org.swisspush.gateleen.core.util.StatusCode;
import org.swisspush.gateleen.monitoring.MonitoringHandler;
import org.swisspush.gateleen.queue.expiry.ExpiryCheckHandler;

/* loaded from: input_file:org/swisspush/gateleen/queue/queuing/QueueProcessor.class */
public class QueueProcessor {
    private HttpClient httpClient;

    public QueueProcessor(final Vertx vertx, final HttpClient httpClient, final MonitoringHandler monitoringHandler) {
        this.httpClient = httpClient;
        vertx.eventBus().localConsumer(Address.queueProcessorAddress(), new Handler<Message<JsonObject>>() { // from class: org.swisspush.gateleen.queue.queuing.QueueProcessor.1
            public void handle(final Message<JsonObject> message) {
                HttpRequest httpRequest = null;
                try {
                    httpRequest = new HttpRequest(new JsonObject(((JsonObject) message.body()).getString("payload")));
                } catch (Exception e) {
                    LoggerFactory.getLogger(QueueProcessor.class).error("QueueProcessor QUEUE_ERROR: Could not build request: " + ((JsonObject) message.body()).toString());
                }
                final HttpRequest httpRequest2 = httpRequest;
                final Logger logger = RequestLoggerFactory.getLogger(QueueProcessor.class, httpRequest2.getHeaders());
                if (logger.isTraceEnabled()) {
                    logger.trace("QueueProcessor process message: " + message);
                }
                vertx.eventBus().send(Address.redisquesAddress(), RedisquesAPI.buildGetLockOperation(((JsonObject) message.body()).getString("queue")), new Handler<AsyncResult<Message<JsonObject>>>() { // from class: org.swisspush.gateleen.queue.queuing.QueueProcessor.1.1
                    public void handle(AsyncResult<Message<JsonObject>> asyncResult) {
                        if (!RedisquesAPI.NO_SUCH_LOCK.equals(((JsonObject) ((Message) asyncResult.result()).body()).getString(RedisquesAPI.STATUS))) {
                            logger.warn("Queue {} is locked!", ((JsonObject) message.body()).getString("queue"));
                            message.reply(new JsonObject().put(RedisquesAPI.STATUS, RedisquesAPI.ERROR).put(RedisquesAPI.MESSAGE, "queue locked"));
                            return;
                        }
                        logger.debug("QueueProcessor Performing request " + httpRequest2.getMethod() + " " + httpRequest2.getUri());
                        if (ExpiryCheckHandler.isExpired(httpRequest2)) {
                            logger.debug("QueueProcessor request expired to " + httpRequest2.getUri());
                            message.reply(new JsonObject().put(RedisquesAPI.STATUS, RedisquesAPI.OK));
                            return;
                        }
                        HttpClient httpClient2 = httpClient;
                        HttpMethod method = httpRequest2.getMethod();
                        String uri = httpRequest2.getUri();
                        Logger logger2 = logger;
                        HttpRequest httpRequest3 = httpRequest2;
                        Message message2 = message;
                        MonitoringHandler monitoringHandler2 = monitoringHandler;
                        HttpClientRequest request = httpClient2.request(method, uri, httpClientResponse -> {
                            if (logger2.isTraceEnabled()) {
                                logger2.trace("QueueProcessor response: " + httpClientResponse.statusCode());
                            }
                            if ((httpClientResponse.statusCode() < 200 || httpClientResponse.statusCode() >= 300) && httpClientResponse.statusCode() != 409) {
                                logger2.error("QueueProcessor QUEUE_ERROR: Failed request to " + httpRequest3.getUri() + ": " + httpClientResponse.statusCode() + " " + httpClientResponse.statusMessage());
                                message2.reply(new JsonObject().put(RedisquesAPI.STATUS, RedisquesAPI.ERROR).put(RedisquesAPI.MESSAGE, httpClientResponse.statusCode() + " " + httpClientResponse.statusMessage()));
                            } else {
                                if (httpClientResponse.statusCode() != StatusCode.CONFLICT.getStatusCode()) {
                                    logger2.debug("Successful request to " + httpRequest3.getUri());
                                } else {
                                    logger2.warn("Ignoring request conflict to " + httpRequest3.getUri() + ": " + httpClientResponse.statusCode() + " " + httpClientResponse.statusMessage());
                                }
                                message2.reply(new JsonObject().put(RedisquesAPI.STATUS, RedisquesAPI.OK));
                                monitoringHandler2.updateDequeue();
                            }
                            httpClientResponse.bodyHandler(buffer -> {
                                logger2.debug("Discarding backend body");
                            });
                            httpClientResponse.endHandler(r4 -> {
                                logger2.debug("Backend response end");
                            });
                            httpClientResponse.exceptionHandler(th -> {
                                logger2.warn("QueueProcessor QUEUE_ERROR: Exception on response from " + httpRequest3.getUri() + ": " + th.getMessage());
                                message2.reply(new JsonObject().put(RedisquesAPI.STATUS, RedisquesAPI.ERROR).put(RedisquesAPI.MESSAGE, th.getMessage()));
                            });
                        });
                        if (httpRequest2.getHeaders() != null && !httpRequest2.getHeaders().isEmpty()) {
                            request.headers().setAll(httpRequest2.getHeaders());
                        }
                        Logger logger3 = logger;
                        HttpRequest httpRequest4 = httpRequest2;
                        Message message3 = message;
                        request.exceptionHandler(th -> {
                            logger3.warn("QueueProcessor QUEUE_ERROR: Failed request to " + httpRequest4.getUri() + ": " + th.getMessage());
                            message3.reply(new JsonObject().put(RedisquesAPI.STATUS, RedisquesAPI.ERROR).put(RedisquesAPI.MESSAGE, th.getMessage()));
                        });
                        request.setTimeout(120000L);
                        if (httpRequest2.getPayload() != null) {
                            request.end(Buffer.buffer(httpRequest2.getPayload()));
                        } else {
                            request.end();
                        }
                    }
                });
            }
        });
    }

    public HttpClient getHttpClient() {
        return this.httpClient;
    }
}
