package io.knotx.server;

import io.knotx.dataobjects.ClientResponse;
import io.knotx.dataobjects.KnotContext;
import io.knotx.rxjava.proxy.KnotProxy;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.vertx.core.Handler;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.rxjava.core.Vertx;
import io.vertx.rxjava.core.buffer.Buffer;
import io.vertx.rxjava.core.http.HttpServerResponse;
import io.vertx.rxjava.ext.web.RoutingContext;
import java.util.Locale;

/* loaded from: input_file:io/knotx/server/KnotxAssemblerHandler.class */
/**
 * Routing handler that passes the {@link KnotContext} stored on the routing context to the
 * assembler proxy and writes the resulting {@link ClientResponse} to the HTTP response.
 *
 * <p>When the incoming context does not carry a {@code 200 OK} client response, the assembler is
 * skipped and that response is written out directly.
 */
public class KnotxAssemblerHandler implements Handler<RoutingContext> {
    private static final Logger LOGGER = LoggerFactory.getLogger(KnotxAssemblerHandler.class);

    /** Routing-context key under which the {@link KnotContext} is read. */
    private static final String KNOT_CONTEXT_KEY = "knotContext";

    private final KnotProxy assembler;
    private final KnotxServerConfiguration configuration;

    private KnotxAssemblerHandler(Vertx vertx, KnotxServerConfiguration knotxServerConfiguration) {
        this.configuration = knotxServerConfiguration;
        this.assembler = KnotProxy.createProxy(vertx, knotxServerConfiguration.assemblerAddress());
    }

    /**
     * Creates a handler whose assembler proxy is bound to the configured assembler address.
     *
     * @param vertx the Vert.x instance used to create the proxy
     * @param knotxServerConfiguration server configuration supplying the assembler address and
     *     the allowed response headers
     * @return a new handler instance
     */
    public static KnotxAssemblerHandler create(Vertx vertx, KnotxServerConfiguration knotxServerConfiguration) {
        return new KnotxAssemblerHandler(vertx, knotxServerConfiguration);
    }

    /**
     * Processes the request's {@link KnotContext}.
     *
     * <p>If the current client response is {@code 200 OK}, the context is sent to the assembler;
     * an OK result is written to the HTTP response, a non-OK result fails the routing context with
     * the returned status code, and a processing error fails it with the throwable. Otherwise the
     * current client response is written out without calling the assembler.
     *
     * @param routingContext the routing context of the current request
     */
    @Override // io.vertx.core.Handler
    public void handle(RoutingContext routingContext) {
        KnotContext knotContext = (KnotContext) routingContext.get(KNOT_CONTEXT_KEY);
        if (!isOkClientResponse(knotContext.getClientResponse())) {
            sendResponse(routingContext, knotContext.getClientResponse());
            return;
        }
        this.assembler.rxProcess(knotContext).doOnSuccess(this::traceMessage).subscribe(assembled -> {
            ClientResponse assembledResponse = assembled.getClientResponse();
            if (isOkClientResponse(assembledResponse)) {
                sendResponse(routingContext, assembledResponse);
            } else {
                routingContext.fail(assembledResponse.getStatusCode());
            }
        }, error -> {
            // Report the address this handler actually talks to (the assembler), not the splitter.
            LOGGER.error("Error happened while communicating with {} engine", error, this.configuration.assemblerAddress());
            routingContext.fail(error);
        });
    }

    /** Returns {@code true} when the client response status code is {@code 200}. */
    private boolean isOkClientResponse(ClientResponse clientResponse) {
        return clientResponse.getStatusCode() == HttpResponseStatus.OK.code();
    }

    /**
     * Writes headers and status code, then ends the HTTP response. The body is sent only for a
     * {@code 200 OK} client response; any other status ends the response without a body.
     */
    private void sendResponse(RoutingContext routingContext, ClientResponse clientResponse) {
        HttpServerResponse response = routingContext.response();
        writeHeaders(response, clientResponse);
        response.setStatusCode(clientResponse.getStatusCode());
        if (isOkClientResponse(clientResponse)) {
            response.end(Buffer.newInstance(clientResponse.getBody()));
        } else {
            response.end();
        }
    }

    /**
     * Copies every value of each allowed header from the client response to the HTTP response and
     * then removes {@code Content-Length}.
     */
    private void writeHeaders(HttpServerResponse httpServerResponse, ClientResponse clientResponse) {
        for (String headerName : clientResponse.getHeaders().names()) {
            if (!headerFilter(headerName)) {
                continue;
            }
            for (String headerValue : clientResponse.getHeaders().getAll(headerName)) {
                httpServerResponse.headers().add(headerName, headerValue);
            }
        }
        // NOTE(review): presumably dropped so a length copied from the client response cannot
        // disagree with the body actually written here - confirm.
        httpServerResponse.headers().remove(HttpHeaders.CONTENT_LENGTH.toString());
    }

    /**
     * Returns {@code true} when the lower-cased header name is contained in the configured allowed
     * response headers. Lower-casing uses {@link Locale#ROOT} so the result does not depend on the
     * JVM default locale.
     */
    private boolean headerFilter(String str) {
        return this.configuration.allowedResponseHeaders().contains(str.toLowerCase(Locale.ROOT));
    }

    /** Logs the assembler result as encoded JSON when trace logging is enabled. */
    private void traceMessage(KnotContext knotContext) {
        if (LOGGER.isTraceEnabled()) {
            LOGGER.trace("Got message from <fragment-assembler> with value <{}>", knotContext.toJson().encode());
        }
    }
}
