package cz.o2.proxima.direct.cassandra;

import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import cz.o2.proxima.direct.batch.BatchLogObservable;
import cz.o2.proxima.direct.batch.BatchLogObserver;
import cz.o2.proxima.direct.core.Partition;
import cz.o2.proxima.functional.Factory;
import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.storage.StreamElement;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;

/* loaded from: input_file:cz/o2/proxima/direct/cassandra/CassandraLogObservable.class */
class CassandraLogObservable implements BatchLogObservable {
    private final CassandraDBAccessor accessor;
    private final int parallelism;
    private final Factory<Executor> executorFactory;

    @Nullable
    private transient Executor executor;

    /* JADX INFO: Access modifiers changed from: package-private */
    public CassandraLogObservable(CassandraDBAccessor cassandraDBAccessor, Factory<Executor> factory) {
        this.accessor = cassandraDBAccessor;
        this.parallelism = cassandraDBAccessor.getBatchParallelism();
        this.executorFactory = factory;
    }

    public List<Partition> getPartitions(long j, long j2) {
        ArrayList arrayList = new ArrayList();
        double d = 1.8446744073709552E19d / this.parallelism;
        double d2 = -9.223372036854776E18d;
        double d3 = (-9.223372036854776E18d) + d;
        int i = 0;
        while (i < this.parallelism) {
            arrayList.add(new CassandraPartition(i, j, j2, (long) d2, (long) d3, i == this.parallelism - 1));
            d2 = d3;
            d3 += d;
            if (i == this.parallelism - 2) {
                d3 = 9.223372036854776E18d;
            }
            i++;
        }
        return arrayList;
    }

    public void observe(List<Partition> list, List<AttributeDescriptor<?>> list2, BatchLogObserver batchLogObserver) {
        executor().execute(() -> {
            boolean z = true;
            Iterator it = list.iterator();
            while (z) {
                try {
                    if (!it.hasNext()) {
                        break;
                    }
                    CassandraPartition cassandraPartition = (CassandraPartition) it.next();
                    ResultSet execute = this.accessor.execute(this.accessor.getCqlFactory().scanPartition(list2, cassandraPartition, this.accessor.ensureSession()));
                    AtomicLong atomicLong = new AtomicLong();
                    Iterator<Row> it2 = execute.iterator();
                    while (z && it2.hasNext()) {
                        Row next = it2.next();
                        String string = next.getString(0);
                        int i = 1;
                        Iterator it3 = list2.iterator();
                        while (true) {
                            if (it3.hasNext()) {
                                AttributeDescriptor attributeDescriptor = (AttributeDescriptor) it3.next();
                                String name = attributeDescriptor.getName();
                                if (attributeDescriptor.isWildcard()) {
                                    int i2 = i;
                                    i++;
                                    name = attributeDescriptor.toAttributePrefix() + this.accessor.getConverter().asString(next.getObject(i2));
                                }
                                int i3 = i;
                                i++;
                                ByteBuffer bytes = next.getBytes(i3);
                                if (bytes != null) {
                                    if (!batchLogObserver.onNext(StreamElement.upsert(this.accessor.getEntityDescriptor(), attributeDescriptor, "cql-" + this.accessor.getEntityDescriptor().getName() + "-part" + cassandraPartition.getId() + atomicLong.incrementAndGet(), string, name, System.currentTimeMillis(), bytes.slice().array()), cassandraPartition)) {
                                        z = false;
                                        break;
                                    }
                                }
                            }
                        }
                    }
                } catch (Throwable th) {
                    batchLogObserver.onError(th);
                    return;
                }
            }
            batchLogObserver.onCompleted();
        });
    }

    private Executor executor() {
        if (this.executor == null) {
            this.executor = (Executor) this.executorFactory.apply();
        }
        return this.executor;
    }
}
