/*
 * Decompiled with CFR 0.152.
 */
package com.salesforce.cantor.s3;

import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.encoder.PatternLayoutEncoder;
import ch.qos.logback.classic.sift.MDCBasedDiscriminator;
import ch.qos.logback.classic.sift.SiftingAppender;
import ch.qos.logback.core.Appender;
import ch.qos.logback.core.Context;
import ch.qos.logback.core.FileAppender;
import ch.qos.logback.core.encoder.Encoder;
import ch.qos.logback.core.sift.Discriminator;
import ch.qos.logback.core.util.Duration;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.transfer.MultipleFileUpload;
import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.TransferManagerBuilder;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.salesforce.cantor.Events;
import com.salesforce.cantor.common.EventsPreconditions;
import com.salesforce.cantor.s3.AbstractBaseS3Namespaceable;
import com.salesforce.cantor.s3.S3Utils;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Scanner;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

public class EventsOnS3
extends AbstractBaseS3Namespaceable
implements Events {
    private static final Logger logger = LoggerFactory.getLogger(EventsOnS3.class);
    private static final String defaultBufferDirectory = "/tmp/cantor-events-s3-buffer";
    private static final long defaultFlushIntervalSeconds = 30L;
    private static final String dimensionKeyPayloadOffset = ".payload-offset";
    private static final String dimensionKeyPayloadLength = ".payload-length";
    private final AtomicReference<String> currentFlushCycleGuid = new AtomicReference();
    private final Gson parser = new GsonBuilder().create();
    private final Logger siftingLogger = this.initSiftingLogger();
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    private final String bufferDirectory;
    private final long maxFlushIntervalSeconds;
    private static final String objectKeyPrefix = "cantor-events-%s/";
    private final DateFormat cycleNameFormatter = new SimpleDateFormat("YYYY-MM-dd_HH-mm-ss");
    private final DateFormat directoryFormatter = new SimpleDateFormat("YYYY/MM/dd/HH/mm");
    private final DateFormat directoryFormatterHourly = new SimpleDateFormat("YYYY/MM/dd/HH/");
    private final DateFormat directoryFormatterDayly = new SimpleDateFormat("YYYY/MM/dd/");
    private final DateFormat directoryFormatterMonthly = new SimpleDateFormat("YYYY/MM/");
    private final TransferManager s3TransferManager;
    private LoadingCache<String, AtomicLong> payloadOffset = CacheBuilder.newBuilder().build((CacheLoader)new CacheLoader<String, AtomicLong>(){

        public AtomicLong load(String path) {
            return new AtomicLong(0L);
        }
    });

    public EventsOnS3(AmazonS3 s3Client, String bucketName) throws IOException {
        this(s3Client, bucketName, defaultBufferDirectory, 30L);
    }

    public EventsOnS3(AmazonS3 s3Client, String bucketName, String bufferDirectory) throws IOException {
        this(s3Client, bucketName, bufferDirectory, 30L);
    }

    public EventsOnS3(AmazonS3 s3Client, String bucketName, String bufferDirectory, long maxFlushIntervalSeconds) throws IOException {
        super(s3Client, bucketName, "events");
        this.bufferDirectory = bufferDirectory;
        this.maxFlushIntervalSeconds = maxFlushIntervalSeconds;
        TransferManagerBuilder builder = TransferManagerBuilder.standard();
        builder.setS3Client(this.s3Client);
        this.s3TransferManager = builder.build();
        this.executor.submit(this::flush);
    }

    public void store(String namespace, Collection<Events.Event> batch) throws IOException {
        EventsPreconditions.checkStore((String)namespace, batch);
        this.checkNamespace(namespace);
        try {
            this.doStore(namespace, batch);
        }
        catch (AmazonS3Exception e) {
            logger.warn("exception storing events to namespace: " + namespace, (Throwable)e);
            throw new IOException("exception storing events to namespace: " + namespace, e);
        }
    }

    public List<Events.Event> get(String namespace, long startTimestampMillis, long endTimestampMillis, Map<String, String> metadataQuery, Map<String, String> dimensionsQuery, boolean includePayloads, boolean ascending, int limit) throws IOException {
        EventsPreconditions.checkGet((String)namespace, (long)startTimestampMillis, (long)endTimestampMillis, metadataQuery, dimensionsQuery);
        this.checkNamespace(namespace);
        try {
            return this.doGet(namespace, startTimestampMillis, endTimestampMillis, metadataQuery != null ? metadataQuery : Collections.emptyMap(), dimensionsQuery != null ? dimensionsQuery : Collections.emptyMap(), includePayloads, ascending, limit);
        }
        catch (AmazonS3Exception e) {
            logger.warn("exception getting events from namespace: " + namespace, (Throwable)e);
            throw new IOException("exception getting events from namespace: " + namespace, e);
        }
    }

    public int delete(String namespace, long startTimestampMillis, long endTimestampMillis, Map<String, String> metadataQuery, Map<String, String> dimensionsQuery) throws IOException {
        return -1;
    }

    public Map<Long, Double> aggregate(String namespace, String dimension, long startTimestampMillis, long endTimestampMillis, Map<String, String> metadataQuery, Map<String, String> dimensionsQuery, int aggregateIntervalMillis, Events.AggregationFunction aggregationFunction) throws IOException {
        return Collections.emptyMap();
    }

    public Set<String> metadata(String namespace, String metadataKey, long startTimestampMillis, long endTimestampMillis, Map<String, String> metadataQuery, Map<String, String> dimensionsQuery) throws IOException {
        return Collections.emptySet();
    }

    public void expire(String namespace, long endTimestampMillis) throws IOException {
        EventsPreconditions.checkExpire((String)namespace, (long)endTimestampMillis);
        this.checkNamespace(namespace);
        try {
            this.doExpire(namespace, endTimestampMillis);
        }
        catch (AmazonS3Exception e) {
            logger.warn("exception expiring events from namespace: " + namespace, (Throwable)e);
            throw new IOException("exception expiring events from namespace: " + namespace, e);
        }
    }

    @Override
    protected String getObjectKeyPrefix(String namespace) {
        return String.format(objectKeyPrefix, namespace);
    }

    private Logger initSiftingLogger() {
        LoggerContext loggerContext = (LoggerContext)LoggerFactory.getILoggerFactory();
        SiftingAppender siftingAppender = new SiftingAppender();
        String loggerName = "cantor-s3-events-sifting-logger";
        siftingAppender.setName("cantor-s3-events-sifting-logger");
        siftingAppender.setContext((Context)loggerContext);
        MDCBasedDiscriminator discriminator = new MDCBasedDiscriminator();
        discriminator.setKey("path");
        discriminator.setDefaultValue("unknown");
        discriminator.start();
        siftingAppender.setDiscriminator((Discriminator)discriminator);
        siftingAppender.setTimeout(Duration.buildBySeconds((double)3.0));
        siftingAppender.setAppenderFactory((context, discriminatingValue) -> {
            FileAppender fileAppender = new FileAppender();
            fileAppender.setName("file-" + discriminatingValue);
            fileAppender.setContext(context);
            fileAppender.setFile(discriminatingValue);
            PatternLayoutEncoder patternLayoutEncoder = new PatternLayoutEncoder();
            patternLayoutEncoder.setContext(context);
            patternLayoutEncoder.setPattern("%msg%n");
            patternLayoutEncoder.start();
            fileAppender.setEncoder((Encoder)patternLayoutEncoder);
            fileAppender.start();
            return fileAppender;
        });
        siftingAppender.start();
        ch.qos.logback.classic.Logger logger = loggerContext.getLogger("cantor-s3-events-sifting-logger");
        logger.setAdditive(false);
        logger.setLevel(Level.ALL);
        logger.addAppender((Appender)siftingAppender);
        return logger;
    }

    private synchronized void doStore(String namespace, Collection<Events.Event> batch) throws IOException {
        for (Events.Event event : batch) {
            HashMap metadata = new HashMap(event.getMetadata());
            HashMap<String, Double> dimensions = new HashMap<String, Double>(event.getDimensions());
            byte[] payload = event.getPayload();
            String currentCycleName = this.getRolloverCycleName();
            String cyclePath = this.getPath(currentCycleName);
            String filePath = String.format("%s/%s/%s.%s", cyclePath, EventsOnS3.trim(namespace), this.directoryFormatter.format(event.getTimestampMillis()), currentCycleName);
            String payloadFilePath = filePath + ".b64";
            String eventsFilePath = filePath + ".json";
            if (payload != null && payload.length > 0) {
                String payloadBase64 = Base64.getEncoder().encodeToString(payload);
                this.append(payloadFilePath, payloadBase64);
                long offset = ((AtomicLong)this.payloadOffset.getUnchecked((Object)payloadFilePath)).getAndAdd(payloadBase64.length() + 1);
                dimensions.put(dimensionKeyPayloadOffset, Double.valueOf(offset));
                dimensions.put(dimensionKeyPayloadLength, Double.valueOf(payloadBase64.length()));
            }
            Events.Event toWrite = new Events.Event(event.getTimestampMillis(), metadata, dimensions);
            this.append(eventsFilePath, this.parser.toJson((Object)toWrite));
        }
    }

    private void append(String path, String message) {
        MDC.put((String)"path", (String)path);
        this.siftingLogger.info(message);
        MDC.remove((String)"path");
    }

    private List<Events.Event> doGet(String namespace, long startTimestampMillis, long endTimestampMillis, Map<String, String> metadataQuery, Map<String, String> dimensionsQuery, boolean includePayloads, boolean ascending, int limit) throws IOException {
        List<String> matchingKeys = this.getMatchingKeys(namespace, startTimestampMillis, endTimestampMillis);
        logger.info("matching files are: {}", matchingKeys);
        TreeMap<Long, Events.Event> events = ascending ? new TreeMap<Long, Events.Event>() : new TreeMap(Collections.reverseOrder());
        Map<String, Pattern> metadataPatterns = this.generateRegex(metadataQuery);
        for (String objectKey : matchingKeys) {
            if (!objectKey.endsWith("json")) continue;
            String query = this.generateQuery(startTimestampMillis, endTimestampMillis, metadataQuery, dimensionsQuery);
            InputStream jsonLines = S3Utils.S3Select.queryObjectJson(this.s3Client, this.bucketName, objectKey, query);
            Scanner lineReader = new Scanner(jsonLines);
            while (lineReader.hasNext()) {
                Events.Event event = (Events.Event)this.parser.fromJson(lineReader.nextLine(), Events.Event.class);
                if (includePayloads && event.getDimensions().containsKey(dimensionKeyPayloadOffset) && event.getDimensions().containsKey(dimensionKeyPayloadLength)) {
                    long offset = ((Double)event.getDimensions().get(dimensionKeyPayloadOffset)).longValue();
                    long length = ((Double)event.getDimensions().get(dimensionKeyPayloadLength)).longValue();
                    String payloadFilename = objectKey.replace("json", "b64");
                    byte[] payloadBase64Bytes = S3Utils.getObjectBytes(this.s3Client, this.bucketName, payloadFilename, offset, offset + length - 1L);
                    if (payloadBase64Bytes == null || payloadBase64Bytes.length == 0) {
                        throw new IOException("failed to retrieve payload for event");
                    }
                    byte[] payload = Base64.getDecoder().decode(new String(payloadBase64Bytes));
                    Events.Event eventWithPayload = new Events.Event(event.getTimestampMillis(), event.getMetadata(), event.getDimensions(), payload);
                    if (!this.matches(eventWithPayload, metadataPatterns)) continue;
                    events.put(event.getTimestampMillis(), eventWithPayload);
                    continue;
                }
                if (!this.matches(event, metadataPatterns)) continue;
                events.put(event.getTimestampMillis(), event);
            }
            if (limit <= 0 || events.size() < limit) continue;
            break;
        }
        ArrayList<Events.Event> orderedEvents = new ArrayList<Events.Event>(events.values());
        return limit <= 0 ? orderedEvents : orderedEvents.subList(0, limit);
    }

    private void doExpire(String namespace, long endTimestampMillis) throws IOException {
        logger.info("expiring namespace '{}' with end timestamp of '{}'", (Object)namespace, (Object)endTimestampMillis);
        List<String> keys = this.getMatchingKeys(namespace, 0L, endTimestampMillis);
        logger.info("expiring objects: {}", keys);
        S3Utils.deleteObjects(this.s3Client, this.bucketName, keys);
    }

    private String generateQuery(long startTimestampMillis, long endTmestampMillis, Map<String, String> metadataQuery, Map<String, String> dimensionsQuery) {
        String timestampClause = String.format("s.timestampMillis BETWEEN %d AND %d", startTimestampMillis, endTmestampMillis);
        return String.format("SELECT * FROM s3object[*] s WHERE %s %s %s", timestampClause, this.getMetadataQuerySql(metadataQuery, false), this.getDimensionQuerySql(dimensionsQuery, false));
    }

    private List<String> getMatchingKeys(String namespace, long startTimestampMillis, long endTimestampMillis) throws IOException {
        HashSet<String> prefixes = new HashSet<String>();
        long start = startTimestampMillis;
        while (start <= endTimestampMillis) {
            if (endTimestampMillis - start < TimeUnit.HOURS.toMillis(1L)) {
                prefixes.add(String.format("%s/%s", EventsOnS3.trim(namespace), this.directoryFormatter.format(start)));
                start += TimeUnit.MINUTES.toMillis(1L);
                continue;
            }
            if (endTimestampMillis - start < TimeUnit.DAYS.toMillis(1L)) {
                prefixes.add(String.format("%s/%s", EventsOnS3.trim(namespace), this.directoryFormatterHourly.format(start)));
                start += TimeUnit.HOURS.toMillis(1L);
                continue;
            }
            if (endTimestampMillis - start < TimeUnit.DAYS.toMillis(28L)) {
                prefixes.add(String.format("%s/%s", EventsOnS3.trim(namespace), this.directoryFormatterDayly.format(start)));
                start += TimeUnit.DAYS.toMillis(1L);
                continue;
            }
            prefixes.add(String.format("%s/%s", EventsOnS3.trim(namespace), this.directoryFormatterMonthly.format(start)));
            start += TimeUnit.DAYS.toMillis(28L);
        }
        logger.info("prefixes are: {}", prefixes);
        ArrayList<String> matchingKeys = new ArrayList<String>();
        for (String prefix : prefixes) {
            matchingKeys.addAll(S3Utils.getKeys(this.s3Client, this.bucketName, prefix));
        }
        return matchingKeys;
    }

    private boolean matches(Events.Event event, Map<String, Pattern> metadataPatterns) {
        for (Map.Entry<String, Pattern> metaRegex : metadataPatterns.entrySet()) {
            String metadataValue = (String)event.getMetadata().get(metaRegex.getKey().substring(2));
            if (metadataValue == null) {
                return false;
            }
            Matcher regex = metaRegex.getValue().matcher(metadataValue);
            if (metaRegex.getKey().startsWith("_~") && !regex.matches()) {
                return false;
            }
            if (!metaRegex.getKey().startsWith("!~") || !regex.matches()) continue;
            return false;
        }
        return true;
    }

    private Map<String, Pattern> generateRegex(Map<String, String> metadataQuery) {
        HashMap<String, Pattern> regexes = new HashMap<String, Pattern>();
        for (Map.Entry<String, String> entry : metadataQuery.entrySet()) {
            try {
                String fullRegex;
                String maybeRegex = entry.getValue();
                if (maybeRegex.startsWith("~")) {
                    fullRegex = maybeRegex.substring(1).replaceAll("(\\.?)\\*", ".*");
                    regexes.put("_~" + entry.getKey(), Pattern.compile(fullRegex));
                    continue;
                }
                if (!maybeRegex.startsWith("!~")) continue;
                fullRegex = maybeRegex.substring(2).replaceAll("(\\.?)\\*", ".*");
                regexes.put("!~" + entry.getKey(), Pattern.compile(fullRegex));
            }
            catch (PatternSyntaxException pse) {
                logger.warn("invalid regex pattern caught; will allow as limited regex may cause this exception", (Throwable)pse);
            }
        }
        return regexes;
    }

    private String getMetadataQuerySql(Map<String, String> metadataQuery, boolean invert) {
        if (metadataQuery.isEmpty()) {
            return "";
        }
        StringBuilder sql = new StringBuilder();
        for (Map.Entry<String, String> entry : metadataQuery.entrySet()) {
            String operation;
            String metadataName = this.prefixMetadata(entry.getKey());
            String query = entry.getValue();
            if (query.startsWith("~") || query.startsWith("!~")) {
                sql.append(" AND ").append(String.format(" %s LIKE %s", metadataName, this.quote("%")));
                continue;
            }
            if (query.startsWith("=")) {
                operation = invert ? " != " : " = ";
                sql.append(" AND ").append(metadataName).append(operation).append(this.quote(query.substring(1)));
                continue;
            }
            if (query.startsWith("!=")) {
                operation = invert ? " = " : " != ";
                sql.append(" AND ").append(metadataName).append(operation).append(this.quote(query.substring(2)));
                continue;
            }
            operation = invert ? " != " : " = ";
            sql.append(" AND ").append(metadataName).append(operation).append(this.quote(query));
        }
        return sql.toString();
    }

    private String getDimensionQuerySql(Map<String, String> dimensionsQuery, boolean invert) {
        if (dimensionsQuery.isEmpty()) {
            return "";
        }
        StringBuilder sql = new StringBuilder();
        for (Map.Entry<String, String> entry : dimensionsQuery.entrySet()) {
            String operation;
            String dimensionName = this.prefixDimension(entry.getKey());
            String query = entry.getValue();
            if (query.contains("..")) {
                sql.append(" AND ").append(dimensionName).append(" BETWEEN ").append(Double.valueOf(query.substring(0, query.indexOf("..")))).append(" AND ").append(Double.valueOf(query.substring(query.indexOf("..") + 2)));
                continue;
            }
            if (query.startsWith(">=")) {
                operation = invert ? " < " : " >= ";
                sql.append(" AND ").append(dimensionName).append(operation).append(query.substring(2));
                continue;
            }
            if (query.startsWith("<=")) {
                operation = invert ? " > " : " <= ";
                sql.append(" AND ").append(dimensionName).append(operation).append(query.substring(2));
                continue;
            }
            if (query.startsWith(">")) {
                operation = invert ? " <= " : " > ";
                sql.append(" AND ").append(dimensionName).append(operation).append(query.substring(1));
                continue;
            }
            if (query.startsWith("<")) {
                operation = invert ? " >= " : " < ";
                sql.append(" AND ").append(dimensionName).append(operation).append(query.substring(1));
                continue;
            }
            if (query.startsWith("!=")) {
                operation = invert ? " = " : " != ";
                sql.append(" AND ").append(dimensionName).append(operation).append(query.substring(2));
                continue;
            }
            if (query.startsWith("=")) {
                operation = invert ? " != " : " = ";
                sql.append(" AND ").append(dimensionName).append(operation).append(query.substring(1));
                continue;
            }
            operation = invert ? " != " : " = ";
            sql.append(" AND ").append(dimensionName).append(operation).append(query);
        }
        return sql.toString();
    }

    private String quote(String key) {
        return String.format("'%s'", key);
    }

    private String prefixMetadata(String key) {
        return String.format("s.metadata.\"%s\"", key);
    }

    private String prefixDimension(String key) {
        return String.format("CAST ( s.dimensions.\"%s\" as decimal)", key);
    }

    private String rollover() {
        String rolloverCycleName = String.format("%s.%s", this.cycleNameFormatter.format(System.currentTimeMillis()), UUID.randomUUID().toString().replaceAll("-", ""));
        logger.info("starting new cycle: {}", (Object)rolloverCycleName);
        return this.currentFlushCycleGuid.getAndSet(rolloverCycleName);
    }

    private String getRolloverCycleName() {
        return this.currentFlushCycleGuid.get();
    }

    private String getPath(String rolloverCycleName) {
        return this.bufferDirectory + File.separator + rolloverCycleName;
    }

    private void flush() {
        long startMillis = System.currentTimeMillis();
        try {
            this.rollover();
            File bufferDirectoryFile = new File(this.bufferDirectory);
            if (!(bufferDirectoryFile.exists() && bufferDirectoryFile.canWrite() && bufferDirectoryFile.isDirectory())) {
                logger.error("buffer directory '{}' does not exist or is not writable", (Object)this.bufferDirectory);
                return;
            }
            ArrayList<File> toDelete = new ArrayList<File>();
            for (File toUpload : bufferDirectoryFile.listFiles()) {
                logger.info("uploading buffer directory: {}", (Object)toUpload.getAbsolutePath());
                if (!toUpload.exists() || !toUpload.isDirectory()) {
                    logger.info("nothing to upload");
                    continue;
                }
                this.uploadDirectory(toUpload);
                logger.info("successfully uploaded buffer directory: {}", (Object)toUpload.getAbsolutePath());
                toDelete.add(toUpload);
            }
            for (File file : toDelete) {
                logger.info("deleting buffer file: {}", (Object)file.getAbsolutePath());
                this.delete(file);
            }
        }
        catch (InterruptedException e) {
            logger.warn("flush cycle interrupted; exiting");
            return;
        }
        catch (Exception e) {
            logger.warn("exception during flush", (Throwable)e);
        }
        long endMillis = System.currentTimeMillis();
        long elapsedSeconds = (endMillis - startMillis) / 1000L;
        logger.info("flush cycle elapsed time: {}s", (Object)elapsedSeconds);
        this.executor.schedule(this::flush, Math.max(0L, this.maxFlushIntervalSeconds - elapsedSeconds), TimeUnit.SECONDS);
    }

    private void uploadDirectory(File toUpload) throws InterruptedException {
        MultipleFileUpload upload = this.s3TransferManager.uploadDirectory(this.bucketName, null, toUpload, true, (file, metadata) -> metadata.setContentType("text/plain"));
        do {
            logger.info("s3 transfer progress of '{}': {}% of {}mb", new Object[]{toUpload.getAbsolutePath(), (int)upload.getProgress().getPercentTransferred(), upload.getProgress().getTotalBytesToTransfer() / 0x100000L});
            Thread.sleep(1000L);
        } while (!upload.isDone());
    }

    private void delete(File dir) {
        File[] files = dir.listFiles();
        if (files != null) {
            for (File file : files) {
                this.delete(file);
            }
        }
        dir.delete();
    }
}

