/*
 * Decompiled with CFR 0.152.
 */
package cz.o2.proxima.direct.io.cassandra;

import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.Statement;
import cz.o2.proxima.cassandra.shaded.com.google.common.annotations.VisibleForTesting;
import cz.o2.proxima.cassandra.shaded.com.google.common.base.Preconditions;
import cz.o2.proxima.cassandra.shaded.com.google.common.base.Strings;
import cz.o2.proxima.core.functional.UnaryFunction;
import cz.o2.proxima.core.repository.AttributeDescriptor;
import cz.o2.proxima.core.repository.EntityDescriptor;
import cz.o2.proxima.core.storage.AbstractStorage;
import cz.o2.proxima.core.storage.Partition;
import cz.o2.proxima.core.util.Classpath;
import cz.o2.proxima.direct.core.AttributeWriterBase;
import cz.o2.proxima.direct.core.Context;
import cz.o2.proxima.direct.core.DataAccessor;
import cz.o2.proxima.direct.core.batch.BatchLogObserver;
import cz.o2.proxima.direct.core.batch.BatchLogReader;
import cz.o2.proxima.direct.core.batch.ObserveHandle;
import cz.o2.proxima.direct.core.randomaccess.RandomAccessReader;
import cz.o2.proxima.direct.io.cassandra.CassandraLogReader;
import cz.o2.proxima.direct.io.cassandra.CassandraRandomReader;
import cz.o2.proxima.direct.io.cassandra.CassandraWriter;
import cz.o2.proxima.direct.io.cassandra.CqlFactory;
import cz.o2.proxima.direct.io.cassandra.DefaultCqlFactory;
import cz.o2.proxima.direct.io.cassandra.StringConverter;
import java.io.ObjectStreamException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CassandraDBAccessor
extends AbstractStorage.SerializableAbstractStorage
implements DataAccessor {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(CassandraDBAccessor.class);
    private static final long serialVersionUID = 1L;
    @VisibleForTesting
    private static final Map<String, Cluster> CLUSTER_MAP = Collections.synchronizedMap(new HashMap());
    private static final Map<Cluster, AtomicInteger> CLUSTER_REFERENCES = new ConcurrentHashMap<Cluster, AtomicInteger>();
    private static final Map<Cluster, Session> CLUSTER_SESSIONS = new ConcurrentHashMap<Cluster, Session>();
    static final String CQL_FACTORY_CFG = "cqlFactory";
    static final String CQL_STRING_CONVERTER = "converter";
    static final String CQL_PARALLEL_SCANS = "scanParallelism";
    static final String CONSISTENCY_LEVEL_CFG = "consistency-level";
    static final ConsistencyLevel DEFAULT_CONSISTENCY_LEVEL = ConsistencyLevel.QUORUM;
    static final String USERNAME_CFG = "username";
    static final String PASSWORD_CFG = "password";
    private final StringConverter<?> converter;
    private final int batchParallelism;
    private final String cqlFactoryName;
    private transient ThreadLocal<CqlFactory> cqlFactory;
    @VisibleForTesting
    private final ConsistencyLevel consistencyLevel;
    @Nullable
    @VisibleForTesting
    private final String username;
    @Nullable
    @VisibleForTesting
    private final String password;

    public CassandraDBAccessor(EntityDescriptor entityDesc, URI uri, Map<String, Object> cfg) {
        super(entityDesc, uri);
        this.cqlFactoryName = this.getCqlFactoryName(cfg);
        this.batchParallelism = this.getBatchParallelism(cfg);
        this.converter = this.getStringConverter(cfg);
        this.consistencyLevel = this.getConsistencyLevel(cfg);
        this.username = CassandraDBAccessor.getOpt(cfg, USERNAME_CFG, Object::toString, null);
        this.password = CassandraDBAccessor.getOpt(cfg, PASSWORD_CFG, Object::toString, "");
        this.initializeCqlFactory();
    }

    private String getCqlFactoryName(Map<String, Object> cfg) {
        Object tmp = cfg.get(CQL_FACTORY_CFG);
        return tmp == null ? DefaultCqlFactory.class.getName() : tmp.toString();
    }

    private int getBatchParallelism(Map<String, Object> cfg) {
        Object tmp = cfg.get(CQL_PARALLEL_SCANS);
        int ret = tmp != null ? Integer.parseInt(tmp.toString()) : Math.max(2, Runtime.getRuntime().availableProcessors());
        Preconditions.checkArgument(ret >= 2, "Batch parallelism must be at least 2, got %s", ret);
        return ret;
    }

    private StringConverter<?> getStringConverter(Map<String, Object> cfg) {
        Object tmp = cfg.get(CQL_STRING_CONVERTER);
        StringConverter c = StringConverter.getDefault();
        if (tmp != null) {
            try {
                c = (StringConverter)Classpath.newInstance((String)tmp.toString(), StringConverter.class);
            }
            catch (Exception ex) {
                log.warn("Failed to instantiate type converter {}", tmp, (Object)ex);
            }
        }
        return c;
    }

    private ConsistencyLevel getConsistencyLevel(Map<String, Object> cfg) {
        return CassandraDBAccessor.getOpt(cfg, CONSISTENCY_LEVEL_CFG, ConsistencyLevel::valueOf, DEFAULT_CONSISTENCY_LEVEL);
    }

    private static <T> T getOpt(Map<String, Object> cfg, String name, UnaryFunction<String, T> map, T defval) {
        return (T)Optional.ofNullable(cfg.get(name)).map(Object::toString).map(arg_0 -> map.apply(arg_0)).orElse(defval);
    }

    private void initializeCqlFactory() {
        this.cqlFactory = ThreadLocal.withInitial(() -> {
            CqlFactory factory = (CqlFactory)Classpath.newInstance((String)this.cqlFactoryName, CqlFactory.class);
            factory.setup(this.getEntityDescriptor(), this.getUri(), this.converter);
            return factory;
        });
    }

    ResultSet execute(Statement statement) {
        statement.setConsistencyLevel(this.consistencyLevel);
        if (log.isDebugEnabled()) {
            if (statement instanceof BoundStatement) {
                BoundStatement s = (BoundStatement)statement;
                log.debug("Executing BoundStatement {}", (Object)s.preparedStatement().getQueryString());
            } else {
                log.debug("Executing {} {} with payload {}", new Object[]{statement.getClass().getSimpleName(), statement, statement.getOutgoingPayload()});
            }
        }
        return this.ensureSession().execute(statement);
    }

    ClusterHolder acquireCluster() {
        return new ClusterHolder(this.getCluster(this.getUri()));
    }

    private Cluster getCluster(URI uri) {
        Preconditions.checkArgument(!Strings.isNullOrEmpty(uri.getAuthority()), "Invalid authority in %s", uri);
        return this.getCluster(uri.getAuthority(), this.username);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Cluster getCluster(String authority, @Nullable String username) {
        String clusterCachedKey = (String)(username != null ? username + "@" : "") + authority;
        Map<String, Cluster> map = CLUSTER_MAP;
        synchronized (map) {
            Cluster cluster = CLUSTER_MAP.get(clusterCachedKey);
            if (cluster == null) {
                cluster = this.createCluster(authority);
                CLUSTER_MAP.put(clusterCachedKey, cluster);
            }
            return Objects.requireNonNull(cluster);
        }
    }

    @VisibleForTesting
    Cluster createCluster(String authority) {
        log.info("Creating cluster for authority {} in accessor {}", (Object)authority, (Object)this);
        return this.configureClusterBuilder(Cluster.builder(), authority).build();
    }

    @VisibleForTesting
    Cluster.Builder configureClusterBuilder(Cluster.Builder builder, String authority) {
        builder.addContactPointsWithPorts(Arrays.stream(authority.split(",")).map(CassandraDBAccessor::getAddress).collect(Collectors.toList()));
        if (this.username != null) {
            builder.withCredentials(this.username, this.password);
        }
        return builder;
    }

    @VisibleForTesting
    static InetSocketAddress getAddress(String p) {
        String[] parts = p.split(":", 2);
        if (parts.length != 2) {
            throw new IllegalArgumentException("Invalid hostport " + p);
        }
        return InetSocketAddress.createUnresolved(parts[0], Integer.parseInt(parts[1]));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    Session ensureSession() {
        Cluster cluster = this.getCluster(this.getUri());
        Preconditions.checkState(cluster != null);
        Session session = CLUSTER_SESSIONS.computeIfAbsent(cluster, Cluster::connect);
        if (session.isClosed()) {
            CassandraDBAccessor cassandraDBAccessor = this;
            synchronized (cassandraDBAccessor) {
                session = CLUSTER_SESSIONS.get(cluster);
                if (session.isClosed()) {
                    session = cluster.connect();
                    CLUSTER_SESSIONS.put(cluster, session);
                }
            }
        }
        Preconditions.checkState(!session.isClosed());
        return session;
    }

    public Optional<AttributeWriterBase> getWriter(Context context) {
        return Optional.of(this.newWriter());
    }

    public Optional<RandomAccessReader> getRandomAccessReader(Context context) {
        return Optional.of(this.newRandomReader());
    }

    public Optional<BatchLogReader> getBatchLogReader(Context context) {
        return Optional.of(this.newBatchReader(context));
    }

    @VisibleForTesting
    CassandraRandomReader newRandomReader() {
        return new CassandraRandomReader(this){

            @Override
            public void close() {
                super.close();
                CassandraDBAccessor.this.cqlFactory.remove();
            }
        };
    }

    @VisibleForTesting
    CassandraLogReader newBatchReader(Context context) {
        return new CassandraLogReader(this, () -> ((Context)context).getExecutorService()){

            @Override
            public ObserveHandle observe(List<Partition> partitions, List<AttributeDescriptor<?>> attributes, BatchLogObserver observer) {
                ObserveHandle handle = super.observe(partitions, attributes, observer);
                return () -> {
                    handle.close();
                    CassandraDBAccessor.this.cqlFactory.remove();
                };
            }
        };
    }

    @VisibleForTesting
    CassandraWriter newWriter() {
        return new CassandraWriter(this){

            @Override
            public void close() {
                super.close();
                CassandraDBAccessor.this.cqlFactory.remove();
            }
        };
    }

    @VisibleForTesting
    static void clear() {
        CLUSTER_REFERENCES.clear();
        CLUSTER_MAP.clear();
        CLUSTER_SESSIONS.clear();
    }

    String asString(Object value) {
        return this.converter.asString(value);
    }

    CqlFactory getCqlFactory() {
        return this.cqlFactory.get();
    }

    Object readResolve() throws ObjectStreamException {
        this.initializeCqlFactory();
        return this;
    }

    @Generated
    static Map<String, Cluster> getCLUSTER_MAP() {
        return CLUSTER_MAP;
    }

    @Generated
    int getBatchParallelism() {
        return this.batchParallelism;
    }

    @Generated
    ConsistencyLevel getConsistencyLevel() {
        return this.consistencyLevel;
    }

    @Nullable
    @Generated
    String getUsername() {
        return this.username;
    }

    @Nullable
    @Generated
    String getPassword() {
        return this.password;
    }

    class ClusterHolder
    implements AutoCloseable {
        private Cluster cluster;

        private ClusterHolder(Cluster cluster) {
            this.cluster = cluster;
            this.incrementClusterReference();
        }

        @Override
        public void close() {
            if (this.cluster != null) {
                this.decrementClusterReference();
                this.cluster = null;
            }
        }

        private void incrementClusterReference() {
            CLUSTER_REFERENCES.computeIfAbsent(this.cluster, tmp -> new AtomicInteger(0)).incrementAndGet();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void decrementClusterReference() {
            AtomicInteger references = CLUSTER_REFERENCES.get(this.cluster);
            log.debug("Decrementing reference of cluster {}, current count {}", (Object)this.cluster, (Object)references);
            if (references != null && references.decrementAndGet() == 0) {
                Map<String, Cluster> map = CLUSTER_MAP;
                synchronized (map) {
                    Optional.ofNullable(CLUSTER_SESSIONS.remove(this.cluster)).ifPresent(Session::close);
                    Optional.ofNullable(CLUSTER_MAP.remove(CassandraDBAccessor.this.getUri().getAuthority())).ifPresent(Cluster::close);
                    CLUSTER_REFERENCES.remove(this.cluster);
                    log.debug("Cluster {} closed", (Object)this.cluster);
                }
            }
        }

        @Generated
        Cluster getCluster() {
            return this.cluster;
        }
    }
}

