package net.acesinc.data.json.generator.log;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.metamx.common.Granularity;
import com.metamx.tranquility.beam.ClusteredBeamTuning;
import com.metamx.tranquility.druid.DruidBeams;
import com.metamx.tranquility.druid.DruidDimensions;
import com.metamx.tranquility.druid.DruidLocation;
import com.metamx.tranquility.druid.DruidRollup;
import com.metamx.tranquility.druid.DruidSpatialDimension;
import com.metamx.tranquility.typeclass.Timestamper;
import com.twitter.finagle.Service;
import com.twitter.util.Await;
import com.twitter.util.Future;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import net.acesinc.data.json.util.JsonUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.joda.time.DateTime;
import org.joda.time.Period;

/* loaded from: input_file:net/acesinc/data/json/generator/log/TranquilityLogger.class */
public class TranquilityLogger implements EventLogger {
    private static final Logger log = LogManager.getLogger(TranquilityLogger.class);
    public static final String OVERLORD_NAME_PROP_NAME = "overlord.name";
    public static final String FIREHOSE_PATTERN_PROP_NAME = "firehose.pattern";
    public static final String DISCOVERY_PATH_PROP_NAME = "discovery.path";
    public static final String DATASOURCE_NAME_PROP_NAME = "datasource.name";
    public static final String DIMENSIONS_PROP_NAME = "dimensions";
    public static final String GEOSPATIAL_DIMENSIONS_PROP_NAME = "geo.dimensions";
    public static final String TIMESTAMP_NAME_PROP_NAME = "timestamp.name";
    public static final String TIMESTAMP_FORMAT_PROP_NAME = "timestamp.format";
    public static final String SEGMENT_GRANULARITY_PROP_NAME = "segment.granularity";
    public static final String QUERY_GRANULARITY_PROP_NAME = "query.granularity";
    public static final String ZOOKEEPER_HOST_PROP_NAME = "zookeeper.host";
    public static final String ZOOKEEPER_PORT_PROP_NAME = "zookeeper.port";
    public static final String FLATTEN_PROP_NAME = "flatten";
    public static final String SYNC_PROP_NAME = "sync";
    private final String indexService;
    private final String firehosePattern;
    private final String discoveryPath;
    private final String dataSourceName;
    private final String dimensionNames;
    private final String geoSpatialDims;
    private final String timestampName;
    private final String timestampFormat;
    private final String segmentGranularity;
    private final String queryGranularity;
    private final String zookeeperHost;
    private final Integer zookeeperPort;
    private final boolean flatten;
    private final boolean sync;
    private DruidDimensions druidDimensions;
    private final List<AggregatorFactory> aggregators;
    private final Timestamper<Map<String, Object>> timestamper;
    private final CuratorFramework curator;
    private final TimestampSpec timestampSpec;
    private final Service<List<Map<String, Object>>, Integer> druidService;
    private JsonUtils jsonUtils = new JsonUtils();
    private ObjectMapper mapper = new ObjectMapper();
    private final List<String> dimensions = new ArrayList();

    public TranquilityLogger(Map<String, Object> map) {
        this.indexService = (String) map.get(OVERLORD_NAME_PROP_NAME);
        this.firehosePattern = (String) map.get(FIREHOSE_PATTERN_PROP_NAME);
        this.discoveryPath = (String) map.get(DISCOVERY_PATH_PROP_NAME);
        this.dataSourceName = (String) map.get(DATASOURCE_NAME_PROP_NAME);
        this.dimensionNames = (String) map.get(DIMENSIONS_PROP_NAME);
        this.geoSpatialDims = (String) map.get(GEOSPATIAL_DIMENSIONS_PROP_NAME);
        this.timestampName = (String) map.get(TIMESTAMP_NAME_PROP_NAME);
        this.timestampFormat = (String) map.getOrDefault(TIMESTAMP_FORMAT_PROP_NAME, "auto");
        this.segmentGranularity = ((String) map.getOrDefault(SEGMENT_GRANULARITY_PROP_NAME, "hour")).toUpperCase();
        this.queryGranularity = ((String) map.getOrDefault(QUERY_GRANULARITY_PROP_NAME, "minute")).toUpperCase();
        this.zookeeperHost = (String) map.get(ZOOKEEPER_HOST_PROP_NAME);
        this.zookeeperPort = (Integer) map.get(ZOOKEEPER_PORT_PROP_NAME);
        this.flatten = ((Boolean) map.getOrDefault(FLATTEN_PROP_NAME, true)).booleanValue();
        this.sync = ((Boolean) map.getOrDefault(SYNC_PROP_NAME, false)).booleanValue();
        if (this.dimensionNames != null && !this.dimensionNames.isEmpty()) {
            for (String str : this.dimensionNames.split(",")) {
                this.dimensions.add(str.trim());
            }
        }
        if (this.dimensions.isEmpty()) {
            log.debug("Configuring Tranquility with Schemaless ingestion");
            this.druidDimensions = DruidDimensions.schemaless();
        } else {
            log.debug("Configuring Tranqulity with the following dimensions: " + this.dimensions.toString());
            this.druidDimensions = DruidDimensions.specific(this.dimensions);
        }
        ArrayList arrayList = new ArrayList();
        if (this.geoSpatialDims != null && !this.geoSpatialDims.isEmpty()) {
            for (String str2 : this.geoSpatialDims.split(",")) {
                arrayList.add(str2.trim());
            }
        }
        if (!arrayList.isEmpty()) {
            log.debug("Adding Geospatial Dimensions: " + arrayList.toString());
            this.druidDimensions = this.druidDimensions.withSpatialDimensions(Lists.newArrayList(new DruidSpatialDimension[]{DruidSpatialDimension.multipleField("geo", arrayList)}));
        }
        this.aggregators = ImmutableList.of(new CountAggregatorFactory("events"));
        this.timestamper = new Timestamper<Map<String, Object>>() { // from class: net.acesinc.data.json.generator.log.TranquilityLogger.1
            public DateTime timestamp(Map<String, Object> map2) {
                return new DateTime(map2.get(TranquilityLogger.this.timestampName));
            }
        };
        this.curator = CuratorFrameworkFactory.builder().connectString(this.zookeeperHost + ":" + this.zookeeperPort.toString()).retryPolicy(new ExponentialBackoffRetry(1000, 20, 30000)).build();
        this.curator.start();
        log.debug("Confiuring Tranqulity Timestamp Spec with { name: " + this.timestampName + ", format: " + this.timestampFormat + " }");
        this.timestampSpec = new TimestampSpec(this.timestampName, this.timestampFormat);
        log.debug("Creating Druid Beam for DataSource [ " + this.dataSourceName + " ]");
        this.druidService = DruidBeams.builder(this.timestamper).curator(this.curator).discoveryPath(this.discoveryPath).location(DruidLocation.create(this.indexService, this.firehosePattern, this.dataSourceName)).timestampSpec(this.timestampSpec).rollup(DruidRollup.create(this.druidDimensions, this.aggregators, QueryGranularity.fromString(this.queryGranularity))).tuning(ClusteredBeamTuning.builder().segmentGranularity(Granularity.valueOf(this.segmentGranularity)).windowPeriod(new Period("PT10M")).partitions(1).replicants(1).build()).buildJavaService();
    }

    @Override // net.acesinc.data.json.generator.log.EventLogger
    public void logEvent(String str, Map<String, Object> map) {
        logEvent(str);
    }

    private void logEvent(String str) {
        try {
            String str2 = str;
            if (this.flatten) {
                try {
                    str2 = this.jsonUtils.flattenJson(str);
                } catch (IOException e) {
                    log.error("Error flattening json. Unable to send event [ " + str + " ]", e);
                    return;
                }
            }
            log.debug("Sending event to Tranquility: [ " + str2 + " ]");
            Future apply = this.druidService.apply(ImmutableList.of((Map) this.mapper.readValue(str2, Map.class)));
            if (this.sync) {
                log.debug("Waiting for ACK");
                log.debug("ACK recieved! Continue on");
            }
        } catch (Exception e2) {
            log.error("Error sending event to Druid", e2);
        }
    }

    @Override // net.acesinc.data.json.generator.log.EventLogger
    public void shutdown() {
        try {
            log.info("Shutting down Tranquility Logger");
            Await.result(this.druidService.close());
            this.curator.close();
            log.info("Successfully Shutdown Tranquility Logger");
        } catch (Exception e) {
            log.error("Error shutting down Tranquility Logger", e);
        }
    }
}
