/*
 * Decompiled with CFR 0.152.
 */
package com.salesforce.cantor.s3;

import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.encoder.PatternLayoutEncoder;
import ch.qos.logback.classic.sift.MDCBasedDiscriminator;
import ch.qos.logback.classic.sift.SiftingAppender;
import ch.qos.logback.core.Appender;
import ch.qos.logback.core.Context;
import ch.qos.logback.core.FileAppender;
import ch.qos.logback.core.encoder.Encoder;
import ch.qos.logback.core.sift.Discriminator;
import ch.qos.logback.core.util.Duration;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.CannedAccessControlList;
import com.amazonaws.services.s3.model.ObjectTagging;
import com.amazonaws.services.s3.model.Tag;
import com.amazonaws.services.s3.transfer.MultipleFileUpload;
import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.TransferManagerBuilder;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.salesforce.cantor.Events;
import com.salesforce.cantor.common.EventsPreconditions;
import com.salesforce.cantor.s3.AbstractBaseS3Namespaceable;
import com.salesforce.cantor.s3.S3Utils;
import java.io.File;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Scanner;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

public class EventsOnS3
extends AbstractBaseS3Namespaceable
implements Events {
    /** Logger for operational and diagnostic messages of this class. */
    private static final Logger logger = LoggerFactory.getLogger(EventsOnS3.class);
    /** Buffer directory used by the constructor that takes no explicit buffer directory. */
    private static final String defaultBufferDirectory = "cantor-events-s3-buffer";
    // Default values in seconds, as indicated by their names; both are 60.
    private static final long defaultFlushIntervalSeconds = 60L;
    private static final long defaultTimeoutSeconds = 60L;
    // Reserved dimension keys: appendEvent stores under them the offset and length of an event's
    // base64-encoded payload inside the payload file, and doGetOnObject reads them back.
    private static final String dimensionKeyPayloadOffset = ".cantor-payload-offset";
    private static final String dimensionKeyPayloadLength = ".cantor-payload-length";
    /** MDC key whose value is used as the name of the file the sifting logger appends to. */
    private static final String siftingDiscriminatorKey = "path";
    /** Logger that writes each message to the file named by the MDC value of {@code siftingDiscriminatorKey}. */
    private static final Logger siftingLogger = EventsOnS3.initSiftingLogger();
    /** First segment of every object key prefix returned by getObjectKeyPrefix. */
    private static final String objectKeyPrefix = "cantor-events";
    // SimpleDateFormat patterns: per-minute and per-hour key segments, and the timestamp part of a
    // rollover cycle name.
    private static final String directoryFormatterMinPattern = "yyyy/MM/dd/HH/mm";
    private static final String directoryFormatterHourPattern = "yyyy/MM/dd/HH/";
    private static final String cycleNameFormatterPattern = "yyyy-MM-dd_HH-mm-ss";
    /** Monitor objects keyed by namespace; appendEvent synchronizes on the entry of the namespace it writes to. */
    private static final Map<String, Object> namespaceLocks = new ConcurrentHashMap<String, Object>();
    /** Single scheduled thread, shared by all instances, on which every instance schedules its flush. */
    private static final ScheduledExecutorService flushExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("cantor-s3-buffer-flusher-%d").build());
    /** Gson instance used to serialize events to JSON lines and to parse query results. */
    private final Gson parser = new GsonBuilder().create();
    /** Name of the current rollover cycle; replaced by rollover(). */
    private final AtomicReference<String> currentFlushCycleGuid = new AtomicReference();
    /** Transfer manager built in the constructor on top of the S3 client. */
    private final TransferManager s3TransferManager;
    /** Local directory under which per-cycle buffer files are written. */
    private final String bufferDirectory;
    /** Running offset per payload file path; an entry starts at 0 the first time a path is looked up. */
    private final LoadingCache<String, AtomicLong> payloadOffset = CacheBuilder.newBuilder().build((CacheLoader)new CacheLoader<String, AtomicLong>(){

        public AtomicLong load(String ignoredPath) {
            return new AtomicLong(0L);
        }
    });

    /**
     * Creates an instance that buffers under the default buffer directory and flushes every
     * 60 seconds.
     *
     * @param s3Client   client used for all S3 calls
     * @param bucketName bucket that holds the events
     * @throws IOException declared by the delegated constructor
     */
    public EventsOnS3(AmazonS3 s3Client, String bucketName) throws IOException {
        this(s3Client, bucketName, defaultBufferDirectory, defaultFlushIntervalSeconds);
    }

    /**
     * Creates an instance that buffers under the given directory and flushes every 60 seconds.
     *
     * @param s3Client        client used for all S3 calls
     * @param bucketName      bucket that holds the events
     * @param bufferDirectory local directory for buffered event files
     * @throws IOException declared by the delegated constructor
     */
    public EventsOnS3(AmazonS3 s3Client, String bucketName, String bufferDirectory) throws IOException {
        this(s3Client, bucketName, bufferDirectory, defaultFlushIntervalSeconds);
    }

    /**
     * Creates an instance, validates its arguments, builds the transfer manager, starts the first
     * rollover cycle and schedules the periodic flush on the shared flush executor.
     *
     * @param s3Client             client used for all S3 calls
     * @param bucketName           bucket that holds the events
     * @param bufferDirectory      local directory for buffered event files
     * @param flushIntervalSeconds delay, in seconds, between the end of one flush and the start of
     *                             the next; must be positive
     * @throws IOException declared by the superclass constructor
     */
    public EventsOnS3(AmazonS3 s3Client, String bucketName, String bufferDirectory, long flushIntervalSeconds) throws IOException {
        super(s3Client, bucketName, "events");
        EventsPreconditions.checkArgument(flushIntervalSeconds > 0L, "invalid flush interval");
        EventsPreconditions.checkString(bucketName, "invalid bucket name");
        EventsPreconditions.checkString(bufferDirectory, "invalid buffer directory");
        this.bufferDirectory = bufferDirectory;

        // 0x2000000 bytes = 32 MiB, used both as multipart threshold and as minimum part size
        final Long partSizeBytes = 0x2000000L;
        final TransferManagerBuilder transferManagerBuilder = TransferManagerBuilder.standard();
        transferManagerBuilder
                .withS3Client(this.s3Client)
                .withMultipartUploadThreshold(partSizeBytes)
                .withMinimumUploadPartSize(partSizeBytes)
                .withExecutorFactory(() -> Executors.newFixedThreadPool(
                        32,
                        new ThreadFactoryBuilder().setNameFormat("cantor-s3-event-transfer-manager-worker-%d").build()
                ));
        this.s3TransferManager = transferManagerBuilder.build();

        // a cycle name must exist before the first event is appended or the first flush runs
        this.rollover();
        flushExecutorService.scheduleWithFixedDelay(this::flush, 0L, flushIntervalSeconds, TimeUnit.SECONDS);
    }

    /**
     * Validates the arguments and the namespace, then appends every event of the batch to the
     * local buffer.
     *
     * @param namespace namespace the events belong to
     * @param batch     events to store
     * @throws IOException if an {@code AmazonS3Exception} is raised while storing
     */
    public void store(String namespace, Collection<Events.Event> batch) throws IOException {
        EventsPreconditions.checkStore(namespace, batch);
        this.checkNamespace(namespace);
        try {
            this.doStore(namespace, batch);
        } catch (AmazonS3Exception s3Failure) {
            final String failureMessage = "exception storing events to namespace: " + namespace;
            logger.warn(failureMessage, s3Failure);
            throw new IOException(failureMessage, s3Failure);
        }
    }

    /**
     * Returns the events of a namespace whose timestamps fall in the given range and which match
     * the metadata and dimension queries.
     *
     * @param namespace            namespace to read from
     * @param startTimestampMillis inclusive lower bound of the event timestamp
     * @param endTimestampMillis   inclusive upper bound of the event timestamp
     * @param metadataQuery        metadata conditions; {@code null} is treated as no conditions
     * @param dimensionsQuery      dimension conditions; {@code null} is treated as no conditions
     * @param includePayloads      whether payloads are fetched and attached to the events
     * @param ascending            sort direction by timestamp
     * @param limit                maximum number of events returned; values {@code <= 0} mean no limit
     * @return matching events sorted by timestamp
     * @throws IOException if the S3 calls fail or the calling thread is interrupted
     */
    public List<Events.Event> get(String namespace, long startTimestampMillis, long endTimestampMillis, Map<String, String> metadataQuery, Map<String, String> dimensionsQuery, boolean includePayloads, boolean ascending, int limit) throws IOException {
        EventsPreconditions.checkGet(namespace, startTimestampMillis, endTimestampMillis, metadataQuery, dimensionsQuery);
        this.checkNamespace(namespace);
        final Map<String, String> effectiveMetadataQuery = metadataQuery != null ? metadataQuery : Collections.<String, String>emptyMap();
        final Map<String, String> effectiveDimensionsQuery = dimensionsQuery != null ? dimensionsQuery : Collections.<String, String>emptyMap();
        try {
            return this.doGet(namespace, startTimestampMillis, endTimestampMillis, effectiveMetadataQuery, effectiveDimensionsQuery, includePayloads, ascending, limit);
        } catch (AmazonS3Exception | InterruptedException e) {
            final String failureMessage = "exception getting events from namespace: " + namespace;
            logger.warn(failureMessage, e);
            if (e instanceof InterruptedException) {
                // the interruption is reported as an IOException, so keep the flag set for callers
                Thread.currentThread().interrupt();
            }
            throw new IOException(failureMessage, e);
        }
    }

    /**
     * Returns the distinct values of one metadata key over the events of a namespace that fall in
     * the given time range and match the metadata and dimension queries.
     *
     * @param namespace            namespace to read from
     * @param metadataKey          metadata key whose values are collected
     * @param startTimestampMillis inclusive lower bound of the event timestamp
     * @param endTimestampMillis   inclusive upper bound of the event timestamp
     * @param metadataQuery        metadata conditions; {@code null} is treated as no conditions
     * @param dimensionsQuery      dimension conditions; {@code null} is treated as no conditions
     * @return the set of values found for {@code metadataKey}
     * @throws IOException if the S3 calls fail or the calling thread is interrupted
     */
    public Set<String> metadata(String namespace, String metadataKey, long startTimestampMillis, long endTimestampMillis, Map<String, String> metadataQuery, Map<String, String> dimensionsQuery) throws IOException {
        EventsPreconditions.checkMetadata(namespace, metadataKey, startTimestampMillis, endTimestampMillis, metadataQuery, dimensionsQuery);
        this.checkNamespace(namespace);
        final Map<String, String> effectiveMetadataQuery = metadataQuery != null ? metadataQuery : Collections.<String, String>emptyMap();
        final Map<String, String> effectiveDimensionsQuery = dimensionsQuery != null ? dimensionsQuery : Collections.<String, String>emptyMap();
        try {
            return this.doMetadata(namespace, metadataKey, startTimestampMillis, endTimestampMillis, effectiveMetadataQuery, effectiveDimensionsQuery);
        } catch (AmazonS3Exception | InterruptedException e) {
            final String failureMessage = "exception getting metadata from namespace: " + namespace;
            logger.warn(failureMessage, e);
            if (e instanceof InterruptedException) {
                // the interruption is reported as an IOException, so keep the flag set for callers
                Thread.currentThread().interrupt();
            }
            throw new IOException(failureMessage, e);
        }
    }

    /**
     * Returns, for every matching event of a namespace, an event that carries only the timestamp
     * and the requested dimension.
     *
     * @param namespace            namespace to read from
     * @param dimensionKey         dimension key whose values are returned
     * @param startTimestampMillis inclusive lower bound of the event timestamp
     * @param endTimestampMillis   inclusive upper bound of the event timestamp
     * @param metadataQuery        metadata conditions; {@code null} is treated as no conditions
     * @param dimensionsQuery      dimension conditions; {@code null} is treated as no conditions
     * @return one reduced event per matching stored event
     * @throws IOException if the S3 calls fail or the calling thread is interrupted
     */
    public List<Events.Event> dimension(String namespace, String dimensionKey, long startTimestampMillis, long endTimestampMillis, Map<String, String> metadataQuery, Map<String, String> dimensionsQuery) throws IOException {
        EventsPreconditions.checkDimension(namespace, dimensionKey, startTimestampMillis, endTimestampMillis, metadataQuery, dimensionsQuery);
        this.checkNamespace(namespace);
        final Map<String, String> effectiveMetadataQuery = metadataQuery != null ? metadataQuery : Collections.<String, String>emptyMap();
        final Map<String, String> effectiveDimensionsQuery = dimensionsQuery != null ? dimensionsQuery : Collections.<String, String>emptyMap();
        try {
            return this.doDimension(namespace, dimensionKey, startTimestampMillis, endTimestampMillis, effectiveMetadataQuery, effectiveDimensionsQuery);
        } catch (AmazonS3Exception | InterruptedException e) {
            final String failureMessage = "exception getting dimension from namespace: " + namespace;
            logger.warn(failureMessage, e);
            if (e instanceof InterruptedException) {
                // the interruption is reported as an IOException, so keep the flag set for callers
                Thread.currentThread().interrupt();
            }
            throw new IOException(failureMessage, e);
        }
    }

    /**
     * Deletes the stored objects of a namespace that are selected for timestamps from 0 up to the
     * given end timestamp.
     *
     * @param namespace          namespace to expire events from
     * @param endTimestampMillis upper bound of the timestamps to expire
     * @throws IOException if the S3 calls fail or the calling thread is interrupted
     */
    public void expire(String namespace, long endTimestampMillis) throws IOException {
        EventsPreconditions.checkExpire(namespace, endTimestampMillis);
        this.checkNamespace(namespace);
        try {
            this.doExpire(namespace, endTimestampMillis);
        } catch (AmazonS3Exception | InterruptedException e) {
            final String failureMessage = "exception expiring events from namespace: " + namespace;
            logger.warn(failureMessage, e);
            if (e instanceof InterruptedException) {
                // the interruption is reported as an IOException, so keep the flag set for callers
                Thread.currentThread().interrupt();
            }
            throw new IOException(failureMessage, e);
        }
    }

    /**
     * Builds the object key prefix of a namespace: the fixed events prefix, a slash, and the
     * result of {@code trim(namespace)}.
     *
     * @param namespace namespace to build the prefix for
     * @return the key prefix under which objects of the namespace are stored
     */
    @Override
    protected String getObjectKeyPrefix(String namespace) {
        return objectKeyPrefix + "/" + EventsOnS3.trim(namespace);
    }

    /**
     * Builds the logger used to append lines to buffer files. The logger has a single sifting
     * appender that picks its target file from the MDC value stored under
     * {@code siftingDiscriminatorKey}, and writes each message followed by a line break.
     *
     * @return the configured, non-additive logger
     */
    private static Logger initSiftingLogger() {
        final String loggerName = "cantor-s3-events-sifting-logger";
        final LoggerContext loggerContext = (LoggerContext)LoggerFactory.getILoggerFactory();

        // the MDC value is the discriminator, and is used directly as the file name below
        final MDCBasedDiscriminator mdcDiscriminator = new MDCBasedDiscriminator();
        mdcDiscriminator.setKey(siftingDiscriminatorKey);
        mdcDiscriminator.setDefaultValue("unknown");
        mdcDiscriminator.start();

        final SiftingAppender siftingAppender = new SiftingAppender();
        siftingAppender.setName(loggerName);
        siftingAppender.setContext(loggerContext);
        siftingAppender.setDiscriminator((Discriminator)mdcDiscriminator);
        siftingAppender.setTimeout(Duration.buildBySeconds(3.0));
        siftingAppender.setAppenderFactory((context, discriminatingValue) -> {
            // plain message plus line separator; no timestamp or level in the buffer files
            PatternLayoutEncoder lineEncoder = new PatternLayoutEncoder();
            lineEncoder.setContext(context);
            lineEncoder.setPattern("%msg%n");
            lineEncoder.start();

            FileAppender perFileAppender = new FileAppender();
            perFileAppender.setName("file-" + discriminatingValue);
            perFileAppender.setContext(context);
            perFileAppender.setFile(discriminatingValue);
            perFileAppender.setEncoder((Encoder)lineEncoder);
            perFileAppender.start();
            return perFileAppender;
        });
        siftingAppender.start();

        final ch.qos.logback.classic.Logger sifting = loggerContext.getLogger(loggerName);
        sifting.setAdditive(false);
        sifting.setLevel(Level.ALL);
        sifting.addAppender((Appender)siftingAppender);
        return sifting;
    }

    /**
     * Appends every event of the batch, in iteration order, to the local buffer of the namespace.
     *
     * @param namespace namespace the events belong to
     * @param batch     events to append
     */
    private void doStore(String namespace, Collection<Events.Event> batch) {
        batch.forEach(event -> this.appendEvent(namespace, event));
    }

    /**
     * Appends one event to the buffer files of the current rollover cycle.
     *
     * <p>The event is written as a JSON line to the {@code .json} file for its namespace and
     * minute. A non-empty payload is base64-encoded and appended to the sibling {@code .b64} file;
     * its offset and length in that file are recorded on the event as the reserved payload
     * dimensions. Writes for the same namespace are serialized on a per-namespace lock.
     *
     * @param namespace namespace the event belongs to
     * @param event     event to append
     */
    private void appendEvent(String namespace, Events.Event event) {
        SimpleDateFormat directoryFormatterMin = new SimpleDateFormat(directoryFormatterMinPattern);
        // copies, so the caller's maps are never modified when the payload dimensions are added
        Map<String, String> metadata = new HashMap<>(event.getMetadata());
        Map<String, Double> dimensions = new HashMap<>(event.getDimensions());
        byte[] payload = event.getPayload();

        // a dedicated lock object per namespace, rather than the caller-supplied String, which may
        // be interned or locked on by unrelated code
        Object namespaceLock = namespaceLocks.computeIfAbsent(namespace, ignored -> new Object());
        synchronized (namespaceLock) {
            String currentCycleName = this.getRolloverCycleName();
            String cyclePath = this.getPath(currentCycleName);
            String filePath = String.format("%s/%s/%s.%s", cyclePath, this.getObjectKeyPrefix(namespace), directoryFormatterMin.format(event.getTimestampMillis()), currentCycleName);
            String payloadFilePath = filePath + ".b64";
            String eventsFilePath = filePath + ".json";
            if (payload != null && payload.length > 0) {
                String payloadBase64 = Base64.getEncoder().encodeToString(payload);
                this.append(payloadFilePath, payloadBase64);
                // NOTE(review): "+ 1" accounts for the line break written after the payload and
                // assumes it is a single character — confirm for platforms whose separator is "\r\n"
                long offset = this.payloadOffset.getUnchecked(payloadFilePath).getAndAdd(payloadBase64.length() + 1);
                dimensions.put(dimensionKeyPayloadOffset, Double.valueOf(offset));
                dimensions.put(dimensionKeyPayloadLength, Double.valueOf(payloadBase64.length()));
            }
            Events.Event toWrite = new Events.Event(event.getTimestampMillis(), metadata, dimensions);
            this.append(eventsFilePath, this.parser.toJson(toWrite));
        }
    }

    /**
     * Appends one line to the file at the given path through the sifting logger.
     *
     * <p>The target file is selected by putting the path into the MDC for the duration of the
     * call; the key is always removed again, even if logging throws.
     *
     * @param path    file to append to
     * @param message line to write
     */
    private synchronized void append(String path, String message) {
        MDC.put(siftingDiscriminatorKey, path);
        try {
            siftingLogger.info(message);
        } finally {
            // never leave the routing key behind on this thread
            MDC.remove(siftingDiscriminatorKey);
        }
    }

    /**
     * Queries every matching {@code json} object of the namespace in parallel, merges the results,
     * sorts them by timestamp and applies the limit.
     *
     * @param limit maximum number of events returned; values {@code <= 0} return everything
     * @return matching events sorted by timestamp in the requested direction
     * @throws IOException          if listing keys fails or any per-object query fails
     * @throws InterruptedException declared for callers; see awaitTermination
     */
    private List<Events.Event> doGet(String namespace, long startTimestampMillis, long endTimestampMillis, Map<String, String> metadataQuery, Map<String, String> dimensionsQuery, boolean includePayloads, boolean ascending, int limit) throws IOException, InterruptedException {
        final CopyOnWriteArrayList<Events.Event> collected = new CopyOnWriteArrayList<>();
        final ListeningExecutorService workers = EventsOnS3.newListeningExecutor("cantor-events-s3-get-%d");
        final AtomicBoolean anyQueryFailed = new AtomicBoolean(false);
        // stateless apart from the captured collections, so one callback serves every future
        final FutureCallback<List<Events.Event>> collector = new FutureCallback<List<Events.Event>>() {
            @Override
            public void onSuccess(List<Events.Event> events) {
                collected.addAll(events);
            }

            @Override
            public void onFailure(Throwable e) {
                anyQueryFailed.set(true);
                logger.warn("exception on get call to s3: {}", e.getMessage(), e);
            }
        };

        final Set<String> candidateKeys = this.getMatchingKeys(namespace, startTimestampMillis, endTimestampMillis);
        for (String objectKey : candidateKeys) {
            if (objectKey.endsWith("json")) {
                ListenableFuture<List<Events.Event>> pending = workers.submit(
                        () -> this.doGetOnObject(objectKey, startTimestampMillis, endTimestampMillis, metadataQuery, dimensionsQuery, includePayloads));
                Futures.addCallback(pending, collector, MoreExecutors.directExecutor());
            }
        }
        EventsOnS3.awaitTermination(workers);

        if (anyQueryFailed.get()) {
            throw new IOException("exception on get call to s3");
        }
        this.sortEventsByTimestamp(collected, ascending);
        return limit > 0 ? collected.subList(0, Math.min(limit, collected.size())) : collected;
    }

    /**
     * Queries every matching {@code json} object of the namespace in parallel and merges the
     * values found for the metadata key.
     *
     * @return the distinct values of {@code metadataKey}
     * @throws IOException          if listing keys fails or any per-object query fails
     * @throws InterruptedException declared for callers; see awaitTermination
     */
    private Set<String> doMetadata(String namespace, String metadataKey, long startTimestampMillis, long endTimestampMillis, Map<String, String> metadataQuery, Map<String, String> dimensionsQuery) throws IOException, InterruptedException {
        final CopyOnWriteArraySet<String> collected = new CopyOnWriteArraySet<>();
        final ListeningExecutorService workers = EventsOnS3.newListeningExecutor("cantor-events-s3-metadata-%d");
        final AtomicBoolean anyQueryFailed = new AtomicBoolean(false);
        // stateless apart from the captured collections, so one callback serves every future
        final FutureCallback<Set<String>> collector = new FutureCallback<Set<String>>() {
            @Override
            public void onSuccess(Set<String> metadata) {
                collected.addAll(metadata);
            }

            @Override
            public void onFailure(Throwable e) {
                anyQueryFailed.set(true);
                logger.warn("exception on metadata call to s3", e);
            }
        };

        final Set<String> candidateKeys = this.getMatchingKeys(namespace, startTimestampMillis, endTimestampMillis);
        for (String objectKey : candidateKeys) {
            if (objectKey.endsWith("json")) {
                ListenableFuture<Set<String>> pending = workers.submit(
                        () -> this.doMetadataOnObject(objectKey, metadataKey, startTimestampMillis, endTimestampMillis, metadataQuery, dimensionsQuery));
                Futures.addCallback(pending, collector, MoreExecutors.directExecutor());
            }
        }
        EventsOnS3.awaitTermination(workers);

        if (anyQueryFailed.get()) {
            throw new IOException("exception on metadata call to s3");
        }
        return collected;
    }

    /**
     * Queries every matching {@code json} object of the namespace in parallel and merges the
     * reduced events (timestamp plus the requested dimension) they return.
     *
     * @return one reduced event per matching stored event, in no particular order
     * @throws IOException          if listing keys fails or any per-object query fails
     * @throws InterruptedException declared for callers; see awaitTermination
     */
    private List<Events.Event> doDimension(String namespace, String dimensionKey, long startTimestampMillis, long endTimestampMillis, Map<String, String> metadataQuery, Map<String, String> dimensionsQuery) throws IOException, InterruptedException {
        final CopyOnWriteArrayList<Events.Event> results = new CopyOnWriteArrayList<>();
        final ListeningExecutorService executorService = EventsOnS3.newListeningExecutor("cantor-events-s3-dimension-%d");
        final AtomicBoolean futureHasFailed = new AtomicBoolean(false);
        for (String objectKey : this.getMatchingKeys(namespace, startTimestampMillis, endTimestampMillis)) {
            if (!objectKey.endsWith("json")) {
                continue;
            }
            ListenableFuture<List<Events.Event>> future = executorService.submit(
                    () -> this.doDimensionOnObject(objectKey, dimensionKey, startTimestampMillis, endTimestampMillis, metadataQuery, dimensionsQuery));
            Futures.addCallback(future, new FutureCallback<List<Events.Event>>() {
                @Override
                public void onSuccess(List<Events.Event> events) {
                    results.addAll(events);
                }

                @Override
                public void onFailure(Throwable e) {
                    futureHasFailed.set(true);
                    logger.warn("exception on dimension call to s3", e);
                }
            }, MoreExecutors.directExecutor());
        }
        EventsOnS3.awaitTermination(executorService);
        if (futureHasFailed.get()) {
            // names the dimension call, matching the warning logged by the callback above
            throw new IOException("exception on dimension call to s3");
        }
        return results;
    }

    /**
     * Sorts the list in place by event timestamp.
     *
     * @param events    list to sort
     * @param ascending {@code true} for oldest first, {@code false} for newest first
     */
    private void sortEventsByTimestamp(List<Events.Event> events, boolean ascending) {
        events.sort((left, right) -> ascending
                ? Long.compare(left.getTimestampMillis(), right.getTimestampMillis())
                : Long.compare(right.getTimestampMillis(), left.getTimestampMillis()));
    }

    /**
     * Runs the get query against one events object and parses each returned line into an event.
     *
     * <p>When payloads are requested and an event carries the reserved payload offset and length
     * dimensions, the base64 payload is read from the sibling {@code b64} object with a ranged
     * read and attached to the returned event.
     *
     * @param objectKey key of the {@code json} events object to query
     * @return the events returned by the query, in the order they were read
     * @throws IOException if a payload that should exist comes back empty, or if the S3 helper
     *                     calls fail
     */
    private List<Events.Event> doGetOnObject(String objectKey, long startTimestampMillis, long endTimestampMillis, Map<String, String> metadataQuery, Map<String, String> dimensionsQuery, boolean includePayloads) throws IOException {
        ArrayList<Events.Event> results = new ArrayList<Events.Event>();
        String query = this.generateGetQuery(startTimestampMillis, endTimestampMillis, metadataQuery, dimensionsQuery);
        // the payload object shares the events object's key, with "b64" in place of the "json" suffix
        String payloadFilename = objectKey.substring(0, objectKey.lastIndexOf("json")) + "b64";
        // NOTE(review): this Scanner decodes with the platform default charset — confirm that
        // matches the encoding of the query output
        try (Scanner lineReader = new Scanner(S3Utils.S3Select.queryObjectJson(this.s3Client, this.bucketName, objectKey, query))) {
            while (lineReader.hasNext()) {
                Events.Event event = this.parser.fromJson(lineReader.nextLine(), Events.Event.class);
                boolean hasPayloadPointer = includePayloads
                        && event.getDimensions().containsKey(dimensionKeyPayloadOffset)
                        && event.getDimensions().containsKey(dimensionKeyPayloadLength);
                if (!hasPayloadPointer) {
                    results.add(event);
                    continue;
                }
                long offset = ((Double)event.getDimensions().get(dimensionKeyPayloadOffset)).longValue();
                long length = ((Double)event.getDimensions().get(dimensionKeyPayloadLength)).longValue();
                // the range end is inclusive, hence the "- 1"
                byte[] payloadBase64Bytes = S3Utils.getObjectBytes(this.s3Client, this.bucketName, payloadFilename, offset, offset + length - 1L);
                if (payloadBase64Bytes.length == 0) {
                    throw new IOException("failed to retrieve payload for event");
                }
                // decode the bytes directly, so the result does not depend on the platform default charset
                byte[] payload = Base64.getDecoder().decode(payloadBase64Bytes);
                results.add(new Events.Event(event.getTimestampMillis(), event.getMetadata(), event.getDimensions(), payload));
            }
        }
        return results;
    }

    /**
     * Runs the metadata query against one events object and collects the values returned for the
     * metadata key.
     *
     * @param objectKey   key of the {@code json} events object to query
     * @param metadataKey metadata key whose values are collected
     * @return the distinct values found in this object
     * @throws IOException if the S3 helper call fails
     */
    private Set<String> doMetadataOnObject(String objectKey, String metadataKey, long startTimestampMillis, long endTimestampMillis, Map<String, String> metadataQuery, Map<String, String> dimensionsQuery) throws IOException {
        final String selectQuery = this.generateMetadataQuery(metadataKey, startTimestampMillis, endTimestampMillis, metadataQuery, dimensionsQuery);
        final HashSet<String> values = new HashSet<>();
        try (Scanner rows = new Scanner(S3Utils.S3Select.queryObjectJson(this.s3Client, this.bucketName, objectKey, selectQuery))) {
            while (rows.hasNext()) {
                final Map row = this.parser.fromJson(rows.nextLine(), Map.class);
                // rows of events that lack the key carry no entry for it
                if (row.containsKey(metadataKey)) {
                    values.add((String)row.get(metadataKey));
                }
            }
        }
        return values;
    }

    /**
     * Runs the dimension query against one events object and turns every returned row into an
     * event with the row's timestamp, empty metadata and only the requested dimension.
     *
     * @param objectKey    key of the {@code json} events object to query
     * @param dimensionKey dimension key whose values are returned
     * @return one reduced event per returned row
     * @throws IOException if the S3 helper call fails
     */
    private List<Events.Event> doDimensionOnObject(String objectKey, String dimensionKey, long startTimestampMillis, long endTimestampMillis, Map<String, String> metadataQuery, Map<String, String> dimensionsQuery) throws IOException {
        final String selectQuery = this.generateDimensionQuery(dimensionKey, startTimestampMillis, endTimestampMillis, metadataQuery, dimensionsQuery);
        final ArrayList<Events.Event> reducedEvents = new ArrayList<>();
        try (Scanner rows = new Scanner(S3Utils.S3Select.queryObjectJson(this.s3Client, this.bucketName, objectKey, selectQuery))) {
            while (rows.hasNext()) {
                final Map row = this.parser.fromJson(rows.nextLine(), Map.class);
                // the generic Map parse yields numbers as Double
                final long timestampMillis = ((Double)row.get("timestampMillis")).longValue();
                final Map singleDimension = Collections.singletonMap(dimensionKey, row.get(dimensionKey));
                reducedEvents.add(new Events.Event(timestampMillis, Collections.emptyMap(), singleDimension));
            }
        }
        return reducedEvents;
    }

    /**
     * Looks up the object keys of the namespace selected for timestamps from 0 to the given end
     * timestamp, logs them, and deletes them from the bucket.
     *
     * @param namespace          namespace to expire events from
     * @param endTimestampMillis upper bound of the timestamps to expire
     * @throws IOException          if listing or deleting keys fails
     * @throws InterruptedException declared for callers
     */
    private void doExpire(String namespace, long endTimestampMillis) throws IOException, InterruptedException {
        logger.info("expiring namespace '{}' with end timestamp of '{}'", namespace, endTimestampMillis);
        final Set<String> expiredKeys = this.getMatchingKeys(namespace, 0L, endTimestampMillis);
        logger.info("expiring objects: {}", expiredKeys);
        S3Utils.deleteObjects(this.s3Client, this.bucketName, expiredKeys);
    }

    /**
     * Builds the select statement that returns whole events within the timestamp range that also
     * satisfy the metadata and dimension conditions.
     *
     * @return the SQL text of the query
     */
    private String generateGetQuery(long startTimestampMillis, long endTmestampMillis, Map<String, String> metadataQuery, Map<String, String> dimensionsQuery) {
        final String timeRangeCondition = String.format("s.timestampMillis >= %d AND s.timestampMillis <= %d", startTimestampMillis, endTmestampMillis);
        final String metadataConditions = this.getMetadataQuerySql(metadataQuery);
        final String dimensionConditions = this.getDimensionsQuerySql(dimensionsQuery);
        return String.format("SELECT * FROM s3object[*] s WHERE %s %s %s", timeRangeCondition, metadataConditions, dimensionConditions);
    }

    /**
     * Builds the select statement that returns one metadata field of the events within the
     * timestamp range that also satisfy the metadata and dimension conditions.
     *
     * @param metadataKey metadata key to select; inserted into the statement as given
     * @return the SQL text of the query
     */
    private String generateMetadataQuery(String metadataKey, long startTimestampMillis, long endTmestampMillis, Map<String, String> metadataQuery, Map<String, String> dimensionsQuery) {
        final String timeRangeCondition = String.format("s.timestampMillis >= %d AND s.timestampMillis <= %d", startTimestampMillis, endTmestampMillis);
        final String metadataConditions = this.getMetadataQuerySql(metadataQuery);
        final String dimensionConditions = this.getDimensionsQuerySql(dimensionsQuery);
        return String.format("SELECT s.metadata.\"%s\" FROM s3object[*] s WHERE %s %s %s", metadataKey, timeRangeCondition, metadataConditions, dimensionConditions);
    }

    /**
     * Builds the select statement that returns the timestamp and one dimension of the events
     * within the timestamp range that also satisfy the metadata and dimension conditions.
     *
     * @param dimensionKey dimension key to select; inserted into the statement as given
     * @return the SQL text of the query
     */
    private String generateDimensionQuery(String dimensionKey, long startTimestampMillis, long endTmestampMillis, Map<String, String> metadataQuery, Map<String, String> dimensionsQuery) {
        final String timeRangeCondition = String.format("s.timestampMillis >= %d AND s.timestampMillis <= %d", startTimestampMillis, endTmestampMillis);
        final String metadataConditions = this.getMetadataQuerySql(metadataQuery);
        final String dimensionConditions = this.getDimensionsQuerySql(dimensionsQuery);
        return String.format("SELECT s.timestampMillis, s.dimensions.\"%s\" FROM s3object[*] s WHERE %s %s %s", dimensionKey, timeRangeCondition, metadataConditions, dimensionConditions);
    }

    /**
     * Lists the object keys of a namespace that may hold events in the given time range.
     *
     * <p>The range is turned into key prefixes: an hour-level prefix while at least one hour
     * remains before the end, minute-level prefixes for the rest, plus the minute of the end
     * timestamp itself. Every prefix is then listed in parallel.
     *
     * @param namespace            namespace whose keys are listed
     * @param startTimestampMillis start of the range
     * @param endTimestampMillis   end of the range
     * @return the keys found under all prefixes, in sorted order
     * @throws IOException if any listing call fails
     */
    private Set<String> getMatchingKeys(String namespace, long startTimestampMillis, long endTimestampMillis) throws IOException {
        final SimpleDateFormat minuteFormat = new SimpleDateFormat(directoryFormatterMinPattern);
        final SimpleDateFormat hourFormat = new SimpleDateFormat(directoryFormatterHourPattern);
        final long oneHourMillis = TimeUnit.HOURS.toMillis(1L);
        final long oneMinuteMillis = TimeUnit.MINUTES.toMillis(1L);
        final String namespacePrefix = this.getObjectKeyPrefix(namespace);

        final HashSet<String> prefixes = new HashSet<>();
        long cursor = startTimestampMillis;
        while (cursor <= endTimestampMillis) {
            final boolean wholeHourRemains = cursor + oneHourMillis <= endTimestampMillis;
            if (wholeHourRemains) {
                prefixes.add(namespacePrefix + "/" + hourFormat.format(cursor));
                // jump to the start of the next hour, counted in epoch milliseconds
                cursor = cursor / oneHourMillis * oneHourMillis + oneHourMillis;
            } else {
                prefixes.add(namespacePrefix + "/" + minuteFormat.format(cursor));
                cursor += oneMinuteMillis;
            }
        }
        // the minute steps above may stop short of the end timestamp's own minute
        prefixes.add(namespacePrefix + "/" + minuteFormat.format(endTimestampMillis));

        final ConcurrentSkipListSet<String> matchingKeys = new ConcurrentSkipListSet<>();
        final ListeningExecutorService workers = EventsOnS3.newListeningExecutor("cantor-events-s3-get-matching-keys-%d");
        final AtomicBoolean anyListingFailed = new AtomicBoolean(false);
        // stateless apart from the captured collections, so one callback serves every future
        final FutureCallback<Collection<String>> collector = new FutureCallback<Collection<String>>() {
            @Override
            public void onSuccess(Collection<String> keys) {
                matchingKeys.addAll(keys);
            }

            @Override
            public void onFailure(Throwable e) {
                anyListingFailed.set(true);
                logger.warn("exception on getting object keys from s3", e);
            }
        };
        for (String prefix : prefixes) {
            ListenableFuture<Collection<String>> pending = workers.submit(() -> S3Utils.getKeys(this.s3Client, this.bucketName, prefix));
            Futures.addCallback(pending, collector, MoreExecutors.directExecutor());
        }
        EventsOnS3.awaitTermination(workers);

        if (anyListingFailed.get()) {
            throw new IOException("exception on getting object keys from s3");
        }
        return matchingKeys;
    }

    /**
     * Translates metadata conditions into SQL, one {@code " AND <field><op><value>"} fragment per
     * entry.
     *
     * <p>The value's leading marker selects the operator: {@code ~} gives LIKE, {@code !~} gives
     * NOT LIKE, {@code =} gives equality, {@code !=} gives inequality; a value with no marker is
     * an equality match on the whole value.
     *
     * @param metadataQuery conditions keyed by metadata name
     * @return the concatenated fragments, or an empty string when there are no conditions
     */
    private String getMetadataQuerySql(Map<String, String> metadataQuery) {
        // an empty map simply leaves the builder empty
        final StringBuilder sql = new StringBuilder();
        for (Map.Entry<String, String> condition : metadataQuery.entrySet()) {
            final String column = this.prefixMetadata(condition.getKey());
            final String expression = condition.getValue();
            final String operator;
            final String operand;
            if (expression.startsWith("~")) {
                operator = " LIKE ";
                operand = this.quote(this.regexToSql(expression.substring(1)));
            } else if (expression.startsWith("!~")) {
                operator = " NOT LIKE ";
                operand = this.quote(this.regexToSql(expression.substring(2)));
            } else if (expression.startsWith("=")) {
                operator = "=";
                operand = this.quote(expression.substring(1));
            } else if (expression.startsWith("!=")) {
                operator = "!=";
                operand = this.quote(expression.substring(2));
            } else {
                operator = "=";
                operand = this.quote(expression);
            }
            sql.append(" AND ").append(column).append(operator).append(operand);
        }
        return sql.toString();
    }

    /**
     * Rewrites a wildcard expression for use in a LIKE pattern: every {@code *} becomes {@code %}
     * and every {@code _} is prefixed with two backslashes.
     *
     * @param regex wildcard expression
     * @return the rewritten pattern
     */
    private String regexToSql(String regex) {
        final String withSqlWildcards = regex.replace("*", "%");
        return withSqlWildcards.replace("_", "\\\\_");
    }

    /**
     * Translates dimension conditions into SQL, one {@code " AND <field>..."} fragment per entry.
     *
     * <p>A value containing {@code ..} becomes a BETWEEN over the two numbers around it.
     * Otherwise a leading {@code >=}, {@code <=}, {@code >}, {@code <}, {@code !=} or {@code =}
     * selects the comparison, and a value with no marker is an equality comparison. Apart from
     * the BETWEEN bounds, the operand text is inserted as given.
     *
     * @param dimensionsQuery conditions keyed by dimension name
     * @return the concatenated fragments, or an empty string when there are no conditions
     * @throws NumberFormatException if a BETWEEN bound is not a number
     */
    private String getDimensionsQuerySql(Map<String, String> dimensionsQuery) {
        // two-character operators come first so that ">=" is not read as ">"
        final String[] comparisonOperators = {">=", "<=", ">", "<", "!=", "="};
        // an empty map simply leaves the builder empty
        final StringBuilder sql = new StringBuilder();
        for (Map.Entry<String, String> condition : dimensionsQuery.entrySet()) {
            final String column = this.prefixDimension(condition.getKey());
            final String expression = condition.getValue();
            sql.append(" AND ").append(column);

            final int rangeSeparator = expression.indexOf("..");
            if (rangeSeparator >= 0) {
                final Double lowerBound = Double.valueOf(expression.substring(0, rangeSeparator));
                final Double upperBound = Double.valueOf(expression.substring(rangeSeparator + 2));
                sql.append(" BETWEEN ").append(lowerBound).append(" AND ").append(upperBound);
                continue;
            }

            String operator = "=";
            String operand = expression;
            for (String candidate : comparisonOperators) {
                if (expression.startsWith(candidate)) {
                    operator = candidate;
                    operand = expression.substring(candidate.length());
                    break;
                }
            }
            sql.append(operator).append(operand);
        }
        return sql.toString();
    }

    /**
     * Wraps a value in single quotes for use as an SQL string literal, doubling any single quote
     * inside it so the value cannot end the literal early.
     *
     * @param key value to quote; {@code null} is rendered as the text {@code null}
     * @return the quoted literal
     */
    private String quote(String key) {
        // String.valueOf keeps null rendered as "null", as String.format("'%s'", null) would
        return "'" + String.valueOf(key).replace("'", "''") + "'";
    }

    /**
     * Builds the SQL reference to a metadata field: {@code s.metadata."<key>"}.
     *
     * @param key metadata key; inserted as given
     * @return the field reference
     */
    private String prefixMetadata(String key) {
        return "s.metadata.\"" + key + "\"";
    }

    /**
     * Builds the SQL reference to a dimension field, cast to decimal:
     * {@code CAST ( s.dimensions."<key>" as decimal)}.
     *
     * @param key dimension key; inserted as given
     * @return the cast expression
     */
    private String prefixDimension(String key) {
        return "CAST ( s.dimensions.\"" + key + "\" as decimal)";
    }

    /**
     * Starts a new rollover cycle: builds a cycle name from the current time and a random UUID
     * without dashes, logs it, and makes it the current cycle name.
     */
    private void rollover() {
        final String timestampPart = new SimpleDateFormat(cycleNameFormatterPattern).format(System.currentTimeMillis());
        final String uniquePart = UUID.randomUUID().toString().replace("-", "");
        final String rolloverCycleName = timestampPart + "." + uniquePart;
        logger.info("starting new cycle: {}", rolloverCycleName);
        this.setRolloverCycleName(rolloverCycleName);
    }

    /**
     * Reads the cycle name currently held in {@code currentFlushCycleGuid}.
     *
     * @return the stored cycle name
     */
    private String getRolloverCycleName() {
        final String activeCycleName = this.currentFlushCycleGuid.get();
        return activeCycleName;
    }

    /**
     * Stores the given cycle name in {@code currentFlushCycleGuid}, replacing the previous value.
     *
     * @param name the cycle name to store
     */
    private void setRolloverCycleName(String name) {
        currentFlushCycleGuid.set(name);
    }

    /**
     * Builds the path for a cycle by joining the buffer directory and the cycle name
     * with the platform file separator.
     *
     * @param rolloverCycleName the cycle name to append to the buffer directory
     * @return {@code <bufferDirectory><separator><rolloverCycleName>}
     */
    private String getPath(String rolloverCycleName) {
        final StringBuilder path = new StringBuilder();
        path.append(this.bufferDirectory);
        path.append(File.separator);
        path.append(rolloverCycleName);
        return path.toString();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Runs one flush cycle.
     *
     * Steps: start a new cycle via {@link #rollover()}, sleep three seconds, then upload every
     * sub-directory of the buffer directory whose name does not contain the new cycle name.
     * Directories are deleted locally only after all uploads of this cycle succeeded; if any
     * upload throws, nothing is deleted. Failures are logged, never propagated. The elapsed
     * time is always logged.
     *
     * If the thread is interrupted, the cycle stops and the interrupt status is restored so
     * the caller can observe it.
     */
    private void flush() {
        long startMillis = System.currentTimeMillis();
        try {
            this.rollover();
            // NOTE(review): presumably this pause lets in-flight writers move on to the new cycle - confirm
            Thread.sleep(3000L);
            File bufferDirectoryFile = new File(this.bufferDirectory);
            if (!(bufferDirectoryFile.exists() && bufferDirectoryFile.canWrite() && bufferDirectoryFile.isDirectory())) {
                logger.info("buffer directory '{}' does not exist or is not writable", this.bufferDirectory);
                return;
            }
            // listFiles() returns null on an I/O error or if the directory vanished after the check above
            File[] candidates = bufferDirectoryFile.listFiles();
            if (candidates == null) {
                logger.warn("unable to list contents of buffer directory '{}'", this.bufferDirectory);
                return;
            }
            ArrayList<File> toDelete = new ArrayList<File>();
            for (File toUpload : candidates) {
                logger.info("uploading buffer directory: {}", toUpload.getAbsolutePath());
                if (!toUpload.exists() || !toUpload.isDirectory()) {
                    logger.info("nothing to upload");
                    continue;
                }
                // skip the directory of the cycle that was just started
                if (toUpload.getName().contains(this.getRolloverCycleName())) {
                    continue;
                }
                this.uploadDirectory(toUpload);
                logger.info("successfully uploaded buffer directory: {}", toUpload.getAbsolutePath());
                toDelete.add(toUpload);
            }
            for (File file : toDelete) {
                logger.info("deleting buffer file: {}", file.getAbsolutePath());
                this.delete(file);
            }
        }
        catch (InterruptedException e) {
            logger.warn("flush cycle interrupted; exiting");
            // restore the interrupt status that was cleared when the exception was thrown
            Thread.currentThread().interrupt();
        }
        catch (Exception e) {
            logger.warn("exception during flush", e);
        }
        finally {
            long endMillis = System.currentTimeMillis();
            long elapsedSeconds = (endMillis - startMillis) / 1000L;
            logger.info("flush cycle elapsed time: {}s", elapsedSeconds);
        }
    }

    /**
     * Uploads the given directory (including sub-directories) to the configured bucket and
     * blocks until the transfer is done, logging progress roughly once per second.
     *
     * Each uploaded object gets content type {@code text/plain}, the
     * {@code BucketOwnerFullControl} canned ACL, and a {@code namespace} tag taken from the
     * second {@code /}-separated segment of its key.
     *
     * @param toUpload the local directory to upload
     * @throws InterruptedException if the thread is interrupted while waiting for the transfer
     */
    private void uploadDirectory(File toUpload) throws InterruptedException {
        final MultipleFileUpload transfer = this.s3TransferManager.uploadDirectory(
                this.bucketName,
                null,
                toUpload,
                true,
                (localFile, objectMetadata) -> objectMetadata.setContentType("text/plain"),
                context -> {
                    final String objectKey = context.getKey();
                    // drop everything up to and including the first '/'
                    final String afterFirstSegment = objectKey.substring(objectKey.indexOf("/") + 1);
                    // the namespace is what precedes the next '/'
                    final String namespace = afterFirstSegment.substring(0, afterFirstSegment.indexOf("/"));
                    return new ObjectTagging(Collections.singletonList(new Tag("namespace", namespace)));
                },
                localFile -> CannedAccessControlList.BucketOwnerFullControl);

        boolean finished;
        do {
            final Object[] progressArgs = new Object[]{
                    toUpload.getAbsolutePath(),
                    (int) transfer.getProgress().getPercentTransferred(),
                    transfer.getProgress().getTotalBytesToTransfer() / (1024L * 1024L),
                    transfer.getState()
            };
            logger.info("s3 transfer progress of '{}': {}% of {}mb state: {}", progressArgs);
            Thread.sleep(1000L);
            finished = transfer.isDone();
        } while (!finished);

        transfer.waitForCompletion();
    }

    /**
     * Recursively deletes the given file or directory: children first, then the entry itself.
     * The result of the underlying {@link File#delete()} call is not checked.
     *
     * @param dir the file or directory to remove
     */
    private void delete(File dir) {
        final File[] children = dir.listFiles();
        // listFiles() yields null for non-directories; treat that as "no children"
        final int childCount = children == null ? 0 : children.length;
        for (int i = 0; i < childCount; i++) {
            this.delete(children[i]);
        }
        dir.delete();
    }

    /**
     * Creates a listening executor backed by a fixed pool of 32 threads.
     *
     * @param nameFormat the thread name format passed to {@code ThreadFactoryBuilder.setNameFormat}
     * @return the fixed thread pool wrapped by {@code MoreExecutors.listeningDecorator}
     */
    private static ListeningExecutorService newListeningExecutor(String nameFormat) {
        final ExecutorService fixedPool = Executors.newFixedThreadPool(
                32,
                new ThreadFactoryBuilder().setNameFormat(nameFormat).build());
        return MoreExecutors.listeningDecorator(fixedPool);
    }

    /**
     * Shuts the executor down and waits up to 60 seconds for its tasks to finish.
     *
     * The result of the wait is not inspected, so a timeout returns normally.
     *
     * @param executor the executor to shut down
     * @throws IOException if the calling thread is interrupted while waiting; the cause is the
     *                     {@link InterruptedException} and the thread's interrupt status is
     *                     restored before throwing
     */
    private static void awaitTermination(ExecutorService executor) throws IOException {
        executor.shutdown();
        try {
            executor.awaitTermination(60L, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            // wrapping the exception hides the interruption from callers; restore the flag
            Thread.currentThread().interrupt();
            throw new IOException(e);
        }
    }
}

