package ch.squaredesk.nova.metrics.elastic;

import ch.squaredesk.nova.metrics.MetricsConverter;
import ch.squaredesk.nova.metrics.MetricsDump;
import ch.squaredesk.nova.metrics.SerializableMetricsDump;
import io.reactivex.Observable;
import io.reactivex.Single;
import io.reactivex.functions.Consumer;
import java.io.IOException;
import java.time.Instant;
import java.time.ZoneId;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import org.apache.http.Header;
import org.apache.http.HttpHost;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:ch/squaredesk/nova/metrics/elastic/ElasticMetricsReporter.class */
public class ElasticMetricsReporter implements Consumer<MetricsDump> {
    private static final Logger logger = LoggerFactory.getLogger(ElasticMetricsReporter.class);
    private final Consumer<Throwable> defaultExceptionHandler;
    private final String elasticServer;
    private final int elasticPort;
    private final String indexName;
    private final Map<String, Object> additionalMetricAttributes;
    private final ZoneId zoneForTimestamps;
    private RestClient restClient;
    private RestHighLevelClient client;

    public ElasticMetricsReporter(String str, int i, String str2) {
        this(str, i, str2, Collections.emptyMap());
    }

    public ElasticMetricsReporter(String str, int i, String str2, Map<String, Object> map) {
        this.zoneForTimestamps = ZoneId.of("UTC");
        this.elasticServer = str;
        this.elasticPort = i;
        this.indexName = str2;
        this.additionalMetricAttributes = map == null ? Collections.emptyMap() : map;
        this.defaultExceptionHandler = th -> {
            logger.error("Unable to upload metrics to index " + str2, th);
        };
    }

    public void accept(MetricsDump metricsDump) {
        accept(metricsDump, this.defaultExceptionHandler);
    }

    public void accept(MetricsDump metricsDump, Consumer<Throwable> consumer) {
        if (this.client == null) {
            throw new IllegalStateException("not started yet");
        }
        fireRequest(requestFor(metricsDump), consumer);
    }

    public void accept(SerializableMetricsDump serializableMetricsDump) {
        accept(serializableMetricsDump, this.defaultExceptionHandler);
    }

    public void accept(SerializableMetricsDump serializableMetricsDump, Consumer<Throwable> consumer) {
        if (this.client == null) {
            throw new IllegalStateException("not started yet");
        }
        fireRequest(requestFor(serializableMetricsDump), consumer);
    }

    public void startup() {
        logger.info("Connecting to Elasticsearch @ " + this.elasticServer + ":" + this.elasticPort);
        try {
            RestClientBuilder builder = RestClient.builder(new HttpHost[]{new HttpHost(this.elasticServer, this.elasticPort, "http")});
            this.restClient = builder.build();
            this.client = new RestHighLevelClient(builder);
        } catch (Exception e) {
            logger.error("Unable to connect to Elastic @ " + this.elasticServer + ":" + this.elasticPort, e);
        }
        logger.info("\tsuccessfully established connection to Elastic @ " + this.elasticServer + ":" + this.elasticPort + " :-)");
    }

    public void shutdown() {
        if (this.client != null) {
            logger.info("Shutting down connection to Elasticsearch");
            try {
                this.restClient.close();
                logger.info("\tsuccessfully shutdown connection to Elasticsearch :-)");
            } catch (IOException e) {
                logger.info("Error, trying to close connection to ElasticSearch", e);
            }
        }
    }

    void fireRequest(Single<BulkRequest> single, Consumer<Throwable> consumer) {
        Objects.requireNonNull(consumer, "exceptionHandler must not be null");
        single.subscribe(bulkRequest -> {
            BulkResponse bulk = this.client.bulk(bulkRequest, new Header[0]);
            if (bulk.hasFailures()) {
                logger.warn("Error uploading metrics: " + bulk.buildFailureMessage());
            } else {
                logger.trace("Successfully uploaded {} metric(s)", Integer.valueOf(bulk.getItems().length));
            }
        }, consumer);
    }

    private long timestampInUtc(long j) {
        return Instant.ofEpochMilli(j).atZone(this.zoneForTimestamps).toInstant().toEpochMilli();
    }

    private Single<BulkRequest> requestFor(Observable<Map<String, Object>> observable) {
        return observable.map(map -> {
            return new IndexRequest().index(this.indexName).type("doc").source(map);
        }).reduce(Requests.bulkRequest(), (bulkRequest, indexRequest) -> {
            bulkRequest.add(indexRequest);
            return bulkRequest;
        });
    }

    Single<BulkRequest> requestFor(Map<String, Object> map) {
        Long l = (Long) map.remove("timestamp");
        long timestampInUtc = l == null ? timestampInUtc(System.currentTimeMillis()) : timestampInUtc(l.longValue());
        String str = (String) map.remove("hostName");
        String str2 = (String) map.remove("hostAddress");
        return Observable.fromIterable(map.entrySet()).filter(entry -> {
            return entry.getValue() instanceof Map;
        }).map(entry2 -> {
            Map map2 = (Map) entry2.getValue();
            map2.put("name", entry2.getKey());
            return map2;
        }).map(map2 -> {
            map2.put("@timestamp", Long.valueOf(timestampInUtc));
            map2.put("host", str);
            map2.put("hostAddress", str2);
            map2.putAll(this.additionalMetricAttributes);
            return new IndexRequest().index(this.indexName).type("doc").source(map2);
        }).reduce(Requests.bulkRequest(), (bulkRequest, indexRequest) -> {
            bulkRequest.add(indexRequest);
            return bulkRequest;
        });
    }

    Single<BulkRequest> requestFor(MetricsDump metricsDump) {
        HashMap hashMap = new HashMap();
        hashMap.put("@timestamp", Long.valueOf(timestampInUtc(metricsDump.timestamp)));
        hashMap.put("host", metricsDump.hostName);
        hashMap.put("hostAddress", metricsDump.hostAddress);
        hashMap.putAll(this.additionalMetricAttributes);
        return requestFor(Observable.fromIterable(MetricsConverter.convert(metricsDump.metrics, hashMap).entrySet()).map(entry -> {
            Map map = (Map) entry.getValue();
            map.put("name", entry.getKey());
            return map;
        }));
    }

    Single<BulkRequest> requestFor(SerializableMetricsDump serializableMetricsDump) {
        HashMap hashMap = new HashMap();
        hashMap.put("@timestamp", Long.valueOf(timestampInUtc(serializableMetricsDump.timestamp)));
        hashMap.put("host", serializableMetricsDump.hostName);
        hashMap.put("hostAddress", serializableMetricsDump.hostAddress);
        hashMap.putAll(this.additionalMetricAttributes);
        return requestFor(Observable.fromIterable(serializableMetricsDump.metrics.entrySet()).map(entry -> {
            Map map = (Map) entry.getValue();
            map.put("name", entry.getKey());
            map.putAll(hashMap);
            return map;
        }));
    }
}
