/*
 * Decompiled with CFR 0.152.
 */
package cz.o2.proxima.direct.cassandra;

import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.Statement;
import cz.o2.proxima.cassandra.shaded.com.google.common.annotations.VisibleForTesting;
import cz.o2.proxima.cassandra.shaded.com.google.common.base.Strings;
import cz.o2.proxima.direct.batch.BatchLogReader;
import cz.o2.proxima.direct.cassandra.CassandraLogReader;
import cz.o2.proxima.direct.cassandra.CassandraRandomReader;
import cz.o2.proxima.direct.cassandra.CassandraWriter;
import cz.o2.proxima.direct.cassandra.CqlFactory;
import cz.o2.proxima.direct.cassandra.DefaultCqlFactory;
import cz.o2.proxima.direct.cassandra.StringConverter;
import cz.o2.proxima.direct.core.AttributeWriterBase;
import cz.o2.proxima.direct.core.Context;
import cz.o2.proxima.direct.core.DataAccessor;
import cz.o2.proxima.direct.randomaccess.RandomAccessReader;
import cz.o2.proxima.functional.Factory;
import cz.o2.proxima.repository.EntityDescriptor;
import cz.o2.proxima.storage.AbstractStorage;
import cz.o2.proxima.util.Classpath;
import java.io.Serializable;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CassandraDBAccessor
extends AbstractStorage
implements DataAccessor {
    private static final Logger log = LoggerFactory.getLogger(CassandraDBAccessor.class);
    private static final long serialVersionUID = 1L;
    static final String CQL_FACTORY_CFG = "cqlFactory";
    static final String CQL_STRING_CONVERTER = "converter";
    static final String CQL_PARALLEL_SCANS = "scanParallelism";
    private final CqlFactory cqlFactory;
    private final StringConverter<Object> converter;
    private final int batchParallelism;
    @Nullable
    private transient Cluster cluster;
    @Nullable
    private transient Session session;

    public CassandraDBAccessor(EntityDescriptor entityDesc, URI uri, Map<String, Object> cfg) {
        super(entityDesc, uri);
        Object factoryName = cfg.get(CQL_FACTORY_CFG);
        String cqlFactoryName = factoryName == null ? DefaultCqlFactory.class.getName() : factoryName.toString();
        Object tmp = cfg.get(CQL_PARALLEL_SCANS);
        this.batchParallelism = tmp != null ? Integer.parseInt(tmp.toString()) : Runtime.getRuntime().availableProcessors();
        if (this.batchParallelism < 2) {
            throw new IllegalArgumentException("Batch parallelism must be at least 2, got " + this.batchParallelism);
        }
        tmp = cfg.get(CQL_STRING_CONVERTER);
        StringConverter c = StringConverter.getDefault();
        if (tmp != null) {
            try {
                c = (StringConverter)Classpath.newInstance((String)tmp.toString(), StringConverter.class);
            }
            catch (Exception ex) {
                log.warn("Failed to instantiate type converter {}", tmp, (Object)ex);
            }
        }
        this.converter = c;
        try {
            this.cqlFactory = (CqlFactory)Classpath.findClass((String)cqlFactoryName, CqlFactory.class).newInstance();
            this.cqlFactory.setup(entityDesc, uri, this.converter);
        }
        catch (IllegalAccessException | InstantiationException ex) {
            throw new IllegalArgumentException("Cannot instantiate class " + cqlFactoryName, ex);
        }
    }

    ResultSet execute(Statement statement) {
        if (log.isDebugEnabled()) {
            if (statement instanceof BoundStatement) {
                BoundStatement s2 = (BoundStatement)statement;
                log.debug("Executing BoundStatement {}", (Object)s2.preparedStatement().getQueryString());
            } else {
                log.debug("Executing {} {} with payload {}", statement.getClass().getSimpleName(), statement, statement.getOutgoingPayload());
            }
        }
        return this.session.execute(statement);
    }

    @VisibleForTesting
    Cluster getCluster(URI uri) {
        String authority = uri.getAuthority();
        if (Strings.isNullOrEmpty(authority)) {
            throw new IllegalArgumentException("Invalid authority in " + uri);
        }
        return Cluster.builder().addContactPointsWithPorts(Arrays.stream(authority.split(",")).map(p -> {
            String[] parts = p.split(":", 2);
            if (parts.length != 2) {
                throw new IllegalArgumentException("Invalid hostport " + p);
            }
            return InetSocketAddress.createUnresolved(parts[0], Integer.parseInt(parts[1]));
        }).collect(Collectors.toList())).build();
    }

    synchronized Session ensureSession() {
        if (this.session == null || this.session.isClosed()) {
            if (this.cluster == null || this.cluster.isClosed()) {
                if (this.cluster != null) {
                    this.cluster.close();
                }
                this.cluster = this.getCluster(this.getUri());
            }
            if (this.session != null) {
                this.session.close();
            }
            this.session = this.cluster.connect();
        }
        return this.session;
    }

    synchronized void close() {
        if (this.session != null) {
            this.session.close();
            this.session = null;
        }
        if (this.cluster != null) {
            this.cluster.close();
            this.cluster = null;
        }
    }

    public Optional<AttributeWriterBase> getWriter(Context context) {
        return Optional.of(this.newWriter());
    }

    public Optional<RandomAccessReader> getRandomAccessReader(Context context) {
        return Optional.of(this.newRandomReader());
    }

    public Optional<BatchLogReader> getBatchLogReader(Context context) {
        return Optional.of(this.newBatchReader(context));
    }

    @VisibleForTesting
    CassandraRandomReader newRandomReader() {
        return new CassandraRandomReader(this);
    }

    @VisibleForTesting
    CassandraLogReader newBatchReader(Context context) {
        return new CassandraLogReader(this, (Factory<ExecutorService>)((Factory & Serializable)() -> ((Context)context).getExecutorService()));
    }

    @VisibleForTesting
    CassandraWriter newWriter() {
        return new CassandraWriter(this);
    }

    CqlFactory getCqlFactory() {
        return this.cqlFactory;
    }

    StringConverter<Object> getConverter() {
        return this.converter;
    }

    int getBatchParallelism() {
        return this.batchParallelism;
    }
}

