package cz.o2.proxima.direct.hbase;

import cz.o2.proxima.direct.batch.BatchLogObserver;
import cz.o2.proxima.direct.batch.BatchLogObservers;
import cz.o2.proxima.direct.batch.BatchLogReader;
import cz.o2.proxima.direct.batch.ObserveHandle;
import cz.o2.proxima.direct.batch.TerminationContext;
import cz.o2.proxima.functional.Factory;
import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.repository.EntityDescriptor;
import cz.o2.proxima.storage.Partition;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.util.ExceptionUtils;
import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import lombok.Generated;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.QualifierFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cz/o2/proxima/direct/hbase/HBaseLogReader.class */
/**
 * {@link BatchLogReader} backed by an HBase table.
 *
 * <p>The table is split into one partition per region end key (see
 * {@link #getPartitions(long, long)}); observing partitions scans them sequentially on a thread
 * of the {@link ExecutorService} created by the supplied factory.
 */
class HBaseLogReader extends HBaseClientWrapper implements BatchLogReader {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(HBaseLogReader.class);

    private final EntityDescriptor entity;
    private final InternalSerializer serializer;
    private final Factory<ExecutorService> executorFactory;
    private final ExecutorService executor;

    /**
     * Creates the reader.
     *
     * @param uri URI handed to the client wrapper and used to instantiate the serializer
     * @param configuration Hadoop configuration handed to the client wrapper
     * @param entityDescriptor entity whose attributes are read
     * @param factory factory of the executor that runs the scans; it is invoked once here and
     *     kept so that {@link #asFactory()} can recreate the reader
     */
    public HBaseLogReader(URI uri, Configuration configuration, EntityDescriptor entityDescriptor, Factory<ExecutorService> factory) {
        super(uri, configuration);
        this.entity = entityDescriptor;
        this.serializer = HBaseDataAccessor.instantiateSerializer(uri);
        this.executorFactory = factory;
        this.executor = factory.apply();
    }

    /**
     * Builds one partition per region end key of the table.
     *
     * <p>Each partition starts at the previous end key (the first one at an empty key) and carries
     * the requested stamp range; a negative start stamp is clamped to zero.
     *
     * @param startStamp lower stamp bound passed to every partition
     * @param endStamp upper stamp bound passed to every partition
     * @return partitions in the order of the region end keys
     * @throws RuntimeException wrapping any {@link IOException} raised while reading the end keys
     */
    public List<Partition> getPartitions(long startStamp, long endStamp) {
        try {
            ensureClient();
            byte[][] endKeys = this.conn.getRegionLocator(tableName()).getEndKeys();
            long effectiveStart = Math.max(0L, startStamp);
            List<Partition> partitions = new ArrayList<>(endKeys.length);
            byte[] startKey = new byte[0];
            for (int id = 0; id < endKeys.length; id++) {
                partitions.add(new HBasePartition(id, startKey, endKeys[id], effectiveStart, endStamp));
                // the end key of this partition is the start key of the following one
                startKey = endKeys[id];
            }
            return partitions;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Starts asynchronous observation of the given partitions.
     *
     * @param partitions partitions to scan; each must be an {@link HBasePartition}
     * @param attributes attributes whose cells should be delivered
     * @param observer observer receiving the elements
     * @return the termination context created for this observation, acting as its handle
     */
    public ObserveHandle observe(List<Partition> partitions, List<AttributeDescriptor<?>> attributes, BatchLogObserver observer) {
        TerminationContext terminationContext = new TerminationContext(observer);
        observeInternal(partitions, attributes, observer, terminationContext);
        return terminationContext;
    }

    /**
     * Submits the scan of the given partitions to the executor.
     *
     * <p>Any failure, including a failure to set up the client, is passed to
     * {@link TerminationContext#handleErrorCaught}; the restart callback given to it resubmits
     * the whole observation.
     *
     * @param partitions partitions to scan
     * @param attributes attributes whose cells should be delivered
     * @param observer observer receiving the elements
     * @param terminationContext context tracking cancellation and completion
     */
    public void observeInternal(List<Partition> partitions, List<AttributeDescriptor<?>> attributes, BatchLogObserver observer, TerminationContext terminationContext) {
        this.executor.submit(() -> {
            try {
                // kept inside the try so that a client set-up failure reaches the error handler
                // instead of being lost in the discarded Future
                ensureClient();
                ExceptionUtils.ignoringInterrupted(
                        () -> flushPartitions(partitions, attributes, terminationContext, observer));
            } catch (Throwable err) {
                terminationContext.handleErrorCaught(err, () -> {
                    log.info("Restarting processing by request");
                    observeInternal(partitions, attributes, observer, terminationContext);
                });
            }
        });
    }

    /**
     * Returns a factory that recreates this reader.
     *
     * <p>The lambda captures only local copies of the needed state, not {@code this}.
     *
     * @return factory creating a new {@link HBaseLogReader} with the same URI, configuration,
     *     entity and executor factory
     */
    public BatchLogReader.Factory<?> asFactory() {
        URI readerUri = getUri();
        EntityDescriptor entityDescriptor = this.entity;
        Factory<ExecutorService> factory = this.executorFactory;
        byte[] confBytes = this.serializedConf;
        return repository ->
                new HBaseLogReader(readerUri, deserialize(confBytes, new Configuration()), entityDescriptor, factory);
    }

    /**
     * Scans the partitions one after another and signals completion exactly once.
     *
     * <p>Scanning stops early when the context is cancelled or the observer declines further
     * elements; {@link TerminationContext#finished()} is called in either case.
     */
    private void flushPartitions(List<Partition> partitions, List<AttributeDescriptor<?>> attributes, TerminationContext terminationContext, BatchLogObserver observer) throws IOException {
        for (Partition partition : partitions) {
            if (!scanPartition((HBasePartition) partition, attributes, terminationContext, observer)) {
                break;
            }
        }
        terminationContext.finished();
    }

    /**
     * Scans a single partition and feeds its rows to the observer.
     *
     * @return {@code true} when the partition was read to its end, {@code false} when reading was
     *     cut short by cancellation or by the observer
     */
    private boolean scanPartition(HBasePartition partition, List<AttributeDescriptor<?>> attributes, TerminationContext terminationContext, BatchLogObserver observer) throws IOException {
        Scan scan = new Scan(partition.getStartKey(), partition.getEndKey());
        scan.addFamily(this.family);
        scan.setTimeRange(partition.getStartStamp(), partition.getEndStamp());
        scan.setFilter(toFilter(attributes));
        // try-with-resources closes the scanner on every exit path, including exceptions
        try (ResultScanner scanner = this.client.getScanner(scan)) {
            while (!terminationContext.isCancelled()) {
                Result row = scanner.next();
                if (row == null) {
                    return true;
                }
                if (!consume(row, attributes, partition, observer, terminationContext)) {
                    return false;
                }
            }
            return false;
        }
    }

    /**
     * Delivers the cells of one row to the observer until the row is exhausted, the context is
     * cancelled, or the observer declines.
     *
     * @return {@code false} only when the observer's {@code onNext} returned {@code false}
     */
    private boolean consume(Result result, List<AttributeDescriptor<?>> attributes, HBasePartition partition, BatchLogObserver observer, TerminationContext terminationContext) throws IOException {
        CellScanner cells = result.cellScanner();
        while (!terminationContext.isCancelled() && cells.advance()) {
            StreamElement element = toStreamElement(cells.current(), attributes);
            if (!observer.onNext(element, BatchLogObservers.defaultContext(partition))) {
                return false;
            }
        }
        return true;
    }

    /**
     * Converts a cell to a stream element using the first attribute whose
     * {@code toAttributePrefix()} is a prefix of the cell qualifier.
     *
     * @throws IllegalStateException when no attribute matches the qualifier
     */
    private StreamElement toStreamElement(Cell cell, List<AttributeDescriptor<?>> attributes) throws IOException {
        // decode with the same charset toFilter uses to encode qualifiers
        String qualifier = new String(
                cell.getQualifierArray(),
                cell.getQualifierOffset(),
                cell.getQualifierLength(),
                StandardCharsets.UTF_8);
        for (AttributeDescriptor<?> attribute : attributes) {
            if (qualifier.startsWith(attribute.toAttributePrefix())) {
                return this.serializer.toKeyValue(this.entity, attribute, cell);
            }
        }
        throw new IllegalStateException("Illegal state! Fix code!");
    }

    /**
     * Builds a filter that passes a column when it matches at least one of the attributes:
     * wildcard attributes match by qualifier prefix, the others by exact qualifier name.
     */
    private Filter toFilter(List<AttributeDescriptor<?>> attributes) {
        FilterList filters = new FilterList(FilterList.Operator.MUST_PASS_ONE);
        for (AttributeDescriptor<?> attribute : attributes) {
            if (attribute.isWildcard()) {
                byte[] prefix = attribute.toAttributePrefix().getBytes(StandardCharsets.UTF_8);
                filters.addFilter(new ColumnPrefixFilter(prefix));
            } else {
                byte[] name = attribute.getName().getBytes(StandardCharsets.UTF_8);
                filters.addFilter(new QualifierFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(name)));
            }
        }
        return filters;
    }

    /**
     * Hash derived from the URI only. Readers that are equal share the same URI, so equal
     * readers always produce equal hashes.
     */
    @Override
    public int hashCode() {
        return this.uri.hashCode();
    }

    /** Two readers are equal when both their entity and their URI are equal. */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof HBaseLogReader)) {
            return false;
        }
        HBaseLogReader other = (HBaseLogReader) obj;
        return other.entity.equals(this.entity) && other.uri.equals(this.uri);
    }
}
