/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsoftware.elasticactors.cassandra2.state;

import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ProtocolVersion;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.Statement;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.elasticsoftware.elasticactors.cassandra.common.state.PersistentActorUpdateEvent;
import org.elasticsoftware.elasticactors.cassandra2.util.ExecutionUtils;
import org.elasticsoftware.elasticactors.util.concurrent.ThreadBoundEventProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Writes {@link PersistentActorUpdateEvent}s to the {@code "PersistentActors"} table.
 *
 * <p>An event that carries persistent actor bytes is stored with {@link #INSERT_QUERY}; an event
 * whose bytes are {@code null} is removed with {@link #DELETE_QUERY}. A single event is executed
 * as a plain statement, several events are executed together as one UNLOGGED batch. How that
 * batch is built depends on the protocol version reported by the session's cluster
 * configuration: for {@code ProtocolVersion.V1} the batch is sent as one CQL string
 * ({@code BEGIN UNLOGGED BATCH ... APPLY BATCH}), for every other version a
 * {@link BatchStatement} is used.
 * NOTE(review): presumably protocol V1 cannot carry a driver-level {@code BatchStatement} -
 * confirm against the driver documentation.
 *
 * <p>After the write attempt each event's listener (when present) is notified exactly once with
 * either {@code onDone} or {@code onError}.
 */
public final class PersistentActorUpdateEventProcessor
implements ThreadBoundEventProcessor<PersistentActorUpdateEvent> {
    private static final Logger logger = LoggerFactory.getLogger(PersistentActorUpdateEventProcessor.class);
    public static final String INSERT_QUERY = "INSERT INTO \"PersistentActors\" (key, key2, column1, value) VALUES (?, ?, ?, ?)";
    public static final String DELETE_QUERY = "DELETE FROM \"PersistentActors\" where key = ? AND key2 = ? AND column1 = ?";
    /** Opening and closing fragments of a CQL batch sent as a single string (protocol V1 path). */
    private static final String BATCH_PREFIX = "BEGIN UNLOGGED BATCH ";
    private static final String BATCH_SUFFIX = "APPLY BATCH";
    private final Session cassandraSession;
    private final PreparedStatement insertStatement;
    private final PreparedStatement deleteStatement;
    /** Pre-prepared insert-only batches, keyed by the number of inserts they contain (2..maxBatchSize). */
    private final Map<Integer, PreparedStatement> batchStatements = new HashMap<Integer, PreparedStatement>();
    private final boolean optimizedV1Batches;

    /**
     * Creates a processor with optimized V1 batches enabled.
     *
     * @param cassandraSession session used to prepare and execute all statements
     * @param maxBatchSize largest insert-only batch to prepare up front (only used on protocol V1)
     */
    public PersistentActorUpdateEventProcessor(Session cassandraSession, int maxBatchSize) {
        this(cassandraSession, maxBatchSize, true);
    }

    /**
     * Creates a processor and prepares the insert and delete statements.
     *
     * @param cassandraSession session used to prepare and execute all statements
     * @param maxBatchSize largest insert-only batch to prepare up front (only used on protocol V1)
     * @param optimizedV1Batches when {@code true}, insert-only batches of size 2..maxBatchSize are
     *        prepared here and reused by {@link #process(List)} on protocol V1
     */
    public PersistentActorUpdateEventProcessor(Session cassandraSession, int maxBatchSize, boolean optimizedV1Batches) {
        this.cassandraSession = cassandraSession;
        this.insertStatement = cassandraSession.prepare(INSERT_QUERY);
        this.deleteStatement = cassandraSession.prepare(DELETE_QUERY);
        if (optimizedV1Batches) {
            this.prepareBatchIfNeeded(maxBatchSize);
        }
        this.optimizedV1Batches = optimizedV1Batches;
    }

    /** Returns whether the session's cluster configuration reports protocol version V1. */
    private boolean usesProtocolV1() {
        ProtocolVersion protocolVersion = this.cassandraSession.getCluster().getConfiguration().getProtocolOptions().getProtocolVersion();
        return ProtocolVersion.V1.equals(protocolVersion);
    }

    /** Appends one statement to a string-based batch, using the spacing and terminator the batch text expects. */
    private static void appendBatchEntry(StringBuilder batchBuilder, String query) {
        batchBuilder.append("   ").append(query).append("; ");
    }

    /**
     * On protocol V1, prepares one insert-only batch statement for every size from 2 up to
     * {@code maxBatchSize}; does nothing on other protocol versions.
     */
    private void prepareBatchIfNeeded(int maxBatchSize) {
        if (!this.usesProtocolV1()) {
            return;
        }
        // Grow a single builder by one insert per iteration instead of rebuilding every batch
        // from scratch; the resulting CQL text per size is the same.
        StringBuilder inserts = new StringBuilder(BATCH_PREFIX);
        appendBatchEntry(inserts, INSERT_QUERY);
        for (int batchSize = 2; batchSize <= maxBatchSize; ++batchSize) {
            appendBatchEntry(inserts, INSERT_QUERY);
            this.batchStatements.put(batchSize, this.cassandraSession.prepare(inserts + BATCH_SUFFIX));
        }
    }

    /**
     * Processes a single event; equivalent to {@link #process(List)} with a one-element list.
     *
     * @param event the update to persist
     */
    public void process(PersistentActorUpdateEvent event) {
        this.process(Collections.singletonList(event));
    }

    /**
     * Persists the given events and then notifies each event's listener exactly once.
     *
     * <p>An {@link Exception} raised while writing is not rethrown: it is handed to every
     * listener through {@code onError}. Events without a listener receive no notification.
     * An {@link Error} is not caught and propagates without any listener being notified, so a
     * write that did not complete is never reported as done.
     *
     * @param events the updates to persist; an empty list is a no-op
     */
    public void process(List<PersistentActorUpdateEvent> events) {
        if (events.isEmpty()) {
            // Nothing to write and nobody to notify; avoid sending an empty batch.
            return;
        }
        // Capture once so the duration is never computed from a start time of 0.
        final boolean traceEnabled = logger.isTraceEnabled();
        final long startTime = traceEnabled ? System.nanoTime() : 0L;
        Exception executionException = null;
        try {
            if (events.size() == 1) {
                Statement statement = this.bindStatement(events.get(0));
                ExecutionUtils.executeWithRetry(this.cassandraSession, statement, logger);
            } else if (!this.usesProtocolV1()) {
                this.executeBatchV2AndUp(events);
            } else if (this.optimizedV1Batches) {
                this.executeBatchV1Optimized(events);
            } else {
                this.executeBatchV1(events);
            }
        }
        catch (Exception e) {
            executionException = e;
        }
        this.notifyListeners(events, executionException);
        if (traceEnabled) {
            long duration = TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - startTime);
            logger.trace("Updating {} Actor state entrie(s) took {} microsecs", events.size(), duration);
        }
    }

    /**
     * Binds the insert statement when the event carries bytes, otherwise the delete statement.
     * The result is typed as {@link Statement} because that is the type it is executed or batched as.
     */
    private Statement bindStatement(PersistentActorUpdateEvent event) {
        if (event.getPersistentActorBytes() != null) {
            return this.insertStatement.bind(event.getRowKey()[0], event.getRowKey()[1], event.getPersistentActorId(), event.getPersistentActorBytes());
        }
        return this.deleteStatement.bind(event.getRowKey()[0], event.getRowKey()[1], event.getPersistentActorId());
    }

    /** Calls {@code onDone} (no failure) or {@code onError} (failure) on every event that has a listener. */
    private void notifyListeners(List<PersistentActorUpdateEvent> events, Throwable failure) {
        for (PersistentActorUpdateEvent event : events) {
            if (event.getEventListener() == null) {
                continue;
            }
            if (failure == null) {
                event.getEventListener().onDone(event.getMessage());
            } else {
                event.getEventListener().onError(event.getMessage(), failure);
            }
        }
    }

    /**
     * Builds a string-based UNLOGGED batch that mixes inserts and deletes, prepares it and
     * executes it. The statement is prepared on every call.
     */
    private void executeBatchV1(List<PersistentActorUpdateEvent> events) {
        List<Object> arguments = new LinkedList<Object>();
        StringBuilder batchBuilder = new StringBuilder(BATCH_PREFIX);
        for (PersistentActorUpdateEvent event : events) {
            boolean insert = event.getPersistentActorBytes() != null;
            appendBatchEntry(batchBuilder, insert ? INSERT_QUERY : DELETE_QUERY);
            arguments.add(event.getRowKey()[0]);
            arguments.add(event.getRowKey()[1]);
            arguments.add(event.getPersistentActorId());
            if (insert) {
                // Only the insert has a fourth bind marker (the value column).
                arguments.add(event.getPersistentActorBytes());
            }
        }
        batchBuilder.append(BATCH_SUFFIX);
        PreparedStatement batchStatement = this.cassandraSession.prepare(batchBuilder.toString());
        Statement bound = batchStatement.bind(arguments.toArray());
        ExecutionUtils.executeWithRetry(this.cassandraSession, bound, logger);
    }

    /**
     * Executes an insert-only batch with the statement prepared in the constructor for that
     * size. Falls back to {@link #executeBatchV1(List)} when no statement exists for the size
     * or when any event is a delete.
     */
    private void executeBatchV1Optimized(List<PersistentActorUpdateEvent> events) {
        PreparedStatement batchStatement = this.batchStatements.get(events.size());
        if (batchStatement == null) {
            this.executeBatchV1(events);
            return;
        }
        List<Object> arguments = new LinkedList<Object>();
        for (PersistentActorUpdateEvent event : events) {
            if (event.getPersistentActorBytes() == null) {
                // The pre-prepared batches contain inserts only.
                this.executeBatchV1(events);
                return;
            }
            arguments.add(event.getRowKey()[0]);
            arguments.add(event.getRowKey()[1]);
            arguments.add(event.getPersistentActorId());
            arguments.add(event.getPersistentActorBytes());
        }
        Statement bound = batchStatement.bind(arguments.toArray());
        ExecutionUtils.executeWithRetry(this.cassandraSession, bound, logger);
    }

    /** Executes the events as one UNLOGGED {@link BatchStatement} of bound insert/delete statements. */
    private void executeBatchV2AndUp(List<PersistentActorUpdateEvent> events) {
        BatchStatement batchStatement = new BatchStatement(BatchStatement.Type.UNLOGGED);
        for (PersistentActorUpdateEvent event : events) {
            batchStatement.add(this.bindStatement(event));
        }
        ExecutionUtils.executeWithRetry(this.cassandraSession, (Statement) batchStatement, logger);
    }
}

