/*
 * Decompiled with CFR 0.152.
 */
package cz.o2.proxima.direct.hbase;

import cz.o2.proxima.direct.batch.BatchLogObservable;
import cz.o2.proxima.direct.batch.BatchLogObserver;
import cz.o2.proxima.direct.core.Partition;
import cz.o2.proxima.direct.hbase.HBaseClientWrapper;
import cz.o2.proxima.direct.hbase.HBasePartition;
import cz.o2.proxima.direct.hbase.Util;
import cz.o2.proxima.functional.Factory;
import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.repository.EntityDescriptor;
import cz.o2.proxima.storage.StreamElement;
import java.io.IOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.QualifierFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link BatchLogObservable} that reads its data by scanning an HBase table.
 *
 * <p>Partitions are derived from the end keys of the table's regions. Observation
 * runs on an {@link Executor} obtained lazily from the supplied factory and pushes
 * every scanned cell to the observer as a {@link StreamElement}.
 */
class HBaseLogObservable extends HBaseClientWrapper implements BatchLogObservable {
    private static final Logger log = LoggerFactory.getLogger(HBaseLogObservable.class);

    /** Entity whose attributes are read from the table. */
    private final EntityDescriptor entity;

    /** Factory used to create the executor on first use. */
    private final Factory<Executor> executorFactory;

    /** Lazily created executor; transient, so it is re-created via the factory when absent. */
    private transient Executor executor;

    /**
     * Creates the observable.
     *
     * @param uri URI passed to the {@link HBaseClientWrapper} superclass
     * @param conf Hadoop configuration passed to the superclass
     * @param entity entity the produced stream elements belong to
     * @param executorFactory factory of the executor the observation runs on
     */
    public HBaseLogObservable(URI uri, Configuration conf, EntityDescriptor entity, Factory<Executor> executorFactory) {
        super(uri, conf);
        this.entity = entity;
        this.executorFactory = executorFactory;
    }

    /**
     * Creates one partition per region end key of the table.
     *
     * <p>Partition {@code i} starts at the end key of the previous region (an empty
     * key for the first one) and ends at the {@code i}-th end key.
     *
     * @param startStamp lower bound of the time range; negative values are treated as zero
     * @param endStamp upper bound of the time range, passed to the partitions unchanged
     * @return partitions in the order of the region end keys
     * @throws RuntimeException wrapping any {@link IOException} raised while talking to HBase
     */
    public List<Partition> getPartitions(long startStamp, long endStamp) {
        long effectiveStart = Math.max(0L, startStamp);
        try {
            this.ensureClient();
            byte[][] endKeys;
            // The locator is a closeable resource; release it as soon as the keys are read.
            try (RegionLocator locator = this.conn.getRegionLocator(this.tableName())) {
                endKeys = locator.getEndKeys();
            }
            List<Partition> ret = new ArrayList<>(endKeys.length);
            byte[] startPos = new byte[0];
            for (int i = 0; i < endKeys.length; i++) {
                ret.add(new HBasePartition(i, startPos, endKeys[i], effectiveStart, endStamp));
                startPos = endKeys[i];
            }
            return ret;
        } catch (IOException ex) {
            throw new RuntimeException(ex);
        }
    }

    /**
     * Asynchronously scans the given partitions and feeds the observer.
     *
     * <p>Any failure (including a failure to set up the client) is logged and reported
     * through {@link BatchLogObserver#onError}; when that call returns {@code true}
     * the whole observation is submitted to the executor again.
     *
     * @param partitions partitions to read; each must be an {@link HBasePartition}
     * @param attributes attributes to read
     * @param observer observer receiving the elements
     */
    public void observe(List<Partition> partitions, List<AttributeDescriptor<?>> attributes, BatchLogObserver observer) {
        this.executor().execute(() -> {
            try {
                // Inside the try so that connection problems reach the observer as well.
                this.ensureClient();
                this.flushPartitions(partitions, attributes, observer);
            } catch (Throwable ex) {
                log.warn("Failed to observe partitions {}", partitions, ex);
                if (observer.onError(ex)) {
                    log.info("Restaring processing by request");
                    this.observe(partitions, attributes, observer);
                }
            }
        });
    }

    /**
     * Scans the partitions one after another and passes every cell to the observer.
     *
     * <p>Reading stops early when the observer declines further data or when the
     * current thread is interrupted. {@link BatchLogObserver#onCompleted()} is called
     * afterwards in every case that does not end with an exception.
     */
    private void flushPartitions(List<Partition> partitions, List<AttributeDescriptor<?>> attributes, BatchLogObserver observer) throws IOException {
        for (Partition p : partitions) {
            if (Thread.currentThread().isInterrupted()) {
                // No point opening scanners for the remaining partitions.
                break;
            }
            HBasePartition hp = (HBasePartition) p;
            Scan scan = new Scan(hp.getStartKey(), hp.getEndKey());
            scan.addFamily(this.family);
            scan.setTimeRange(hp.getStartStamp(), hp.getEndStamp());
            scan.setFilter(this.toFilter(attributes));
            boolean wantsMore = true;
            try (ResultScanner scanner = this.client.getScanner(scan)) {
                Result next;
                // Check interruption before fetching so that no row is read just to be dropped.
                while (wantsMore
                        && !Thread.currentThread().isInterrupted()
                        && (next = scanner.next()) != null) {
                    wantsMore = this.consume(next, attributes, hp, observer);
                }
            }
            if (!wantsMore) {
                break;
            }
        }
        observer.onCompleted();
    }

    /** Returns the executor, creating it through the factory on first use. */
    private Executor executor() {
        if (this.executor == null) {
            this.executor = (Executor) this.executorFactory.apply();
        }
        return this.executor;
    }

    /**
     * Passes all cells of a single result to the observer.
     *
     * @return {@code false} as soon as the observer's {@code onNext} returns {@code false},
     *     {@code true} when all cells were accepted
     */
    private boolean consume(Result r, List<AttributeDescriptor<?>> attrs, HBasePartition hp, BatchLogObserver observer) throws IOException {
        CellScanner cells = r.cellScanner();
        while (cells.advance()) {
            if (!observer.onNext(this.toStreamElement(cells.current(), attrs, hp), hp)) {
                return false;
            }
        }
        return true;
    }

    /**
     * Converts a cell to an upsert {@link StreamElement} of the first matching attribute.
     *
     * <p>Row key, qualifier and partition start key are decoded as UTF-8, the same
     * charset {@link #toFilter} uses to encode attribute names.
     *
     * @throws IllegalStateException when no descriptor in {@code attrs} matches the qualifier
     */
    private StreamElement toStreamElement(Cell cell, List<AttributeDescriptor<?>> attrs, HBasePartition hp) {
        String key = new String(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), StandardCharsets.UTF_8);
        String qualifier = new String(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(), StandardCharsets.UTF_8);
        for (AttributeDescriptor<?> d : attrs) {
            if (!HBaseLogObservable.matchesQualifier(d, qualifier)) {
                continue;
            }
            // Identifier built from the partition start key and the cell's sequence id.
            String elementId = new String(hp.getStartKey(), StandardCharsets.UTF_8) + "#" + cell.getSequenceId();
            byte[] value = Util.cloneArray(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
            return StreamElement.upsert(this.entity, d, elementId, key, qualifier, cell.getTimestamp(), value);
        }
        throw new IllegalStateException("Illegal state! Fix code! No requested attribute matches qualifier '" + qualifier + "'");
    }

    /**
     * Tells whether a qualifier belongs to the attribute, mirroring {@link #toFilter}:
     * prefix match for wildcard attributes, exact name match otherwise.
     */
    private static boolean matchesQualifier(AttributeDescriptor<?> attr, String qualifier) {
        if (attr.isWildcard()) {
            return qualifier.startsWith(attr.toAttributePrefix());
        }
        return qualifier.equals(attr.getName());
    }

    /**
     * Builds a filter that passes a column when it matches at least one attribute:
     * by qualifier prefix for wildcard attributes, by exact qualifier otherwise.
     */
    private Filter toFilter(List<AttributeDescriptor<?>> attributes) {
        FilterList attrFilter = new FilterList(FilterList.Operator.MUST_PASS_ONE);
        for (AttributeDescriptor<?> attr : attributes) {
            if (attr.isWildcard()) {
                byte[] prefix = attr.toAttributePrefix().getBytes(StandardCharsets.UTF_8);
                attrFilter.addFilter(new ColumnPrefixFilter(prefix));
            } else {
                byte[] name = attr.getName().getBytes(StandardCharsets.UTF_8);
                attrFilter.addFilter(new QualifierFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(name)));
            }
        }
        return attrFilter;
    }

    /**
     * Hash derived from {@code uri}, one of the two fields compared by
     * {@link #equals(Object)}, so equal instances always share a hash code.
     */
    @Override
    public int hashCode() {
        return this.uri == null ? 73 : this.uri.hashCode();
    }

    /** Two observables are equal when both their entity and their URI are equal. */
    @Override
    public boolean equals(Object obj) {
        if (obj instanceof HBaseLogObservable) {
            HBaseLogObservable o = (HBaseLogObservable) obj;
            return o.entity.equals(this.entity) && o.uri.equals(this.uri);
        }
        return false;
    }
}

