package org.apache.pulsar.io.http;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import org.apache.avro.generic.GenericRecord;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.client.api.schema.KeyValueSchema;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;

/* loaded from: input_file:org/apache/pulsar/io/http/HttpSink.class */
public class HttpSink implements Sink<GenericObject> {
    HttpSinkConfig httpSinkConfig;
    private HttpClient httpClient;
    private ObjectMapper mapper;
    private URI uri;

    @Override // org.apache.pulsar.io.core.Sink
    public void open(Map<String, Object> map, SinkContext sinkContext) throws Exception {
        this.httpSinkConfig = HttpSinkConfig.load(map);
        this.uri = new URI(this.httpSinkConfig.getUrl());
        this.httpClient = HttpClient.newHttpClient();
        this.mapper = new ObjectMapper().registerModule(new JavaTimeModule());
    }

    @Override // org.apache.pulsar.io.core.Sink
    public void write(Record<GenericObject> record) throws Exception {
        HttpRequest.Builder POST = HttpRequest.newBuilder().uri(this.uri).POST(HttpRequest.BodyPublishers.ofByteArray(this.mapper.writeValueAsBytes(toJsonSerializable(record.getSchema(), record.getValue().getNativeObject()))));
        Map<String, String> headers = this.httpSinkConfig.getHeaders();
        Objects.requireNonNull(POST);
        headers.forEach(POST::header);
        record.getProperties().forEach((str, str2) -> {
            POST.header("PulsarProperties-" + str, str2);
        });
        record.getTopicName().ifPresent(str3 -> {
            POST.header("PulsarTopic", str3);
        });
        record.getEventTime().ifPresent(l -> {
            POST.header("PulsarEventTime", l.toString());
        });
        record.getKey().ifPresent(str4 -> {
            POST.header("PulsarKey", str4);
        });
        record.getMessage().ifPresent(message -> {
            if (message.getMessageId() != null) {
                POST.header("PulsarMessageId", Base64.getEncoder().encodeToString(message.getMessageId().toByteArray()));
            }
            if (message.getPublishTime() != 0) {
                POST.header("PulsarPublishTime", String.valueOf(message.getPublishTime()));
            }
        });
        POST.header("Content-Type", "application/json");
        HttpResponse send = this.httpClient.send(POST.build(), HttpResponse.BodyHandlers.ofString());
        if (send.statusCode() < 200 || send.statusCode() >= 300) {
            throw new IOException(String.format("HTTP call to %s failed with status code %s", this.uri, Integer.valueOf(send.statusCode())));
        }
    }

    private static Object toJsonSerializable(Schema<?> schema, Object obj) {
        if (schema == null || schema.getSchemaInfo().getType().isPrimitive()) {
            return obj;
        }
        switch (schema.getSchemaInfo().getType()) {
            case KEY_VALUE:
                KeyValueSchema keyValueSchema = (KeyValueSchema) schema;
                KeyValue keyValue = (KeyValue) obj;
                HashMap hashMap = new HashMap();
                Object key = keyValue.getKey();
                Object value = keyValue.getValue();
                hashMap.put(TypedMessageBuilder.CONF_KEY, toJsonSerializable(keyValueSchema.getKeySchema(), key instanceof GenericObject ? ((GenericObject) key).getNativeObject() : key));
                hashMap.put("value", toJsonSerializable(keyValueSchema.getValueSchema(), value instanceof GenericObject ? ((GenericObject) value).getNativeObject() : value));
                return hashMap;
            case AVRO:
                return JsonConverter.toJson((GenericRecord) obj);
            case JSON:
                return obj;
            default:
                throw new UnsupportedOperationException("Unsupported schema type =" + schema.getSchemaInfo().getType());
        }
    }

    @Override // java.lang.AutoCloseable
    public void close() {
    }
}
