package io.fluxcapacitor.proxy;

import io.fluxcapacitor.common.Guarantee;
import io.fluxcapacitor.common.MessageType;
import io.fluxcapacitor.common.Registration;
import io.fluxcapacitor.common.api.SerializedMessage;
import io.fluxcapacitor.javaclient.common.serialization.Serializer;
import io.fluxcapacitor.javaclient.configuration.client.Client;
import io.fluxcapacitor.javaclient.tracking.ConsumerConfiguration;
import io.fluxcapacitor.javaclient.tracking.client.DefaultTracker;
import io.fluxcapacitor.javaclient.web.WebRequest;
import io.fluxcapacitor.javaclient.web.WebRequestSettings;
import io.fluxcapacitor.javaclient.web.WebResponse;
import io.fluxcapacitor.javaclient.web.WebUtils;
import java.beans.ConstructorProperties;
import java.io.ByteArrayInputStream;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/fluxcapacitor/proxy/ReverseProxyConsumer.class */
public class ReverseProxyConsumer implements Consumer<List<SerializedMessage>> {

    @Generated
    private static final Logger log = LoggerFactory.getLogger((Class<?>) ReverseProxyConsumer.class);
    private static final HttpClient httpClient = HttpClient.newBuilder().followRedirects(HttpClient.Redirect.NORMAL).connectTimeout(Duration.ofSeconds(5)).build();
    protected static final WebRequestSettings defaultSettings = WebRequestSettings.builder().build();
    protected static final Serializer serializer = new ProxySerializer();
    private final Client client;
    private final String consumerName;
    private final Long minIndex;
    protected final Map<String, Registration> runningConsumers = new ConcurrentHashMap();
    private final AtomicReference<Object> mainConsumer = new AtomicReference<>();

    public static Registration start(Client client) {
        ReverseProxyConsumer reverseProxyConsumer = new ReverseProxyConsumer(client, defaultSettings.getConsumer(), null);
        reverseProxyConsumer.runningConsumers.computeIfAbsent(defaultSettings.getConsumer(), str -> {
            return reverseProxyConsumer.start();
        });
        return () -> {
            Collection<Registration> values = reverseProxyConsumer.runningConsumers.values();
            values.forEach((v0) -> {
                v0.cancel();
            });
            values.clear();
        };
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Registration start() {
        log.info(isMainConsumer() ? "Starting consumer {}" : "Starting consumer {} at {}", this.consumerName, this.minIndex);
        return DefaultTracker.start(this, MessageType.WEBREQUEST, ConsumerConfiguration.builder().name(this.consumerName).minIndex(this.minIndex).threads(4).build(), this.client);
    }

    @Override // java.util.function.Consumer
    public void accept(List<SerializedMessage> list) {
        for (SerializedMessage serializedMessage : list) {
            try {
                WebRequestSettings settings = getSettings(serializedMessage);
                if (this.consumerName.equals(settings.getConsumer())) {
                    URI create = URI.create(WebRequest.getUrl(serializedMessage.getMetadata()));
                    if (create.isAbsolute()) {
                        handle(serializedMessage, create, settings);
                    }
                } else if (isMainConsumer()) {
                    this.runningConsumers.computeIfAbsent(settings.getConsumer(), str -> {
                        return new ReverseProxyConsumer(this.client, str, serializedMessage.getIndex()).start();
                    });
                }
            } catch (Throwable th) {
                log.error("Failed to handle external request {}. Continuing..", serializedMessage.getMessageId(), th);
                try {
                    sendResponse(asWebResponse(th), serializedMessage);
                } catch (Throwable th2) {
                    th2.addSuppressed(th);
                    log.error("Failed to send error response. Continuing..", th2);
                }
            }
        }
    }

    void handle(SerializedMessage serializedMessage, URI uri, WebRequestSettings webRequestSettings) {
        sendResponse(executeRequest(asHttpRequest(serializedMessage, uri, webRequestSettings)), serializedMessage);
    }

    HttpRequest asHttpRequest(SerializedMessage serializedMessage, URI uri, WebRequestSettings webRequestSettings) {
        HttpRequest.Builder timeout = HttpRequest.newBuilder().version(HttpClient.Version.valueOf(webRequestSettings.getHttpVersion().name())).timeout(webRequestSettings.getTimeout());
        WebRequest.getHeaders(serializedMessage.getMetadata()).forEach((str, list) -> {
            list.forEach(str -> {
                timeout.header(str, str);
            });
        });
        timeout.uri(uri).method(WebRequest.getMethod(serializedMessage.getMetadata()).name(), getBodyPublisher(serializedMessage));
        return timeout.build();
    }

    protected WebRequestSettings getSettings(SerializedMessage serializedMessage) {
        return (WebRequestSettings) Optional.ofNullable((WebRequestSettings) serializedMessage.getMetadata().get("settings", WebRequestSettings.class)).orElse(defaultSettings);
    }

    WebResponse executeRequest(HttpRequest httpRequest) {
        try {
            return asWebResponse(httpClient.send(httpRequest, HttpResponse.BodyHandlers.ofByteArray()));
        } catch (Throwable th) {
            log.error("Failed to handle external request. Returning error.. ", th);
            return asWebResponse(th);
        }
    }

    void sendResponse(WebResponse webResponse, SerializedMessage serializedMessage) {
        SerializedMessage serialize = webResponse.serialize(serializer);
        serialize.setRequestId(serializedMessage.getRequestId());
        serialize.setTarget(serializedMessage.getSource());
        this.client.getGatewayClient(MessageType.WEBRESPONSE).append(Guarantee.NONE, serialize);
    }

    WebResponse asWebResponse(HttpResponse<byte[]> httpResponse) {
        WebResponse.Builder builder = WebResponse.builder();
        httpResponse.headers().map().forEach((str, list) -> {
            list.forEach(str -> {
                builder.header(WebUtils.fixHeaderName(str), str);
            });
        });
        return builder.status(Integer.valueOf(httpResponse.statusCode())).payload(httpResponse.body()).build();
    }

    WebResponse asWebResponse(Throwable th) {
        return WebResponse.builder().status(502).payload(((String) Optional.ofNullable(th.getMessage()).orElse("Exception while handling request in proxy")).getBytes()).build();
    }

    HttpRequest.BodyPublisher getBodyPublisher(SerializedMessage serializedMessage) {
        String type = serializedMessage.getData().getType();
        return (type == null || Void.class.getName().equals(type) || serializedMessage.getData().getValue().length == 0) ? HttpRequest.BodyPublishers.noBody() : HttpRequest.BodyPublishers.ofInputStream(() -> {
            return new ByteArrayInputStream(serializedMessage.data().getValue());
        });
    }

    @Generated
    @ConstructorProperties({"client", "consumerName", "minIndex"})
    private ReverseProxyConsumer(Client client, String str, Long l) {
        this.client = client;
        this.consumerName = str;
        this.minIndex = l;
    }

    @Generated
    protected boolean isMainConsumer() {
        Object obj = this.mainConsumer.get();
        if (obj == null) {
            synchronized (this.mainConsumer) {
                obj = this.mainConsumer.get();
                if (obj == null) {
                    obj = Boolean.valueOf(this.minIndex == null);
                    this.mainConsumer.set(obj);
                }
            }
        }
        return ((Boolean) obj).booleanValue();
    }
}
