/*
 * Decompiled with CFR 0.152.
 */
package com.spotify.styx.storage;

import com.google.cloud.datastore.DatastoreException;
import com.google.common.base.Strings;
import com.google.common.base.Throwables;
import com.google.common.collect.Sets;
import com.spotify.styx.model.Event;
import com.spotify.styx.model.SequenceEvent;
import com.spotify.styx.model.WorkflowId;
import com.spotify.styx.model.WorkflowInstance;
import com.spotify.styx.model.data.WorkflowInstanceExecutionData;
import com.spotify.styx.serialization.Json;
import com.spotify.styx.util.ResourceNotFoundException;
import com.spotify.styx.util.RunnableWithException;
import java.io.IOException;
import java.time.Duration;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.stream.Collectors;
import okio.ByteString;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Reads and writes workflow instance events in the {@code styx_events} Bigtable table through
 * the HBase client API.
 *
 * <p>Each event is stored in its own row. The row key has the form
 * {@code <workflow instance key>#<counter>}, where the counter is zero-padded to eight digits
 * (see {@link #writeEvent(SequenceEvent)}). The serialized event lives in the
 * {@code event:event} column and the cell timestamp carries the event timestamp.
 */
public class BigtableStorage {
    private static final Logger LOG = LoggerFactory.getLogger(BigtableStorage.class);

    public static final TableName EVENTS_TABLE_NAME = TableName.valueOf("styx_events");
    public static final byte[] EVENT_CF = Bytes.toBytes("event");
    public static final byte[] EVENT_QUALIFIER = Bytes.toBytes("event");

    /** Maximum number of attempts made by {@link #storeWithRetries} before giving up. */
    public static final int MAX_BIGTABLE_RETRIES = 100;

    /** Separates the workflow instance key from the event counter in a row key. */
    private static final char KEY_SEPARATOR = '#';

    /**
     * Limit value meaning "collect everything": a set size is never negative, so the
     * {@code size() == limit} check in {@link #scanWorkflowInstances} never matches.
     */
    private static final int NO_LIMIT = -1;

    private final Connection connection;
    private final Duration retryBaseDelay;

    /**
     * @param connection     HBase connection used to obtain the events table; must not be null
     * @param retryBaseDelay pause between failed store attempts; must not be null
     * @throws NullPointerException if either argument is null
     */
    BigtableStorage(Connection connection, Duration retryBaseDelay) {
        this.connection = Objects.requireNonNull(connection);
        this.retryBaseDelay = Objects.requireNonNull(retryBaseDelay);
    }

    /**
     * Reads every stored event of a workflow instance.
     *
     * @param workflowInstance the instance whose events should be read
     * @return the events ordered by {@code SequenceEvent.COUNTER_COMPARATOR}; empty if no row
     *         matches the instance's key prefix
     * @throws IOException if the table cannot be read or an event cannot be deserialized
     */
    SortedSet<SequenceEvent> readEvents(WorkflowInstance workflowInstance) throws IOException {
        Scan scan = new Scan()
            .setRowPrefixFilter(Bytes.toBytes(workflowInstance.toKey() + KEY_SEPARATOR));
        TreeSet<SequenceEvent> events = newSortedEventSet();
        // Close the scanner as well as the table; the scanner holds its own resources.
        try (Table eventsTable = connection.getTable(EVENTS_TABLE_NAME);
             ResultScanner scanner = eventsTable.getScanner(scan)) {
            for (Result result : scanner) {
                events.add(parseEventResult(result));
            }
        }
        return events;
    }

    /**
     * Persists a single event, retrying on failure as described by {@link #storeWithRetries}.
     *
     * @param sequenceEvent the event to store; its counter and workflow instance key form the
     *                      row key and its timestamp becomes the cell timestamp
     * @throws IOException if the final store attempt fails
     */
    void writeEvent(SequenceEvent sequenceEvent) throws IOException {
        storeWithRetries(() -> {
            try (Table eventsTable = connection.getTable(EVENTS_TABLE_NAME)) {
                String workflowInstanceKey = sequenceEvent.event().workflowInstance().toKey();
                String rowKey =
                    String.format("%s#%08d", workflowInstanceKey, sequenceEvent.counter());
                Put put = new Put(Bytes.toBytes(rowKey), sequenceEvent.timestamp());
                put.addColumn(
                    EVENT_CF, EVENT_QUALIFIER, Json.serialize(sequenceEvent.event()).toByteArray());
                eventsTable.put(put);
            }
        });
    }

    /**
     * Returns execution data for the instances of a workflow, one page at a time.
     *
     * @param workflowId the workflow whose instances should be listed
     * @param offset     instance parameter to start scanning from; when null or empty the scan
     *                   starts at the first row of the workflow
     * @param limit      collection stops once exactly this many distinct instances were found;
     *                   a value that the count can never equal (zero or negative) means all
     *                   instances are returned
     * @return execution data sorted by {@code WorkflowInstanceExecutionData.COMPARATOR}
     * @throws IOException if scanning the table fails
     */
    List<WorkflowInstanceExecutionData> executionData(WorkflowId workflowId, String offset, int limit) throws IOException {
        try (Table eventsTable = connection.getTable(EVENTS_TABLE_NAME)) {
            Scan scan = firstKeyScanFor(workflowId);
            if (!Strings.isNullOrEmpty(offset)) {
                scan.setStartRow(rowPrefixOf(WorkflowInstance.create(workflowId, offset)));
            }
            return executionData(scanWorkflowInstances(eventsTable, scan, limit));
        }
    }

    /**
     * Returns execution data for the instances of a workflow within a range.
     *
     * @param workflowId the workflow whose instances should be listed
     * @param start      instance parameter used to build the scan's start row
     * @param stop       instance parameter used to build the scan's stop row; when null or
     *                   empty the scan runs to the end of the workflow's key prefix
     * @return execution data sorted by {@code WorkflowInstanceExecutionData.COMPARATOR}
     * @throws IOException if scanning the table fails
     */
    List<WorkflowInstanceExecutionData> executionData(WorkflowId workflowId, String start, String stop) throws IOException {
        try (Table eventsTable = connection.getTable(EVENTS_TABLE_NAME)) {
            Scan scan = firstKeyScanFor(workflowId);
            scan.setStartRow(rowPrefixOf(WorkflowInstance.create(workflowId, start)));
            if (!Strings.isNullOrEmpty(stop)) {
                scan.setStopRow(rowPrefixOf(WorkflowInstance.create(workflowId, stop)));
            }
            return executionData(scanWorkflowInstances(eventsTable, scan, NO_LIMIT));
        }
    }

    /**
     * Finds the counter of the last stored event of a workflow instance.
     *
     * @param workflowInstance the instance to inspect
     * @return the counter of the last event in counter order, or empty if nothing is stored
     * @throws IOException if the events cannot be read
     */
    Optional<Long> getLatestStoredCounter(WorkflowInstance workflowInstance) throws IOException {
        SortedSet<SequenceEvent> storedEvents = readEvents(workflowInstance);
        if (storedEvents.isEmpty()) {
            return Optional.empty();
        }
        // The set is sorted, so its last element is the one iteration would end on.
        return Optional.of(storedEvents.last().counter());
    }

    /**
     * Builds the execution data of a single workflow instance from its stored events.
     *
     * @param workflowInstance the instance to look up
     * @return execution data derived from the instance's events
     * @throws IOException               if the events cannot be read
     * @throws ResourceNotFoundException if no events are stored for the instance
     */
    WorkflowInstanceExecutionData executionData(WorkflowInstance workflowInstance) throws IOException {
        SortedSet<SequenceEvent> events = readEvents(workflowInstance);
        if (events.isEmpty()) {
            throw new ResourceNotFoundException("Workflow instance not found");
        }
        return WorkflowInstanceExecutionData.fromEvents(events);
    }

    /**
     * Loads execution data for every given instance using a parallel stream and returns the
     * results sorted by {@code WorkflowInstanceExecutionData.COMPARATOR}. An
     * {@link IOException} from any lookup is rethrown wrapped in a {@link RuntimeException}.
     */
    private List<WorkflowInstanceExecutionData> executionData(Set<WorkflowInstance> workflowInstancesSet) {
        return workflowInstancesSet.parallelStream()
            .map(workflowInstance -> {
                try {
                    return executionData(workflowInstance);
                } catch (IOException e) {
                    // Lambdas in a stream cannot throw checked exceptions.
                    throw new RuntimeException(e);
                }
            })
            .sorted(WorkflowInstanceExecutionData.COMPARATOR)
            .collect(Collectors.toList());
    }

    /** Creates a scan over all rows of a workflow that returns only the first cell of each row. */
    private static Scan firstKeyScanFor(WorkflowId workflowId) {
        return new Scan()
            .setRowPrefixFilter(Bytes.toBytes(workflowId.toKey() + KEY_SEPARATOR))
            .setFilter(new FirstKeyOnlyFilter());
    }

    /** Returns the row key prefix shared by all events of the given workflow instance. */
    private static byte[] rowPrefixOf(WorkflowInstance workflowInstance) {
        return Bytes.toBytes(workflowInstance.toKey() + KEY_SEPARATOR);
    }

    /**
     * Runs the scan and collects the distinct workflow instances named by the row keys,
     * stopping early once the set holds exactly {@code limit} instances.
     */
    private static Set<WorkflowInstance> scanWorkflowInstances(Table eventsTable, Scan scan, int limit) throws IOException {
        Set<WorkflowInstance> workflowInstances = new HashSet<>();
        try (ResultScanner scanner = eventsTable.getScanner(scan)) {
            for (Result result = scanner.next(); result != null; result = scanner.next()) {
                // Row keys are written as UTF-8 by Bytes.toBytes, so decode them the same way.
                String key = Bytes.toString(result.getRow());
                // Everything before the last separator is the workflow instance key.
                int lastSeparator = key.lastIndexOf(KEY_SEPARATOR);
                workflowInstances.add(WorkflowInstance.parseKey(key.substring(0, lastSeparator)));
                if (workflowInstances.size() == limit) {
                    break;
                }
            }
        }
        return workflowInstances;
    }

    /**
     * Converts a scanned row into a {@link SequenceEvent}, taking the event payload from the
     * {@code event:event} column and the timestamp from that column's latest cell.
     */
    private SequenceEvent parseEventResult(Result r) throws IOException {
        String key = Bytes.toString(r.getRow());
        long timestamp = r.getColumnLatestCell(EVENT_CF, EVENT_QUALIFIER).getTimestamp();
        byte[] value = r.getValue(EVENT_CF, EVENT_QUALIFIER);
        Event event = Json.deserializeEvent(ByteString.of(value));
        return SequenceEvent.parseKey(key, event, timestamp);
    }

    /**
     * Runs the operation until it succeeds, making at most {@link #MAX_BIGTABLE_RETRIES}
     * attempts and sleeping {@code retryBaseDelay} between them.
     *
     * <p>A {@link ResourceNotFoundException} is rethrown immediately without retrying. When the
     * last attempt fails, its exception is rethrown. If the thread is interrupted while
     * waiting, the interrupt flag is restored and a {@link RuntimeException} wrapping the
     * {@link InterruptedException} is thrown.
     */
    private void storeWithRetries(RunnableWithException<IOException> storingOperation) throws IOException {
        int failedAttempts = 0;
        boolean succeeded = false;
        while (failedAttempts < MAX_BIGTABLE_RETRIES && !succeeded) {
            try {
                storingOperation.run();
                succeeded = true;
            } catch (ResourceNotFoundException e) {
                throw e;
            } catch (DatastoreException | IOException e) {
                if (++failedAttempts == MAX_BIGTABLE_RETRIES) {
                    throw e;
                }
                LOG.warn("Failed to read/write from/to Bigtable (attempt #{})", failedAttempts, e);
                try {
                    Thread.sleep(retryBaseDelay.toMillis());
                } catch (InterruptedException interrupted) {
                    // Preserve the interrupt for callers further up the stack.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(interrupted);
                }
            }
        }
    }

    /** Creates an empty set that orders events by {@code SequenceEvent.COUNTER_COMPARATOR}. */
    private static TreeSet<SequenceEvent> newSortedEventSet() {
        return Sets.newTreeSet(SequenceEvent.COUNTER_COMPARATOR);
    }
}

