package com.spotify.styx.storage;

import com.google.cloud.datastore.DatastoreException;
import com.google.common.base.Strings;
import com.google.common.base.Throwables;
import com.google.common.collect.Sets;
import com.spotify.styx.model.SequenceEvent;
import com.spotify.styx.model.WorkflowId;
import com.spotify.styx.model.WorkflowInstance;
import com.spotify.styx.model.data.WorkflowInstanceExecutionData;
import com.spotify.styx.serialization.Json;
import com.spotify.styx.util.ResourceNotFoundException;
import com.spotify.styx.util.RunnableWithException;
import java.io.IOException;
import java.time.Duration;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.stream.Collectors;
import okio.ByteString;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/spotify/styx/storage/BigtableStorage.class */
public class BigtableStorage {
    private static final Logger LOG = LoggerFactory.getLogger(BigtableStorage.class);
    public static final TableName EVENTS_TABLE_NAME = TableName.valueOf("styx_events");
    public static final byte[] EVENT_CF = Bytes.toBytes("event");
    public static final byte[] EVENT_QUALIFIER = Bytes.toBytes("event");
    public static final int MAX_BIGTABLE_RETRIES = 100;
    private final Connection connection;
    private final Duration retryBaseDelay;

    /* JADX INFO: Access modifiers changed from: package-private */
    public BigtableStorage(Connection connection, Duration duration) {
        this.connection = (Connection) Objects.requireNonNull(connection);
        this.retryBaseDelay = (Duration) Objects.requireNonNull(duration);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public SortedSet<SequenceEvent> readEvents(WorkflowInstance workflowInstance) throws IOException {
        Table table = this.connection.getTable(EVENTS_TABLE_NAME);
        Throwable th = null;
        try {
            try {
                Scan rowPrefixFilter = new Scan().setRowPrefixFilter(Bytes.toBytes(workflowInstance.toKey() + '#'));
                TreeSet<SequenceEvent> newSortedEventSet = newSortedEventSet();
                Iterator it = table.getScanner(rowPrefixFilter).iterator();
                while (it.hasNext()) {
                    newSortedEventSet.add(parseEventResult((Result) it.next()));
                }
                if (table != null) {
                    $closeResource(null, table);
                }
                return newSortedEventSet;
            } finally {
            }
        } catch (Throwable th2) {
            if (table != null) {
                $closeResource(th, table);
            }
            throw th2;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void writeEvent(SequenceEvent sequenceEvent) throws IOException {
        storeWithRetries(() -> {
            Table table = this.connection.getTable(EVENTS_TABLE_NAME);
            try {
                Put put = new Put(Bytes.toBytes(String.format("%s#%08d", sequenceEvent.event().workflowInstance().toKey(), Long.valueOf(sequenceEvent.counter()))), sequenceEvent.timestamp());
                put.addColumn(EVENT_CF, EVENT_QUALIFIER, Json.serialize(sequenceEvent.event()).toByteArray());
                table.put(put);
                if (table != null) {
                    $closeResource(null, table);
                }
            } catch (Throwable th) {
                if (table != null) {
                    $closeResource(null, table);
                }
                throw th;
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public List<WorkflowInstanceExecutionData> executionData(WorkflowId workflowId, String str, int i) throws IOException {
        Table table = this.connection.getTable(EVENTS_TABLE_NAME);
        try {
            Scan filter = new Scan().setRowPrefixFilter(Bytes.toBytes(workflowId.toKey() + '#')).setFilter(new FirstKeyOnlyFilter());
            if (!Strings.isNullOrEmpty(str)) {
                filter.setStartRow(Bytes.toBytes(WorkflowInstance.create(workflowId, str).toKey() + '#'));
            }
            HashSet newHashSet = Sets.newHashSet();
            ResultScanner scanner = table.getScanner(filter);
            Throwable th = null;
            try {
                try {
                    for (Result next = scanner.next(); next != null; next = scanner.next()) {
                        String str2 = new String(next.getRow());
                        newHashSet.add(WorkflowInstance.parseKey(str2.substring(0, str2.lastIndexOf(35))));
                        if (newHashSet.size() == i) {
                            break;
                        }
                    }
                    if (scanner != null) {
                        $closeResource(null, scanner);
                    }
                    List<WorkflowInstanceExecutionData> executionData = executionData(newHashSet);
                    if (table != null) {
                        $closeResource(null, table);
                    }
                    return executionData;
                } finally {
                }
            } catch (Throwable th2) {
                if (scanner != null) {
                    $closeResource(th, scanner);
                }
                throw th2;
            }
        } catch (Throwable th3) {
            if (table != null) {
                $closeResource(null, table);
            }
            throw th3;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public List<WorkflowInstanceExecutionData> executionData(WorkflowId workflowId, String str, String str2) throws IOException {
        Table table = this.connection.getTable(EVENTS_TABLE_NAME);
        try {
            Scan filter = new Scan().setRowPrefixFilter(Bytes.toBytes(workflowId.toKey() + '#')).setFilter(new FirstKeyOnlyFilter());
            filter.setStartRow(Bytes.toBytes(WorkflowInstance.create(workflowId, str).toKey() + '#'));
            if (!Strings.isNullOrEmpty(str2)) {
                filter.setStopRow(Bytes.toBytes(WorkflowInstance.create(workflowId, str2).toKey() + '#'));
            }
            HashSet newHashSet = Sets.newHashSet();
            ResultScanner scanner = table.getScanner(filter);
            Throwable th = null;
            try {
                try {
                    for (Result next = scanner.next(); next != null; next = scanner.next()) {
                        String str3 = new String(next.getRow());
                        newHashSet.add(WorkflowInstance.parseKey(str3.substring(0, str3.lastIndexOf(35))));
                    }
                    if (scanner != null) {
                        $closeResource(null, scanner);
                    }
                    List<WorkflowInstanceExecutionData> executionData = executionData(newHashSet);
                    if (table != null) {
                        $closeResource(null, table);
                    }
                    return executionData;
                } finally {
                }
            } catch (Throwable th2) {
                if (scanner != null) {
                    $closeResource(th, scanner);
                }
                throw th2;
            }
        } catch (Throwable th3) {
            if (table != null) {
                $closeResource(null, table);
            }
            throw th3;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Optional<Long> getLatestStoredCounter(WorkflowInstance workflowInstance) throws IOException {
        Optional<SequenceEvent> reduce = readEvents(workflowInstance).stream().reduce((sequenceEvent, sequenceEvent2) -> {
            return sequenceEvent2;
        });
        return reduce.isPresent() ? Optional.of(Long.valueOf(reduce.get().counter())) : Optional.empty();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public WorkflowInstanceExecutionData executionData(WorkflowInstance workflowInstance) throws IOException {
        SortedSet<SequenceEvent> readEvents = readEvents(workflowInstance);
        if (readEvents.isEmpty()) {
            throw new IOException("Workflow instance not found");
        }
        return WorkflowInstanceExecutionData.fromEvents(readEvents);
    }

    private List<WorkflowInstanceExecutionData> executionData(Set<WorkflowInstance> set) {
        return (List) set.parallelStream().map(workflowInstance -> {
            try {
                return executionData(workflowInstance);
            } catch (IOException e) {
                throw Throwables.propagate(e);
            }
        }).sorted(WorkflowInstanceExecutionData.COMPARATOR).collect(Collectors.toList());
    }

    private SequenceEvent parseEventResult(Result result) throws IOException {
        return SequenceEvent.parseKey(new String(result.getRow()), Json.deserializeEvent(ByteString.of(result.getValue(EVENT_CF, EVENT_QUALIFIER))), result.getColumnLatestCell(EVENT_CF, EVENT_QUALIFIER).getTimestamp());
    }

    private void storeWithRetries(RunnableWithException<IOException> runnableWithException) throws IOException {
        int i = 0;
        boolean z = false;
        while (i < 100 && !z) {
            try {
                runnableWithException.run();
                z = true;
            } catch (ResourceNotFoundException e) {
                throw e;
            } catch (DatastoreException | IOException e2) {
                i++;
                if (i == 100) {
                    throw e2;
                }
                LOG.warn(String.format("Failed to read/write from/to Bigtable (attempt #%d)", Integer.valueOf(i)), e2);
                try {
                    Thread.sleep(this.retryBaseDelay.toMillis());
                } catch (InterruptedException e3) {
                    throw Throwables.propagate(e3);
                }
            }
        }
    }

    private static TreeSet<SequenceEvent> newSortedEventSet() {
        return Sets.newTreeSet(SequenceEvent.COUNTER_COMPARATOR);
    }

    private static /* synthetic */ void $closeResource(Throwable th, AutoCloseable autoCloseable) {
        if (th == null) {
            autoCloseable.close();
            return;
        }
        try {
            autoCloseable.close();
        } catch (Throwable th2) {
            th.addSuppressed(th2);
        }
    }
}
