/*
 * Decompiled with CFR 0.152.
 */
package net.acesinc.data.json.generator.log;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.metamx.common.Granularity;
import com.metamx.tranquility.beam.ClusteredBeamTuning;
import com.metamx.tranquility.druid.DruidBeams;
import com.metamx.tranquility.druid.DruidDimensions;
import com.metamx.tranquility.druid.DruidLocation;
import com.metamx.tranquility.druid.DruidRollup;
import com.metamx.tranquility.druid.DruidSpatialDimension;
import com.metamx.tranquility.typeclass.Timestamper;
import com.twitter.finagle.Service;
import com.twitter.util.Await;
import com.twitter.util.Awaitable;
import com.twitter.util.Future;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import net.acesinc.data.json.generator.log.EventLogger;
import net.acesinc.data.json.util.JsonUtils;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.joda.time.DateTime;
import org.joda.time.Period;

public class TranquilityLogger
implements EventLogger {
    private static final Logger log = LogManager.getLogger(TranquilityLogger.class);
    public static final String OVERLORD_NAME_PROP_NAME = "overlord.name";
    public static final String FIREHOSE_PATTERN_PROP_NAME = "firehose.pattern";
    public static final String DISCOVERY_PATH_PROP_NAME = "discovery.path";
    public static final String DATASOURCE_NAME_PROP_NAME = "datasource.name";
    public static final String DIMENSIONS_PROP_NAME = "dimensions";
    public static final String GEOSPATIAL_DIMENSIONS_PROP_NAME = "geo.dimensions";
    public static final String TIMESTAMP_NAME_PROP_NAME = "timestamp.name";
    public static final String TIMESTAMP_FORMAT_PROP_NAME = "timestamp.format";
    public static final String SEGMENT_GRANULARITY_PROP_NAME = "segment.granularity";
    public static final String QUERY_GRANULARITY_PROP_NAME = "query.granularity";
    public static final String ZOOKEEPER_HOST_PROP_NAME = "zookeeper.host";
    public static final String ZOOKEEPER_PORT_PROP_NAME = "zookeeper.port";
    public static final String FLATTEN_PROP_NAME = "flatten";
    public static final String SYNC_PROP_NAME = "sync";
    private final String indexService;
    private final String firehosePattern;
    private final String discoveryPath;
    private final String dataSourceName;
    private final String dimensionNames;
    private final String geoSpatialDims;
    private final String timestampName;
    private final String timestampFormat;
    private final String segmentGranularity;
    private final String queryGranularity;
    private final String zookeeperHost;
    private final Integer zookeeperPort;
    private final boolean flatten;
    private final boolean sync;
    private final List<String> dimensions;
    private DruidDimensions druidDimensions;
    private final List<AggregatorFactory> aggregators;
    private final Timestamper<Map<String, Object>> timestamper;
    private final CuratorFramework curator;
    private final TimestampSpec timestampSpec;
    private final Service<List<Map<String, Object>>, Integer> druidService;
    private JsonUtils jsonUtils = new JsonUtils();
    private ObjectMapper mapper = new ObjectMapper();

    public TranquilityLogger(Map<String, Object> props) {
        this.indexService = (String)props.get(OVERLORD_NAME_PROP_NAME);
        this.firehosePattern = (String)props.get(FIREHOSE_PATTERN_PROP_NAME);
        this.discoveryPath = (String)props.get(DISCOVERY_PATH_PROP_NAME);
        this.dataSourceName = (String)props.get(DATASOURCE_NAME_PROP_NAME);
        this.dimensionNames = (String)props.get(DIMENSIONS_PROP_NAME);
        this.geoSpatialDims = (String)props.get(GEOSPATIAL_DIMENSIONS_PROP_NAME);
        this.timestampName = (String)props.get(TIMESTAMP_NAME_PROP_NAME);
        this.timestampFormat = (String)props.getOrDefault(TIMESTAMP_FORMAT_PROP_NAME, "auto");
        this.segmentGranularity = ((String)props.getOrDefault(SEGMENT_GRANULARITY_PROP_NAME, "hour")).toUpperCase();
        this.queryGranularity = ((String)props.getOrDefault(QUERY_GRANULARITY_PROP_NAME, "minute")).toUpperCase();
        this.zookeeperHost = (String)props.get(ZOOKEEPER_HOST_PROP_NAME);
        this.zookeeperPort = (Integer)props.get(ZOOKEEPER_PORT_PROP_NAME);
        this.flatten = (Boolean)props.getOrDefault(FLATTEN_PROP_NAME, true);
        this.sync = (Boolean)props.getOrDefault(SYNC_PROP_NAME, false);
        this.dimensions = new ArrayList<String>();
        if (this.dimensionNames != null && !this.dimensionNames.isEmpty()) {
            String[] dims;
            for (String s : dims = this.dimensionNames.split(",")) {
                this.dimensions.add(s.trim());
            }
        }
        if (this.dimensions.isEmpty()) {
            log.debug("Configuring Tranquility with Schemaless ingestion");
            this.druidDimensions = DruidDimensions.schemaless();
        } else {
            log.debug("Configuring Tranqulity with the following dimensions: " + this.dimensions.toString());
            this.druidDimensions = DruidDimensions.specific(this.dimensions);
        }
        ArrayList<String> geoDims = new ArrayList<String>();
        if (this.geoSpatialDims != null && !this.geoSpatialDims.isEmpty()) {
            String[] dims;
            for (String s : dims = this.geoSpatialDims.split(",")) {
                geoDims.add(s.trim());
            }
        }
        if (!geoDims.isEmpty()) {
            log.debug("Adding Geospatial Dimensions: " + ((Object)geoDims).toString());
            this.druidDimensions = this.druidDimensions.withSpatialDimensions((List)Lists.newArrayList((Object[])new DruidSpatialDimension[]{DruidSpatialDimension.multipleField((String)"geo", geoDims)}));
        }
        this.aggregators = ImmutableList.of((Object)new CountAggregatorFactory("events"));
        this.timestamper = new Timestamper<Map<String, Object>>(){

            public DateTime timestamp(Map<String, Object> theMap) {
                return new DateTime(theMap.get(TranquilityLogger.this.timestampName));
            }
        };
        this.curator = CuratorFrameworkFactory.builder().connectString(this.zookeeperHost + ":" + this.zookeeperPort.toString()).retryPolicy((RetryPolicy)new ExponentialBackoffRetry(1000, 20, 30000)).build();
        this.curator.start();
        log.debug("Confiuring Tranqulity Timestamp Spec with { name: " + this.timestampName + ", format: " + this.timestampFormat + " }");
        this.timestampSpec = new TimestampSpec(this.timestampName, this.timestampFormat);
        log.debug("Creating Druid Beam for DataSource [ " + this.dataSourceName + " ]");
        this.druidService = DruidBeams.builder(this.timestamper).curator(this.curator).discoveryPath(this.discoveryPath).location(DruidLocation.create((String)this.indexService, (String)this.firehosePattern, (String)this.dataSourceName)).timestampSpec(this.timestampSpec).rollup(DruidRollup.create((DruidDimensions)this.druidDimensions, this.aggregators, (QueryGranularity)QueryGranularity.fromString((String)this.queryGranularity))).tuning(ClusteredBeamTuning.builder().segmentGranularity(Granularity.valueOf((String)this.segmentGranularity)).windowPeriod(new Period((Object)"PT10M")).partitions(1).replicants(1).build()).buildJavaService();
    }

    @Override
    public void logEvent(String event, Map<String, Object> producerConfig) {
        this.logEvent(event);
    }

    private void logEvent(String event) {
        try {
            String output = event;
            if (this.flatten) {
                try {
                    output = this.jsonUtils.flattenJson(event);
                }
                catch (IOException ex) {
                    log.error("Error flattening json. Unable to send event [ " + event + " ]", (Throwable)ex);
                    return;
                }
            }
            log.debug("Sending event to Tranquility: [ " + output + " ]");
            Map map = (Map)this.mapper.readValue(output, Map.class);
            ImmutableList listOfEvents = ImmutableList.of((Object)map);
            Future numSentFuture = this.druidService.apply((Object)listOfEvents);
            if (this.sync) {
                log.debug("Waiting for ACK");
                Integer numSent = (Integer)Await.result((Awaitable)numSentFuture);
                log.debug("ACK recieved! Continue on");
            }
        }
        catch (Exception ex) {
            log.error("Error sending event to Druid", (Throwable)ex);
        }
    }

    @Override
    public void shutdown() {
        try {
            log.info("Shutting down Tranquility Logger");
            Await.result((Awaitable)this.druidService.close());
            this.curator.close();
            log.info("Successfully Shutdown Tranquility Logger");
        }
        catch (Exception ex) {
            log.error("Error shutting down Tranquility Logger", (Throwable)ex);
        }
    }
}

