package com.salesforce.cantor.s3;

import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.encoder.PatternLayoutEncoder;
import ch.qos.logback.classic.pattern.CallerDataConverter;
import ch.qos.logback.classic.sift.MDCBasedDiscriminator;
import ch.qos.logback.classic.sift.SiftingAppender;
import ch.qos.logback.core.CoreConstants;
import ch.qos.logback.core.FileAppender;
import ch.qos.logback.core.util.Duration;
import ch.qos.logback.core.util.FileSize;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.CannedAccessControlList;
import com.amazonaws.services.s3.model.ObjectTagging;
import com.amazonaws.services.s3.model.Tag;
import com.amazonaws.services.s3.transfer.MultipleFileUpload;
import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.TransferManagerBuilder;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.Weigher;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.salesforce.cantor.Events;
import com.salesforce.cantor.common.EventsPreconditions;
import com.salesforce.cantor.s3.S3Utils;
import java.io.File;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Scanner;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.h2.engine.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

/* loaded from: input_file:com/salesforce/cantor/s3/EventsOnS3.class */
public class EventsOnS3 extends AbstractBaseS3Namespaceable implements Events {
    // Buffer directory used by the constructors that do not take one explicitly.
    private static final String defaultBufferDirectory = "cantor-events-s3-buffer";
    // Flush period (seconds) used by the constructors that do not take one explicitly.
    private static final long defaultFlushIntervalSeconds = 60;
    private static final long defaultTimeoutSeconds = 30;
    // Reserved dimension names recording where an event's base64 payload sits inside the ".b64" file.
    private static final String dimensionKeyPayloadOffset = ".cantor-payload-offset";
    private static final String dimensionKeyPayloadLength = ".cantor-payload-length";
    // MDC key whose value selects the buffer file the sifting logger writes to.
    private static final String siftingDiscriminatorKey = "path";
    // First segment of every object key produced by getObjectKeyPrefix.
    private static final String objectKeyPrefix = "cantor-events";
    // Date patterns used to derive per-minute and per-hour key prefixes from event timestamps.
    private static final String directoryFormatterMinPattern = "yyyy/MM/dd/HH/mm";
    private static final String directoryFormatterHourPattern = "yyyy/MM/dd/HH/";
    // Date pattern for the timestamp part of a flush-cycle name (see rollover).
    private static final String cycleNameFormatterPattern = "yyyy-MM-dd_HH-mm-ss";
    // JSON (de)serializer for events and S3 Select result rows.
    private final Gson parser;
    // Name of the flush cycle that newly appended events are written under.
    private final AtomicReference<String> currentFlushCycleGuid;
    private final TransferManager s3TransferManager;
    // Local directory under which events are buffered before upload.
    private final String bufferDirectory;
    // Running character offset per payload file; entries are created lazily starting at 0.
    private LoadingCache<String, AtomicLong> payloadOffset;
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) EventsOnS3.class);
    // Logger that routes each message to the file named by the "path" MDC value.
    private static final Logger siftingLogger = initSiftingLogger();
    // Per-namespace monitor objects used by appendEvent.
    private static final Map<String, Object> namespaceLocks = new ConcurrentHashMap();
    // Query-result cache bounded by total weight; ObjectWeigher weighs an entry by the summed toString() lengths of its events.
    private static final Cache<String, List<Events.Event>> cache = CacheBuilder.newBuilder().maximumWeight(1073741824).weigher(new ObjectWeigher()).build();
    // Cache of matching object keys per (namespace, time range); bounded by entry count.
    // NOTE(review): FileSize.KB_COEFFICIENT is a logback constant, presumably 1024 — confirm.
    private static final Cache<String, Set<String>> keysCache = CacheBuilder.newBuilder().maximumSize(FileSize.KB_COEFFICIENT).build();

    /* loaded from: input_file:com/salesforce/cantor/s3/EventsOnS3$ObjectWeigher.class */
    /**
     * Cache weigher that approximates the size of a cached result list by the total
     * length of the {@code toString()} representation of its events.
     */
    private static class ObjectWeigher implements Weigher<String, List<Events.Event>> {
        private ObjectWeigher() {
        }

        /**
         * @param str  cache key (not used for weighing)
         * @param list cached events
         * @return sum of the string lengths of all events in {@code list}
         */
        @Override
        public int weigh(String str, List<Events.Event> list) {
            int totalLength = 0;
            for (final Events.Event cachedEvent : list) {
                totalLength += cachedEvent.toString().length();
            }
            return totalLength;
        }
    }

    /**
     * Creates an instance using the default buffer directory and the default flush interval.
     *
     * @param amazonS3 S3 client
     * @param str      bucket name
     * @throws IOException declared by the delegated constructor
     */
    public EventsOnS3(AmazonS3 amazonS3, String str) throws IOException {
        this(amazonS3, str, defaultBufferDirectory, defaultFlushIntervalSeconds);
    }

    /**
     * Creates an instance using the given buffer directory and the default flush interval.
     *
     * @param amazonS3 S3 client
     * @param str      bucket name
     * @param str2     local buffer directory
     * @throws IOException declared by the delegated constructor
     */
    public EventsOnS3(AmazonS3 amazonS3, String str, String str2) throws IOException {
        this(amazonS3, str, str2, defaultFlushIntervalSeconds);
    }

    /**
     * Creates an instance, starts the first flush cycle and schedules periodic flushing.
     *
     * @param amazonS3 S3 client
     * @param str      bucket name; validated as a non-empty string
     * @param str2     local buffer directory; validated as a non-empty string
     * @param j        flush interval in seconds; must be greater than zero
     * @throws IOException declared by the superclass constructor
     */
    public EventsOnS3(AmazonS3 amazonS3, String str, String str2, long j) throws IOException {
        super(amazonS3, str, "events");
        this.parser = new GsonBuilder().create();
        this.currentFlushCycleGuid = new AtomicReference<>();
        // Unbounded cache: one offset counter per payload file name, starting at 0.
        this.payloadOffset = CacheBuilder.newBuilder().build(new CacheLoader<String, AtomicLong>() { // from class: com.salesforce.cantor.s3.EventsOnS3.1
            @Override // com.google.common.cache.CacheLoader
            public AtomicLong load(String str3) {
                return new AtomicLong(0L);
            }
        });
        // Argument validation runs after the superclass constructor has already been invoked.
        EventsPreconditions.checkArgument(j > 0, "invalid flush interval");
        EventsPreconditions.checkString(str, "invalid bucket name");
        EventsPreconditions.checkString(str2, "invalid buffer directory");
        this.bufferDirectory = str2;
        TransferManagerBuilder standard = TransferManagerBuilder.standard();
        standard.setS3Client(this.s3Client);
        this.s3TransferManager = standard.build();
        // Pick the initial cycle name so events can be appended before the first flush.
        rollover();
        // The first flush is scheduled with zero delay; the executor reference is not retained,
        // so nothing visible here shuts it down.
        Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("cantor-s3-buffer-flusher").build()).scheduleAtFixedRate(this::flush, 0L, j, TimeUnit.SECONDS);
    }

    /**
     * Validates the arguments and namespace, then appends the events to the local buffer.
     *
     * @param str        namespace
     * @param collection events to store
     * @throws IOException if an {@link AmazonS3Exception} is raised while storing
     */
    @Override
    public void store(String str, Collection<Events.Event> collection) throws IOException {
        EventsPreconditions.checkStore(str, collection);
        checkNamespace(str);
        try {
            doStore(str, collection);
        } catch (AmazonS3Exception s3Failure) {
            final String message = "exception storing events to namespace: " + str;
            logger.warn(message, s3Failure);
            throw new IOException(message, s3Failure);
        }
    }

    /**
     * Returns the events of a namespace that fall in the given time range and match the
     * metadata and dimension queries.
     *
     * @param str  namespace
     * @param j    start timestamp (millis)
     * @param j2   end timestamp (millis)
     * @param map  metadata query; {@code null} is treated as an empty query
     * @param map2 dimensions query; {@code null} is treated as an empty query
     * @param z    whether payloads are fetched for events that carry one
     * @param z2   {@code true} to sort by ascending timestamp, {@code false} for descending
     * @param i    maximum number of events to return; values {@code <= 0} mean no limit
     * @return matching events
     * @throws IOException if the lookup fails, is interrupted, or S3 reports an error
     */
    @Override
    public List<Events.Event> get(String str, long j, long j2, Map<String, String> map, Map<String, String> map2, boolean z, boolean z2, int i) throws IOException {
        EventsPreconditions.checkGet(str, j, j2, map, map2);
        checkNamespace(str);
        final Map<String, String> metadataQuery = map != null ? map : Collections.<String, String>emptyMap();
        final Map<String, String> dimensionsQuery = map2 != null ? map2 : Collections.<String, String>emptyMap();
        // The query itself must run inside the try so S3 and interruption failures are wrapped.
        try {
            return doGet(str, j, j2, metadataQuery, dimensionsQuery, z, z2, i);
        } catch (AmazonS3Exception | InterruptedException e) {
            logger.warn("exception getting events from namespace: " + str, e);
            throw new IOException("exception getting events from namespace: " + str, e);
        }
    }

    /**
     * Returns the distinct values of one metadata key across the events that fall in the
     * given time range and match the metadata and dimension queries.
     *
     * @param str  namespace
     * @param str2 metadata key whose values are collected
     * @param j    start timestamp (millis)
     * @param j2   end timestamp (millis)
     * @param map  metadata query; {@code null} is treated as an empty query
     * @param map2 dimensions query; {@code null} is treated as an empty query
     * @return distinct metadata values
     * @throws IOException if the lookup fails, is interrupted, or S3 reports an error
     */
    @Override
    public Set<String> metadata(String str, String str2, long j, long j2, Map<String, String> map, Map<String, String> map2) throws IOException {
        EventsPreconditions.checkMetadata(str, str2, j, j2, map, map2);
        checkNamespace(str);
        final Map<String, String> metadataQuery = map != null ? map : Collections.<String, String>emptyMap();
        final Map<String, String> dimensionsQuery = map2 != null ? map2 : Collections.<String, String>emptyMap();
        // The query itself must run inside the try so S3 and interruption failures are wrapped.
        try {
            return doMetadata(str, str2, j, j2, metadataQuery, dimensionsQuery);
        } catch (AmazonS3Exception | InterruptedException e) {
            logger.warn("exception getting metadata from namespace: " + str, e);
            throw new IOException("exception getting metadata from namespace: " + str, e);
        }
    }

    /**
     * Returns, for each matching event, its timestamp together with the value of a single
     * dimension.
     *
     * @param str  namespace
     * @param str2 dimension key to project
     * @param j    start timestamp (millis)
     * @param j2   end timestamp (millis)
     * @param map  metadata query; {@code null} is treated as an empty query
     * @param map2 dimensions query; {@code null} is treated as an empty query
     * @return events carrying only the timestamp and the requested dimension
     * @throws IOException if the lookup fails, is interrupted, or S3 reports an error
     */
    @Override
    public List<Events.Event> dimension(String str, String str2, long j, long j2, Map<String, String> map, Map<String, String> map2) throws IOException {
        EventsPreconditions.checkDimension(str, str2, j, j2, map, map2);
        checkNamespace(str);
        final Map<String, String> metadataQuery = map != null ? map : Collections.<String, String>emptyMap();
        final Map<String, String> dimensionsQuery = map2 != null ? map2 : Collections.<String, String>emptyMap();
        // The query itself must run inside the try so S3 and interruption failures are wrapped.
        try {
            return doDimension(str, str2, j, j2, metadataQuery, dimensionsQuery);
        } catch (AmazonS3Exception | InterruptedException e) {
            logger.warn("exception getting dimension from namespace: " + str, e);
            throw new IOException("exception getting dimension from namespace: " + str, e);
        }
    }

    /**
     * Deletes the objects of a namespace that match the time range from 0 up to the given
     * end timestamp.
     *
     * @param str namespace
     * @param j   end timestamp (millis)
     * @throws IOException if expiring fails, is interrupted, or S3 reports an error
     */
    @Override
    public void expire(String str, long j) throws IOException {
        EventsPreconditions.checkExpire(str, j);
        checkNamespace(str);
        try {
            doExpire(str, j);
        } catch (AmazonS3Exception | InterruptedException failure) {
            final String message = "exception expiring events from namespace: " + str;
            logger.warn(message, failure);
            throw new IOException(message, failure);
        }
    }

    /**
     * Builds the object-key prefix for a namespace: the fixed events prefix, a slash, and
     * the result of {@code trim} applied to the namespace.
     *
     * @param str namespace
     * @return key prefix for the namespace
     */
    @Override
    protected String getObjectKeyPrefix(String str) {
        return objectKeyPrefix + "/" + trim(str);
    }

    /**
     * Builds the logback logger used to buffer events on disk. Messages are routed by a
     * sifting appender to one file appender per distinct value of the "path" MDC key, and
     * each file receives the raw message followed by a line separator.
     *
     * @return the configured, non-additive logger
     */
    private static Logger initSiftingLogger() {
        final LoggerContext logbackContext = (LoggerContext) LoggerFactory.getILoggerFactory();

        final SiftingAppender sifter = new SiftingAppender();
        sifter.setName("cantor-s3-events-sifting-logger");
        sifter.setContext(logbackContext);

        // Route on the MDC value set by append(); messages without it go to "unknown".
        final MDCBasedDiscriminator pathDiscriminator = new MDCBasedDiscriminator();
        pathDiscriminator.setKey("path");
        pathDiscriminator.setDefaultValue("unknown");
        pathDiscriminator.start();
        sifter.setDiscriminator(pathDiscriminator);
        sifter.setTimeout(Duration.buildBySeconds(3.0d));

        // One plain file appender per discriminator value; the value is the file path.
        sifter.setAppenderFactory((context, filePath) -> {
            FileAppender perFileAppender = new FileAppender();
            perFileAppender.setName("file-" + filePath);
            perFileAppender.setContext(context);
            perFileAppender.setFile(filePath);
            final PatternLayoutEncoder rawLineEncoder = new PatternLayoutEncoder();
            rawLineEncoder.setContext(context);
            rawLineEncoder.setPattern("%msg%n");
            rawLineEncoder.start();
            perFileAppender.setEncoder(rawLineEncoder);
            perFileAppender.start();
            return perFileAppender;
        });
        sifter.start();

        // Not additive, so buffered events do not reach any parent appenders.
        final ch.qos.logback.classic.Logger bufferLogger = logbackContext.getLogger("cantor-s3-events-sifting-logger");
        bufferLogger.setAdditive(false);
        bufferLogger.setLevel(Level.ALL);
        bufferLogger.addAppender(sifter);
        return bufferLogger;
    }

    /**
     * Appends every event of the batch to the local buffer, in iteration order.
     *
     * @param str        namespace
     * @param collection events to append
     */
    private void doStore(String str, Collection<Events.Event> collection) {
        for (final Events.Event event : collection) {
            appendEvent(str, event);
        }
    }

    /**
     * Appends one event to the buffer files of the current flush cycle.
     *
     * <p>The event (timestamp, metadata, dimensions) is written as one JSON line to the
     * ".json" file. A non-empty payload is written base64-encoded as one line to the sibling
     * ".b64" file, and its offset and length are recorded as two reserved dimensions so the
     * payload can later be fetched by range.
     *
     * @param str   namespace
     * @param event event to append
     */
    private void appendEvent(String str, Events.Event event) {
        final SimpleDateFormat directoryFormatter = new SimpleDateFormat(directoryFormatterMinPattern);
        // Copies, so the reserved payload dimensions never leak into the caller's event.
        final Map<String, String> metadata = new HashMap<>(event.getMetadata());
        final Map<String, Double> dimensions = new HashMap<>(event.getDimensions());
        final byte[] payload = event.getPayload();
        final String cycleName = getRolloverCycleName();
        final String filePath = String.format("%s/%s/%s.%s", getPath(cycleName), getObjectKeyPrefix(str), directoryFormatter.format(Long.valueOf(event.getTimestampMillis())), cycleName);
        final String payloadFileName = filePath + ".b64";
        final String eventsFileName = filePath + ".json";

        // One monitor per namespace keeps a payload write, its offset bookkeeping and the
        // matching event line together.
        final Object namespaceLock = namespaceLocks.computeIfAbsent(str, key -> key);
        synchronized (namespaceLock) {
            if (payload != null && payload.length > 0) {
                final String encodedPayload = Base64.getEncoder().encodeToString(payload);
                append(payloadFileName, encodedPayload);
                // The +1 accounts for the line separator the "%msg%n" encoder adds after each
                // payload. NOTE(review): assumes a one-character separator — confirm on Windows.
                final long offset = this.payloadOffset.getUnchecked(payloadFileName).getAndAdd(encodedPayload.length() + 1);
                dimensions.put(dimensionKeyPayloadOffset, Double.valueOf((double) offset));
                dimensions.put(dimensionKeyPayloadLength, Double.valueOf((double) encodedPayload.length()));
            }
            append(eventsFileName, this.parser.toJson(new Events.Event(event.getTimestampMillis(), metadata, dimensions)));
        }
    }

    /**
     * Writes one line to a buffer file through the sifting logger.
     *
     * <p>The target file is selected by putting its path under the "path" MDC key for the
     * duration of the log call. The key is always removed afterwards, even if logging throws,
     * so it cannot linger on the calling thread.
     *
     * @param str  path of the buffer file
     * @param str2 line content
     */
    private synchronized void append(String str, String str2) {
        MDC.put(siftingDiscriminatorKey, str);
        try {
            siftingLogger.info(str2);
        } finally {
            MDC.remove(siftingDiscriminatorKey);
        }
    }

    /**
     * Queries every matching ".json" object concurrently and merges the results.
     *
     * @param str  namespace
     * @param j    start timestamp (millis)
     * @param j2   end timestamp (millis)
     * @param map  metadata query (non-null)
     * @param map2 dimensions query (non-null)
     * @param z    whether payloads are fetched
     * @param z2   {@code true} for ascending timestamp order, {@code false} for descending
     * @param i    maximum number of events returned; values {@code <= 0} mean no limit
     * @return merged, sorted and possibly truncated events
     * @throws IOException if any per-object query failed
     * @throws InterruptedException declared by the key listing and executor helpers
     */
    private List<Events.Event> doGet(String str, long j, long j2, Map<String, String> map, Map<String, String> map2, boolean z, boolean z2, int i) throws IOException, InterruptedException {
        // Concurrent list: callbacks from several tasks add to it.
        final CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList();
        ListeningExecutorService newListeningExecutor = newListeningExecutor("cantor-events-s3-get-%d");
        // Set by any failed task; checked once all tasks are done.
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        for (String str2 : getMatchingKeys(str, j, j2)) {
            // Only event files are queried; payload (".b64") objects are read by range on demand.
            if (str2.endsWith("json")) {
                Futures.addCallback(newListeningExecutor.submit(() -> {
                    return doCacheableGetOnObject(str2, j, j2, map, map2, z);
                }), new FutureCallback<List<Events.Event>>() { // from class: com.salesforce.cantor.s3.EventsOnS3.2
                    @Override // com.google.common.util.concurrent.FutureCallback
                    public void onSuccess(List<Events.Event> list) {
                        copyOnWriteArrayList.addAll(list);
                    }

                    @Override // com.google.common.util.concurrent.FutureCallback
                    public void onFailure(Throwable th) {
                        atomicBoolean.set(true);
                        EventsOnS3.logger.warn("exception on get call to s3", th);
                    }
                }, MoreExecutors.directExecutor());
            }
        }
        // NOTE(review): awaitTermination is defined outside this view; presumably it shuts the
        // executor down and waits for the submitted tasks — confirm.
        awaitTermination(newListeningExecutor);
        if (atomicBoolean.get()) {
            throw new IOException("exception on get call to s3");
        }
        sortEventsByTimestamp(copyOnWriteArrayList, z2);
        // A positive limit returns a sub-list view of the first i events.
        return i > 0 ? copyOnWriteArrayList.subList(0, Math.min(i, copyOnWriteArrayList.size())) : copyOnWriteArrayList;
    }

    /**
     * Collects the distinct values of one metadata key by querying every matching ".json"
     * object concurrently.
     *
     * @param str  namespace
     * @param str2 metadata key
     * @param j    start timestamp (millis)
     * @param j2   end timestamp (millis)
     * @param map  metadata query (non-null)
     * @param map2 dimensions query (non-null)
     * @return union of the values found in all objects
     * @throws IOException if any per-object query failed
     * @throws InterruptedException declared by the key listing and executor helpers
     */
    private Set<String> doMetadata(String str, String str2, long j, long j2, Map<String, String> map, Map<String, String> map2) throws IOException, InterruptedException {
        // Concurrent set: callbacks from several tasks add to it.
        final CopyOnWriteArraySet copyOnWriteArraySet = new CopyOnWriteArraySet();
        ListeningExecutorService newListeningExecutor = newListeningExecutor("cantor-events-s3-metadata-%d");
        // Set by any failed task; checked once all tasks are done.
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        for (String str3 : getMatchingKeys(str, j, j2)) {
            // Only event files carry metadata; payload objects are skipped.
            if (str3.endsWith("json")) {
                Futures.addCallback(newListeningExecutor.submit(() -> {
                    return doMetadataOnObject(str3, str2, j, j2, map, map2);
                }), new FutureCallback<Set<String>>() { // from class: com.salesforce.cantor.s3.EventsOnS3.3
                    @Override // com.google.common.util.concurrent.FutureCallback
                    public void onSuccess(Set<String> set) {
                        copyOnWriteArraySet.addAll(set);
                    }

                    @Override // com.google.common.util.concurrent.FutureCallback
                    public void onFailure(Throwable th) {
                        atomicBoolean.set(true);
                        EventsOnS3.logger.warn("exception on metadata call to s3", th);
                    }
                }, MoreExecutors.directExecutor());
            }
        }
        // NOTE(review): awaitTermination is defined outside this view; presumably it shuts the
        // executor down and waits for the submitted tasks — confirm.
        awaitTermination(newListeningExecutor);
        if (atomicBoolean.get()) {
            throw new IOException("exception on metadata call to s3");
        }
        return copyOnWriteArraySet;
    }

    /**
     * Projects one dimension from every matching ".json" object, querying the objects
     * concurrently and merging the results in completion order.
     *
     * @param str  namespace
     * @param str2 dimension key
     * @param j    start timestamp (millis)
     * @param j2   end timestamp (millis)
     * @param map  metadata query (non-null)
     * @param map2 dimensions query (non-null)
     * @return events carrying only the timestamp and the requested dimension
     * @throws IOException if any per-object query failed
     * @throws InterruptedException declared by the key listing and executor helpers
     */
    private List<Events.Event> doDimension(String str, String str2, long j, long j2, Map<String, String> map, Map<String, String> map2) throws IOException, InterruptedException {
        // Concurrent list: callbacks from several tasks add to it.
        final List<Events.Event> results = new CopyOnWriteArrayList<>();
        final ListeningExecutorService executor = newListeningExecutor("cantor-events-s3-dimension-%d");
        // Set by any failed task; checked once all tasks are done.
        final AtomicBoolean failed = new AtomicBoolean(false);
        for (final String objectKey : getMatchingKeys(str, j, j2)) {
            // Only event files carry dimensions; payload objects are skipped.
            if (!objectKey.endsWith("json")) {
                continue;
            }
            Futures.addCallback(executor.submit(() -> doDimensionOnObject(objectKey, str2, j, j2, map, map2)), new FutureCallback<List<Events.Event>>() {
                @Override
                public void onSuccess(List<Events.Event> list) {
                    results.addAll(list);
                }

                @Override
                public void onFailure(Throwable th) {
                    failed.set(true);
                    EventsOnS3.logger.warn("exception on dimension call to s3", th);
                }
            }, MoreExecutors.directExecutor());
        }
        awaitTermination(executor);
        if (failed.get()) {
            // Message names the dimension call, matching the warning logged on failure.
            throw new IOException("exception on dimension call to s3");
        }
        return results;
    }

    /**
     * Sorts the events in place by timestamp.
     *
     * @param list events to sort
     * @param z    {@code true} for ascending order, {@code false} for descending
     */
    private void sortEventsByTimestamp(List<Events.Event> list, boolean z) {
        list.sort((left, right) -> {
            final int ascendingOrder = Long.compare(left.getTimestampMillis(), right.getTimestampMillis());
            return z ? ascendingOrder : -ascendingOrder;
        });
    }

    /**
     * Cached variant of {@link #doGetOnObject}: results are memoized per object key, time
     * range, queries and payload flag.
     *
     * <p>The cache key is built from the actual object key and query contents rather than
     * their hash codes, so two different queries whose hash codes collide can no longer be
     * served each other's results.
     *
     * @param str  object key
     * @param j    start timestamp (millis)
     * @param j2   end timestamp (millis)
     * @param map  metadata query
     * @param map2 dimensions query
     * @param z    whether payloads are fetched
     * @return events of the object matching the query
     * @throws IOException if the underlying query fails
     */
    private List<Events.Event> doCacheableGetOnObject(String str, long j, long j2, Map<String, String> map, Map<String, String> map2, boolean z) throws IOException {
        final String cacheKey = String.format("%d-%d-%b-%s-%s-%s", Long.valueOf(j), Long.valueOf(j2), Boolean.valueOf(z), str, map, map2);
        try {
            return cache.get(cacheKey, () -> doGetOnObject(str, j, j2, map, map2, z));
        } catch (ExecutionException e) {
            // The loader failed; run the query directly so the caller sees the real failure.
            return doGetOnObject(str, j, j2, map, map2, z);
        }
    }

    /**
     * Runs the S3 Select "get" query against one ".json" object and parses each result line
     * into an event, optionally attaching the payload read from the sibling ".b64" object.
     *
     * <p>The scanner over the query result is closed exactly once, after all lines have been
     * consumed or on failure.
     *
     * @param str  key of the ".json" object
     * @param j    start timestamp (millis)
     * @param j2   end timestamp (millis)
     * @param map  metadata query
     * @param map2 dimensions query
     * @param z    whether payloads are fetched for events that record a payload offset and length
     * @return events of the object matching the query
     * @throws IOException if a recorded payload cannot be retrieved
     */
    private List<Events.Event> doGetOnObject(String str, long j, long j2, Map<String, String> map, Map<String, String> map2, boolean z) throws IOException {
        final List<Events.Event> results = new ArrayList<>();
        final String query = generateGetQuery(j, j2, map, map2);
        // Swap only the trailing extension; a blanket replace would also rewrite "json"
        // occurring elsewhere in the key (for example inside the namespace).
        final String payloadKey = str.endsWith("json")
                ? str.substring(0, str.length() - "json".length()) + "b64"
                : str.replace("json", "b64");
        try (Scanner lines = new Scanner(S3Utils.S3Select.queryObjectJson(this.s3Client, this.bucketName, str, query))) {
            while (lines.hasNext()) {
                final Events.Event event = this.parser.fromJson(lines.nextLine(), Events.Event.class);
                final boolean hasPayload = event.getDimensions().containsKey(dimensionKeyPayloadOffset)
                        && event.getDimensions().containsKey(dimensionKeyPayloadLength);
                if (!z || !hasPayload) {
                    results.add(event);
                    continue;
                }
                final long offset = event.getDimensions().get(dimensionKeyPayloadOffset).longValue();
                final long length = event.getDimensions().get(dimensionKeyPayloadLength).longValue();
                // Byte range is inclusive on both ends, hence the -1.
                final byte[] encodedPayload = S3Utils.getObjectBytes(this.s3Client, this.bucketName, payloadKey, offset, (offset + length) - 1);
                if (encodedPayload == null || encodedPayload.length == 0) {
                    throw new IOException("failed to retrieve payload for event");
                }
                // Decode the bytes directly; this avoids a platform-charset round trip.
                results.add(new Events.Event(event.getTimestampMillis(), event.getMetadata(), event.getDimensions(), Base64.getDecoder().decode(encodedPayload)));
            }
        }
        return results;
    }

    /**
     * Runs the S3 Select "metadata" query against one ".json" object and collects the values
     * found under the requested metadata key.
     *
     * @param str  key of the ".json" object
     * @param str2 metadata key
     * @param j    start timestamp (millis)
     * @param j2   end timestamp (millis)
     * @param map  metadata query
     * @param map2 dimensions query
     * @return distinct values of the key in this object
     * @throws IOException declared for the underlying S3 Select call
     */
    @SuppressWarnings("unchecked") // Gson hands back a raw Map for Map.class
    private Set<String> doMetadataOnObject(String str, String str2, long j, long j2, Map<String, String> map, Map<String, String> map2) throws IOException {
        final Set<String> values = new HashSet<>();
        final String query = generateMetadataQuery(str2, j, j2, map, map2);
        // try-with-resources closes the scanner on every path, including a failing hasNext().
        try (Scanner lines = new Scanner(S3Utils.S3Select.queryObjectJson(this.s3Client, this.bucketName, str, query))) {
            while (lines.hasNext()) {
                final Map<String, String> row = this.parser.fromJson(lines.nextLine(), Map.class);
                // Rows of events that lack the key are skipped.
                if (row.containsKey(str2)) {
                    values.add(row.get(str2));
                }
            }
        }
        return values;
    }

    /**
     * Runs the S3 Select "dimension" query against one ".json" object and turns each result
     * row into an event holding only the timestamp and the requested dimension.
     *
     * @param str  key of the ".json" object
     * @param str2 dimension key
     * @param j    start timestamp (millis)
     * @param j2   end timestamp (millis)
     * @param map  metadata query
     * @param map2 dimensions query
     * @return one event per result row, with empty metadata
     * @throws IOException declared for the underlying S3 Select call
     */
    @SuppressWarnings("unchecked") // Gson hands back a raw Map for Map.class
    private List<Events.Event> doDimensionOnObject(String str, String str2, long j, long j2, Map<String, String> map, Map<String, String> map2) throws IOException {
        final List<Events.Event> results = new ArrayList<>();
        final String query = generateDimensionQuery(str2, j, j2, map, map2);
        // try-with-resources closes the scanner on every path, including a failing hasNext().
        try (Scanner lines = new Scanner(S3Utils.S3Select.queryObjectJson(this.s3Client, this.bucketName, str, query))) {
            while (lines.hasNext()) {
                // Gson parses JSON numbers as Double, so the timestamp arrives as a Double too.
                final Map<String, Double> row = this.parser.fromJson(lines.nextLine(), Map.class);
                final long timestampMillis = row.get("timestampMillis").longValue();
                results.add(new Events.Event(timestampMillis, Collections.emptyMap(), Collections.singletonMap(str2, row.get(str2))));
            }
        }
        return results;
    }

    /**
     * Deletes every object of the namespace whose key matches the time range from 0 up to
     * the given end timestamp.
     *
     * @param str namespace
     * @param j   end timestamp (millis)
     * @throws IOException if the keys cannot be listed
     * @throws InterruptedException declared by the key listing
     */
    private void doExpire(String str, long j) throws IOException, InterruptedException {
        logger.info("expiring namespace '{}' with end timestamp of '{}'", str, Long.valueOf(j));
        final Set<String> expiredKeys = getMatchingKeys(str, 0L, j);
        logger.info("expiring objects: {}", expiredKeys);
        S3Utils.deleteObjects(this.s3Client, this.bucketName, expiredKeys);
    }

    /**
     * Builds the S3 Select statement that returns whole events within a time range, filtered
     * by the metadata and dimension queries.
     *
     * @param j    start timestamp (millis)
     * @param j2   end timestamp (millis)
     * @param map  metadata query
     * @param map2 dimensions query
     * @return the SQL text
     */
    private String generateGetQuery(long j, long j2, Map<String, String> map, Map<String, String> map2) {
        final String timestampClause = String.format("s.timestampMillis BETWEEN %d AND %d", Long.valueOf(j), Long.valueOf(j2));
        final String metadataClause = getMetadataQuerySql(map);
        final String dimensionsClause = getDimensionsQuerySql(map2);
        return String.format("SELECT * FROM s3object[*] s WHERE %s %s %s", timestampClause, metadataClause, dimensionsClause);
    }

    /**
     * Builds the S3 Select statement that returns a single metadata field of the events
     * within a time range, filtered by the metadata and dimension queries.
     *
     * @param str  metadata key to select
     * @param j    start timestamp (millis)
     * @param j2   end timestamp (millis)
     * @param map  metadata query
     * @param map2 dimensions query
     * @return the SQL text
     */
    private String generateMetadataQuery(String str, long j, long j2, Map<String, String> map, Map<String, String> map2) {
        final String timestampClause = String.format("s.timestampMillis BETWEEN %d AND %d", Long.valueOf(j), Long.valueOf(j2));
        final String metadataClause = getMetadataQuerySql(map);
        final String dimensionsClause = getDimensionsQuerySql(map2);
        return String.format("SELECT s.metadata.\"%s\" FROM s3object[*] s WHERE %s %s %s", str, timestampClause, metadataClause, dimensionsClause);
    }

    /**
     * Builds the S3 Select statement that returns the timestamp and a single dimension of
     * the events within a time range, filtered by the metadata and dimension queries.
     *
     * @param str  dimension key to select
     * @param j    start timestamp (millis)
     * @param j2   end timestamp (millis)
     * @param map  metadata query
     * @param map2 dimensions query
     * @return the SQL text
     */
    private String generateDimensionQuery(String str, long j, long j2, Map<String, String> map, Map<String, String> map2) {
        final String timestampClause = String.format("s.timestampMillis BETWEEN %d AND %d", Long.valueOf(j), Long.valueOf(j2));
        final String metadataClause = getMetadataQuerySql(map);
        final String dimensionsClause = getDimensionsQuerySql(map2);
        return String.format("SELECT s.timestampMillis, s.dimensions.\"%s\" FROM s3object[*] s WHERE %s %s %s", str, timestampClause, metadataClause, dimensionsClause);
    }

    /**
     * Cached variant of {@link #doGetMatchingKeys}: the set of object keys is memoized per
     * namespace and time range.
     *
     * <p>The cache key contains the namespace itself rather than its hash code, so two
     * namespaces whose hash codes collide can never share a key set. This matters because
     * the result is also used to decide which objects to delete on expiry.
     *
     * @param str namespace
     * @param j   start timestamp (millis)
     * @param j2  end timestamp (millis)
     * @return keys of the objects that may hold events in the range
     * @throws IOException if listing keys fails
     * @throws InterruptedException declared by the underlying listing
     */
    private Set<String> getMatchingKeys(String str, long j, long j2) throws IOException, InterruptedException {
        // The numeric parts come first so the free-form namespace cannot blur the boundaries.
        final String cacheKey = String.format("%d-%d-%s", Long.valueOf(j), Long.valueOf(j2), str);
        try {
            return keysCache.get(cacheKey, () -> doGetMatchingKeys(str, j, j2));
        } catch (ExecutionException e) {
            // The loader failed; list directly so the caller sees the real failure.
            return doGetMatchingKeys(str, j, j2);
        }
    }

    /**
     * Lists the object keys that may contain events of a namespace within a time range.
     *
     * <p>The range is first turned into a set of key prefixes: an hour-level prefix while at
     * least a full hour remains before the end, and minute-level prefixes for the rest. Each
     * prefix is then listed concurrently and the resulting keys are merged into a sorted set.
     *
     * @param str namespace
     * @param j   start timestamp (millis)
     * @param j2  end timestamp (millis)
     * @return sorted set of matching object keys
     * @throws IOException if any listing call failed
     * @throws InterruptedException declared by the executor helper
     */
    private Set<String> doGetMatchingKeys(String str, long j, long j2) throws IOException, InterruptedException {
        // Formatters are created per call; both use the JVM default time zone.
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat(directoryFormatterMinPattern);
        SimpleDateFormat simpleDateFormat2 = new SimpleDateFormat(directoryFormatterHourPattern);
        HashSet<String> hashSet = new HashSet();
        long j3 = j;
        while (true) {
            long j4 = j3;
            if (j4 > j2) {
                break;
            }
            if (j4 + TimeUnit.HOURS.toMillis(1L) <= j2) {
                // At least one hour remains: use the hour prefix and jump to the next hour boundary.
                hashSet.add(String.format("%s/%s", getObjectKeyPrefix(str), simpleDateFormat2.format(Long.valueOf(j4))));
                j3 = ((j4 / CoreConstants.MILLIS_IN_ONE_HOUR) * CoreConstants.MILLIS_IN_ONE_HOUR) + TimeUnit.HOURS.toMillis(1L);
            } else {
                // Less than an hour remains: advance minute by minute.
                hashSet.add(String.format("%s/%s", getObjectKeyPrefix(str), simpleDateFormat.format(Long.valueOf(j4))));
                j3 = j4 + TimeUnit.MINUTES.toMillis(1L);
            }
        }
        // The end minute is always included, even when the stepping above jumped past it.
        hashSet.add(String.format("%s/%s", getObjectKeyPrefix(str), simpleDateFormat.format(Long.valueOf(j2))));
        final ConcurrentSkipListSet concurrentSkipListSet = new ConcurrentSkipListSet();
        ListeningExecutorService newListeningExecutor = newListeningExecutor("cantor-events-s3-get-matching-keys-%d");
        // Set by any failed listing; checked once all tasks are done.
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        for (String str2 : hashSet) {
            Futures.addCallback(newListeningExecutor.submit(() -> {
                return S3Utils.getKeys(this.s3Client, this.bucketName, str2);
            }), new FutureCallback<Collection<String>>() { // from class: com.salesforce.cantor.s3.EventsOnS3.5
                @Override // com.google.common.util.concurrent.FutureCallback
                public void onSuccess(Collection<String> collection) {
                    concurrentSkipListSet.addAll(collection);
                }

                @Override // com.google.common.util.concurrent.FutureCallback
                public void onFailure(Throwable th) {
                    atomicBoolean.set(true);
                    EventsOnS3.logger.warn("exception on getting object keys from s3", th);
                }
            }, MoreExecutors.directExecutor());
        }
        // NOTE(review): awaitTermination is defined outside this view; presumably it shuts the
        // executor down and waits for the submitted tasks — confirm.
        awaitTermination(newListeningExecutor);
        if (atomicBoolean.get()) {
            throw new IOException("exception on getting object keys from s3");
        }
        return concurrentSkipListSet;
    }

    /**
     * Translates a metadata query into SQL conditions, each prefixed with " AND ".
     *
     * <p>The leading characters of a value select the operator: the LIKE prefix, "!~" for
     * NOT LIKE, "=" for equality, "!=" for inequality; a value without a recognised prefix is
     * an equality match on the whole value.
     *
     * @param map metadata query, key to (optionally prefixed) value
     * @return the SQL fragment, empty when the query is empty
     */
    private String getMetadataQuerySql(Map<String, String> map) {
        final StringBuilder sql = new StringBuilder();
        for (final Map.Entry<String, String> condition : map.entrySet()) {
            final String column = prefixMetadata(condition.getKey());
            final String query = condition.getValue();
            final String operator;
            final String operand;
            // NOTE(review): Constants.SERVER_PROPERTIES_DIR is an H2 constant, presumably "~"
            // (the substring(1) below fits a one-character prefix) — confirm, and consider a
            // local literal instead.
            if (query.startsWith(Constants.SERVER_PROPERTIES_DIR)) {
                operator = " LIKE ";
                operand = regexToSql(query.substring(1));
            } else if (query.startsWith("!~")) {
                operator = " NOT LIKE ";
                operand = regexToSql(query.substring(2));
            } else if (query.startsWith("=")) {
                operator = "=";
                operand = query.substring(1);
            } else if (query.startsWith("!=")) {
                operator = "!=";
                operand = query.substring(2);
            } else {
                operator = "=";
                operand = query;
            }
            sql.append(" AND ").append(column).append(operator).append(quote(operand));
        }
        return sql.toString();
    }

    /**
     * Converts a wildcard pattern into a SQL LIKE pattern: every "*" becomes "%", and every
     * "_" is prefixed with two backslash characters.
     *
     * @param str wildcard pattern
     * @return LIKE pattern
     */
    private String regexToSql(String str) {
        final String wildcardsTranslated = str.replace("*", "%");
        return wildcardsTranslated.replace("_", "\\\\_");
    }

    /**
     * Translates a dimensions query into SQL conditions, each prefixed with " AND ".
     *
     * <p>A value containing the range delimiter becomes a BETWEEN condition with both bounds
     * parsed as doubles. Otherwise a leading ">=", "<=", ">", "<", "!=" or "=" selects the
     * comparison operator, and a value without a prefix is an equality match.
     *
     * @param map dimensions query, key to (optionally prefixed) value
     * @return the SQL fragment, empty when the query is empty
     * @throws NumberFormatException if a range bound is not a valid double
     */
    private String getDimensionsQuerySql(Map<String, String> map) {
        // Two-character operators are listed before their one-character prefixes.
        final String[] operators = {">=", "<=", ">", "<", "!=", "="};
        final StringBuilder sql = new StringBuilder();
        for (final Map.Entry<String, String> condition : map.entrySet()) {
            final String column = prefixDimension(condition.getKey());
            final String query = condition.getValue();
            sql.append(" AND ").append(column);

            // NOTE(review): CallerDataConverter.DEFAULT_RANGE_DELIMITER is a logback constant,
            // presumably ".." (the +2 below fits a two-character delimiter) — confirm.
            final int rangeAt = query.indexOf(CallerDataConverter.DEFAULT_RANGE_DELIMITER);
            if (rangeAt >= 0) {
                final Double lowerBound = Double.valueOf(query.substring(0, rangeAt));
                final Double upperBound = Double.valueOf(query.substring(rangeAt + 2));
                sql.append(" BETWEEN ").append(lowerBound).append(" AND ").append(upperBound);
                continue;
            }

            String matchedOperator = null;
            for (final String candidate : operators) {
                if (query.startsWith(candidate)) {
                    matchedOperator = candidate;
                    break;
                }
            }
            if (matchedOperator != null) {
                sql.append(matchedOperator).append(query.substring(matchedOperator.length()));
            } else {
                sql.append("=").append(query);
            }
        }
        return sql.toString();
    }

    /**
     * Wraps a value in single quotes so it can be used as a SQL string literal.
     *
     * <p>Single quotes inside the value are doubled (the standard SQL escape) so that a value
     * containing {@code '} cannot terminate the literal early. A {@code null} argument is
     * rendered as {@code 'null'}, as it was with the previous {@code String.format} form.
     *
     * @param str the raw value
     * @return the quoted, escaped literal
     */
    private String quote(String str) {
        String escaped = String.valueOf(str).replace("'", "''");
        return "'" + escaped + "'";
    }

    /**
     * Produces the column expression for a metadata key: {@code s.metadata."<key>"}.
     *
     * @param str the metadata key, inserted between the double quotes as-is
     * @return the column expression
     */
    private String prefixMetadata(String str) {
        return "s.metadata.\"" + str + "\"";
    }

    /**
     * Produces the column expression for a dimension key, cast to a decimal:
     * {@code CAST ( s.dimensions."<key>" as decimal)}.
     *
     * @param str the dimension key, inserted between the double quotes as-is
     * @return the cast column expression
     */
    private String prefixDimension(String str) {
        return "CAST ( s.dimensions.\"" + str + "\" as decimal)";
    }

    /**
     * Starts a new cycle by generating a fresh cycle name and storing it.
     *
     * <p>The name is the current time formatted with {@code cycleNameFormatterPattern},
     * followed by a dot and a random UUID with its dashes removed.
     */
    private void rollover() {
        String timestamp = new SimpleDateFormat(cycleNameFormatterPattern)
                .format(Long.valueOf(System.currentTimeMillis()));
        String uniqueSuffix = UUID.randomUUID().toString().replace("-", "");
        String cycleName = timestamp + "." + uniqueSuffix;
        logger.info("starting new cycle: {}", cycleName);
        setRolloverCycleName(cycleName);
    }

    /**
     * Reads the cycle name currently held in {@code currentFlushCycleGuid}.
     *
     * @return the stored cycle name
     */
    private String getRolloverCycleName() {
        final String activeCycleName = this.currentFlushCycleGuid.get();
        return activeCycleName;
    }

    /**
     * Stores a new cycle name in {@code currentFlushCycleGuid}.
     *
     * @param str the cycle name to store
     */
    private void setRolloverCycleName(String str) {
        final String newCycleName = str;
        this.currentFlushCycleGuid.set(newCycleName);
    }

    /**
     * Builds a path under the buffer directory.
     *
     * @param str the name to place under the buffer directory
     * @return {@code bufferDirectory}, the platform file separator, and {@code str} joined together
     */
    private String getPath(String str) {
        StringBuilder path = new StringBuilder();
        path.append(this.bufferDirectory);
        path.append(File.separator);
        path.append(str);
        return path.toString();
    }

    /**
     * Runs one flush cycle: rolls over to a new cycle name, uploads every sub-directory of the
     * buffer directory, and then deletes the sub-directories that were uploaded.
     *
     * <p>Deletion only starts after all uploads have returned, so a failure part-way through
     * the uploads leaves every buffer directory on disk. The elapsed time of the cycle is
     * logged on every exit path.
     *
     * <p>No exception escapes this method: an interruption is logged and the thread's
     * interrupt status is restored; any other exception is logged with its stack trace.
     */
    private void flush() {
        final long startMillis = System.currentTimeMillis();
        try {
            rollover();

            File bufferRoot = new File(this.bufferDirectory);
            if (!bufferRoot.exists() || !bufferRoot.canWrite() || !bufferRoot.isDirectory()) {
                logger.warn("buffer directory '{}' does not exist or is not writable", this.bufferDirectory);
                return;
            }

            // listFiles() yields null when the directory cannot be read; report that explicitly
            // instead of letting it surface as a NullPointerException.
            File[] entries = bufferRoot.listFiles();
            if (entries == null) {
                logger.warn("unable to list contents of buffer directory '{}'", this.bufferDirectory);
                return;
            }

            List<File> uploaded = new ArrayList<>();
            for (File entry : entries) {
                logger.info("uploading buffer directory: {}", entry.getAbsolutePath());
                if (entry.exists() && entry.isDirectory()) {
                    uploadDirectory(entry);
                    logger.info("successfully uploaded buffer directory: {}", entry.getAbsolutePath());
                    uploaded.add(entry);
                } else {
                    logger.info("nothing to upload");
                }
            }

            for (File done : uploaded) {
                logger.info("deleting buffer file: {}", done.getAbsolutePath());
                delete(done);
            }
        } catch (InterruptedException e) {
            // Must precede the generic handler, otherwise this branch can never be reached.
            logger.warn("flush cycle interrupted; exiting");
            // Restore the flag so whoever drives this thread can observe the interruption.
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            logger.warn("exception during flush", e);
        } finally {
            logger.info("flush cycle elapsed time: {}s", (System.currentTimeMillis() - startMillis) / 1000);
        }
    }

    /**
     * Uploads a directory, including sub-directories, to the configured bucket and blocks
     * until the transfer has finished.
     *
     * <p>Every uploaded object gets content type {@code text/plain}, the
     * {@code BucketOwnerFullControl} canned ACL, and a single {@code namespace} tag whose value
     * is the part of the object key between its first and second {@code /}. Progress is logged
     * once per second, with at least one log line and one sleep even for a transfer that is
     * already done.
     *
     * @param file the local directory to upload
     * @throws InterruptedException if the thread is interrupted while sleeping or waiting
     */
    private void uploadDirectory(File file) throws InterruptedException {
        MultipleFileUpload transfer = this.s3TransferManager.uploadDirectory(
                this.bucketName,
                null,
                file,
                true,
                (localFile, metadata) -> metadata.setContentType("text/plain"),
                context -> {
                    String objectKey = context.getKey();
                    String afterFirstSlash = objectKey.substring(objectKey.indexOf("/") + 1);
                    String namespace = afterFirstSlash.substring(0, afterFirstSlash.indexOf("/"));
                    return new ObjectTagging(Collections.singletonList(new Tag("namespace", namespace)));
                },
                localFile -> CannedAccessControlList.BucketOwnerFullControl);

        for (;;) {
            String localPath = file.getAbsolutePath();
            int percentDone = (int) transfer.getProgress().getPercentTransferred();
            long totalMb = transfer.getProgress().getTotalBytesToTransfer() / FileSize.MB_COEFFICIENT;
            logger.info("s3 transfer progress of '{}': {}% of {}mb state: {}", localPath, percentDone, totalMb, transfer.getState());
            Thread.sleep(1000L);
            if (transfer.isDone()) {
                break;
            }
        }
        transfer.waitForCompletion();
    }

    /**
     * Deletes a file or directory, removing a directory's contents first.
     *
     * <p>The result of each individual deletion is not checked.
     *
     * @param file the file or directory to remove
     */
    private void delete(File file) {
        File[] children = file.listFiles();
        // listFiles() is null for anything that is not a listable directory.
        int childCount = (children == null) ? 0 : children.length;
        for (int i = 0; i < childCount; i++) {
            delete(children[i]);
        }
        file.delete();
    }

    /**
     * Creates a listening executor backed by a cached thread pool.
     *
     * @param str the name format handed to the thread factory for the pool's threads
     * @return the cached thread pool wrapped by {@code MoreExecutors.listeningDecorator}
     */
    private static ListeningExecutorService newListeningExecutor(String str) {
        ExecutorService cachedPool = Executors.newCachedThreadPool(
                new ThreadFactoryBuilder().setNameFormat(str).build());
        return MoreExecutors.listeningDecorator(cachedPool);
    }

    /**
     * Shuts the executor down and waits up to {@code defaultTimeoutSeconds} seconds for its
     * tasks to finish.
     *
     * <p>Whether the executor actually terminated within the timeout is not reported to the
     * caller.
     *
     * @param executorService the executor to shut down
     * @throws IOException if the calling thread is interrupted while waiting; the thread's
     *         interrupt status is restored before the exception is thrown
     */
    private static void awaitTermination(ExecutorService executorService) throws IOException {
        executorService.shutdown();
        try {
            executorService.awaitTermination(defaultTimeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Wrapping alone would lose the interruption; put the flag back so callers further
            // up the stack can still see that the thread was interrupted.
            Thread.currentThread().interrupt();
            throw new IOException(e);
        }
    }
}
