/*
 * Decompiled with CFR 0.152.
 */
package cz.o2.proxima.direct.hbase;

import cz.o2.proxima.direct.core.CommitCallback;
import cz.o2.proxima.direct.core.OnlineAttributeWriter;
import cz.o2.proxima.direct.hbase.HBaseClientWrapper;
import cz.o2.proxima.direct.hbase.Util;
import cz.o2.proxima.storage.StreamElement;
import java.io.IOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Optional;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Online writer that stores {@link StreamElement}s into HBase.
 *
 * <p>Regular elements are written as a {@link Put}, deletes of a single attribute as a
 * {@link Delete} of the matching column and wildcard deletes by scanning the row for all
 * columns with the attribute prefix and deleting them in batches.
 *
 * <p>Recognized configuration keys:
 * <ul>
 *   <li>{@code del-batch-size} - number of columns accumulated in a single {@link Delete}
 *       during a wildcard delete (default {@code 1000})</li>
 *   <li>{@code flush-commits} - whether to call {@code flushCommits()} on the underlying
 *       {@link HTable} after each write (default {@code true})</li>
 * </ul>
 */
class HBaseWriter
extends HBaseClientWrapper
implements OnlineAttributeWriter {
    private static final Logger log = LoggerFactory.getLogger(HBaseWriter.class);
    private static final String DEL_BATCH_SIZE_CONF = "del-batch-size";
    private static final String FLUSH_COMMITS_CFG = "flush-commits";

    /** Number of columns that triggers sending of a partial wildcard delete. */
    private final int batchSize;

    /**
     * Whether to flush the client after each write. May be switched off by
     * {@link #ensureClient()} when the client is not an {@link HTable}.
     */
    private boolean flushCommits;

    /**
     * Create the writer.
     *
     * @param uri URI passed to the parent {@link HBaseClientWrapper}
     * @param conf Hadoop configuration passed to the parent {@link HBaseClientWrapper}
     * @param cfg writer configuration, read for {@code del-batch-size} and {@code flush-commits}
     * @throws NumberFormatException when {@code del-batch-size} is present but is not an integer
     */
    HBaseWriter(URI uri, Configuration conf, Map<String, Object> cfg) {
        super(uri, conf);
        this.batchSize = Optional.ofNullable(cfg.get(DEL_BATCH_SIZE_CONF))
            .map(o -> Integer.valueOf(o.toString()))
            .orElse(1000);
        this.flushCommits = Optional.ofNullable(cfg.get(FLUSH_COMMITS_CFG))
            .map(o -> Boolean.valueOf(o.toString()))
            .orElse(true);
    }

    /**
     * Write (or delete) the given element and report the outcome through the callback.
     *
     * <p>On success {@code statusCallback.commit(true, null)} is invoked. Any exception raised
     * while talking to HBase is logged and reported as {@code statusCallback.commit(false, ex)}.
     *
     * @param data element to be written
     * @param statusCallback callback notified about the result of the write
     */
    public void write(StreamElement data, CommitCallback statusCallback) {
        this.ensureClient();
        byte[] key = data.getKey().getBytes(StandardCharsets.UTF_8);
        long stamp = data.getStamp();
        try {
            if (data.isDelete()) {
                if (data.isDeleteWildcard()) {
                    this.deletePrefix(key, this.family, data.getAttributeDescriptor().toAttributePrefix(), stamp);
                } else {
                    Delete del = new Delete(key, stamp);
                    del.addColumns(this.family, data.getAttribute().getBytes(StandardCharsets.UTF_8), stamp);
                    this.client.delete(del);
                }
            } else {
                String column = data.getAttribute();
                Put put = new Put(key, stamp);
                put.addColumn(this.family, column.getBytes(StandardCharsets.UTF_8), stamp, data.getValue());
                this.client.put(put);
            }
            if (this.flushCommits) {
                // ensureClient() cleared the flag for any client that is not an HTable,
                // so the cast is safe here
                ((HTable) this.client).flushCommits();
            }
            statusCallback.commit(true, null);
        }
        catch (Exception ex) {
            // the trailing throwable is logged with its stack trace
            log.error("Failed to write {}", data, ex);
            statusCallback.commit(false, ex);
        }
    }

    /**
     * Delete all columns of the given row whose qualifier starts with {@code prefix} and whose
     * timestamp is not greater than {@code stamp}.
     *
     * <p>The row is scanned with a {@link ColumnPrefixFilter} and matching columns are
     * accumulated into {@link Delete} mutations that are sent once they reach the configured
     * batch size; the remainder is sent after the scan finishes.
     *
     * @param key row key
     * @param family column family to scan and delete from
     * @param prefix qualifier prefix of the columns to be deleted
     * @param stamp timestamp of the delete; newer cells are left untouched
     * @throws IOException when the scan or any of the deletes fails
     */
    private void deletePrefix(byte[] key, byte[] family, String prefix, long stamp) throws IOException {
        Delete del = new Delete(key);
        Get get = new Get(key);
        get.addFamily(family);
        get.setFilter(new ColumnPrefixFilter(prefix.getBytes(StandardCharsets.UTF_8)));
        Scan scan = new Scan(get);
        scan.setAllowPartialResults(true);
        try (ResultScanner scanner = this.client.getScanner(scan)) {
            Result res;
            while ((res = scanner.next()) != null) {
                CellScanner cellScanner = res.cellScanner();
                while (cellScanner.advance()) {
                    Cell c = cellScanner.current();
                    if (c.getTimestamp() <= stamp) {
                        del.addColumns(family, Util.cloneArray(c.getQualifierArray(), c.getQualifierOffset(), c.getQualifierLength()), stamp);
                    }
                    // Never send an empty Delete: a Delete without any family or column
                    // removes the whole row. Without the emptiness check a non-positive
                    // batch size would do exactly that for every scanned cell.
                    if (del.size() >= this.batchSize && !del.isEmpty()) {
                        this.client.delete(del);
                        del = new Delete(key);
                    }
                }
            }
        }
        if (!del.isEmpty()) {
            this.client.delete(del);
        }
        if (log.isDebugEnabled()) {
            log.debug(
                "Deleted prefix {} of key {} in family {} at {}",
                prefix,
                new String(key, StandardCharsets.UTF_8),
                new String(family, StandardCharsets.UTF_8),
                stamp);
        }
    }

    /**
     * Initialize the client via the parent class and disable flushing of commits when the
     * resulting client is not an {@link HTable}.
     *
     * <p>NOTE(review): {@code flushCommits} takes part in {@link #equals(Object)} and
     * {@link #hashCode()}, so both may change once this method clears the flag.
     */
    @Override
    void ensureClient() {
        super.ensureClient();
        if (!(this.client instanceof HTable)) {
            this.flushCommits = false;
        }
    }

    /**
     * Two writers are equal when they have the same batch size and the same
     * {@code flushCommits} flag. State of the parent class is not compared.
     */
    @Override
    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        if (!(o instanceof HBaseWriter)) {
            return false;
        }
        HBaseWriter other = (HBaseWriter) o;
        if (!other.canEqual(this)) {
            return false;
        }
        return this.batchSize == other.batchSize && this.flushCommits == other.flushCommits;
    }

    /** Subclass hook used by {@link #equals(Object)} to keep the comparison symmetric. */
    protected boolean canEqual(Object other) {
        return other instanceof HBaseWriter;
    }

    /** Hash code consistent with {@link #equals(Object)}. */
    @Override
    public int hashCode() {
        int result = 1;
        result = result * 59 + this.batchSize;
        result = result * 59 + (this.flushCommits ? 79 : 97);
        return result;
    }
}

