package peloton.persistence.cassandra;

import cats.effect.IO;
import cats.effect.IO$;
import cats.effect.std.AtomicCell;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.uuid.Uuids;
import fs2.Compiler$;
import fs2.Compiler$Target$;
import fs2.Stream;
import fs2.Stream$;
import fs2.interop.flow.syntax$;
import fs2.interop.flow.syntax$PublisherOps$;
import java.nio.ByteBuffer;
import java.util.UUID;
import org.reactivestreams.FlowAdapters;
import peloton.persistence.EncodedEvent;
import peloton.persistence.EncodedEvent$;
import peloton.persistence.Event;
import peloton.persistence.EventStore;
import peloton.persistence.PayloadCodec;
import peloton.persistence.Retention;
import peloton.persistence.Snapshot;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2$;
import scala.collection.immutable.Map;
import scala.collection.immutable.Seq;
import scala.runtime.Arrays$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ScalaRunTime$;
import scala.util.NotGiven$;

/* compiled from: EventStoreCassandra.scala */
/* loaded from: input_file:peloton/persistence/cassandra/EventStoreCassandra.class */
/**
 * Cassandra-backed {@link EventStore}.
 *
 * <p>Events are stored in the table {@code peloton.eventstore}; the sequence ids of events that
 * are snapshots are additionally recorded in {@code peloton.snapshots}. Prepared statements are
 * cached in {@code statementCache}, keyed by their exact CQL text, so the CQL literals in this
 * class double as cache keys.
 *
 * <p>This file is decompiler output of Scala code: the {@code $$anonfun$} methods are the bodies
 * of by-name arguments and closures of the original source.
 */
public class EventStoreCassandra implements EventStore {
    private final CqlSession cqlSession;
    private final AtomicCell<IO, Map<String, PreparedStatement>> statementCache;
    private final String replicationStrategy;
    private final int replicationFactor;

    /**
     * @param cqlSession session used to prepare and execute all statements
     * @param atomicCell cache of prepared statements keyed by CQL text
     * @param str        replication strategy class placed into the keyspace definition
     * @param i          replication factor placed into the keyspace definition
     */
    public EventStoreCassandra(CqlSession cqlSession, AtomicCell<IO, Map<String, PreparedStatement>> atomicCell, String str, int i) {
        this.cqlSession = cqlSession;
        this.statementCache = atomicCell;
        this.replicationStrategy = str;
        this.replicationFactor = i;
    }

    /** Forwards to the default implementation provided by {@link EventStore}. */
    public /* bridge */ /* synthetic */ Stream readEvents(String str, boolean z, PayloadCodec payloadCodec, PayloadCodec payloadCodec2) {
        return EventStore.readEvents$(this, str, z, payloadCodec, payloadCodec2);
    }

    /** Forwards to the default implementation provided by {@link EventStore}. */
    public /* bridge */ /* synthetic */ IO writeEvent(String str, Event event, PayloadCodec payloadCodec) {
        return EventStore.writeEvent$(this, str, event, payloadCodec);
    }

    /** Forwards to the default implementation provided by {@link EventStore}. */
    public /* bridge */ /* synthetic */ IO writeSnapshot(String str, Snapshot snapshot, Retention retention, PayloadCodec payloadCodec) {
        return EventStore.writeSnapshot$(this, str, snapshot, retention, payloadCodec);
    }

    /**
     * Creates the {@code peloton} keyspace (using the configured replication settings), then the
     * {@code eventstore} table, then the {@code snapshots} table. Every statement uses
     * {@code if not exists}.
     */
    public IO<BoxedUnit> create() {
        String createKeyspace = "\n              create keyspace if not exists peloton \n              with replication = { \n                'class': '"
                + this.replicationStrategy
                + "', \n                'replication_factor' : "
                + this.replicationFactor
                + "\n              }\n            ";
        return execute(createKeyspace, ScalaRunTime$.MODULE$.genericWrapArray(new Object[0]))
                .$greater$greater(this::create$$anonfun$1)
                .$greater$greater(this::create$$anonfun$2);
    }

    /** Drops the {@code eventstore} table and then the {@code snapshots} table, if they exist. */
    public IO<BoxedUnit> drop() {
        return execute("drop table if exists peloton.eventstore", ScalaRunTime$.MODULE$.genericWrapArray(new Object[0]))
                .$greater$greater(this::drop$$anonfun$1);
    }

    /** Truncates the {@code eventstore} table and then the {@code snapshots} table. */
    public IO<BoxedUnit> clear() {
        return execute("truncate table peloton.eventstore", ScalaRunTime$.MODULE$.genericWrapArray(new Object[0]))
                .$greater$greater(this::clear$$anonfun$1);
    }

    /**
     * Streams the events of {@code str} whose sequence id is greater than or equal to
     * {@code str2}, in ascending sequence order.
     *
     * @param str  persistence id
     * @param str2 textual UUID of the first sequence id to include
     */
    private Stream<IO, EncodedEvent> readEventsSinceSnapshot(String str, String str2) {
        return query("\n        select payload, timestamp, is_snapshot\n        from peloton.eventstore \n        where persistence_id = ? and sequence_id >= ? \n        order by sequence_id\n      ", ScalaRunTime$.MODULE$.genericWrapArray(new Object[]{str.toString(), UUID.fromString(str2)}))
                .map(row -> toEncodedEvent(row));
    }

    /** Streams every event stored for persistence id {@code str}, in ascending sequence order. */
    private Stream<IO, EncodedEvent> readAllEvents(String str) {
        return query("\n           select payload, timestamp, is_snapshot\n           from peloton.eventstore \n           where persistence_id = ?\n           order by sequence_id\n       ", ScalaRunTime$.MODULE$.genericWrapArray(new Object[]{str.toString()}))
                .map(row -> toEncodedEvent(row));
    }

    /**
     * Streams the encoded events of a persistence id.
     *
     * @param str persistence id
     * @param z   when {@code true}, the most recent snapshot id is looked up first and reading
     *            starts at that snapshot; if no snapshot is recorded, or when {@code false}, all
     *            events are read
     */
    public Stream<IO, EncodedEvent> readEncodedEvents(String str, boolean z) {
        if (!z) {
            return readAllEvents(str);
        }
        return Stream$.MODULE$.eval(getCurrentSnapshot(str)).flatMap(option -> {
            if (None$.MODULE$.equals(option)) {
                return readAllEvents(str);
            }
            if (option instanceof Some) {
                return readEventsSinceSnapshot(str, (String) ((Some) option).value());
            }
            throw new MatchError(option);
        }, NotGiven$.MODULE$.value());
    }

    /**
     * Inserts one event under a freshly generated time-based sequence id. When the event is a
     * snapshot, the same sequence id is afterwards recorded in {@code peloton.snapshots}.
     *
     * @param str          persistence id
     * @param encodedEvent event to store
     */
    public IO<BoxedUnit> writeEncodedEvent(String str, EncodedEvent encodedEvent) {
        UUID timeBased = Uuids.timeBased();
        boolean snapshot = encodedEvent.isSnapshot();
        IO<BoxedUnit> insertEvent = execute("\n        insert into peloton.eventstore (\n          persistence_id,\n          sequence_id,\n          timestamp,\n          payload,\n          is_snapshot\n        ) values (?, ?, ?, ?, ?)\n        ", ScalaRunTime$.MODULE$.genericWrapArray(new Object[]{str.toString(), timeBased, BoxesRunTime.boxToLong(encodedEvent.timestamp()), ByteBuffer.wrap(encodedEvent.payload()), BoxesRunTime.boxToBoolean(snapshot)}));
        if (!snapshot) {
            return insertEvent;
        }
        return insertEvent.$greater$greater(() -> {
            return writeEncodedEvent$$anonfun$1(str, timeBased);
        });
    }

    /**
     * Deletes events and snapshot records that precede the last of the {@code i} most recent
     * snapshots of {@code str}. Does nothing when no snapshot is recorded.
     *
     * @param str persistence id
     * @param i   number of most recent snapshots to consider (passed as the query limit)
     */
    public IO<BoxedUnit> purge(String str, int i) {
        return getOldestSnapshot(str, i).flatMap(option -> {
            IO cleanup;
            if (None$.MODULE$.equals(option)) {
                cleanup = IO$.MODULE$.unit();
            } else {
                if (!(option instanceof Some)) {
                    throw new MatchError(option);
                }
                String str2 = (String) ((Some) option).value();
                // Events first, then the snapshot index entries older than the same id.
                cleanup = execute("\n                                    delete from peloton.eventstore \n                                    where persistence_id = ? and sequence_id < ?\n                                    ", ScalaRunTime$.MODULE$.genericWrapArray(new Object[]{str.toString(), UUID.fromString(str2)})).$greater$greater(() -> {
                    return purge$$anonfun$1$$anonfun$1(str, str2);
                });
            }
            return cleanup.map(ignored -> {
                return BoxedUnit.UNIT;
            });
        });
    }

    /** Returns the sequence id of the most recent snapshot of {@code str}, if one is recorded. */
    private IO<Option<String>> getCurrentSnapshot(String str) {
        return getOldestSnapshot(str, 1);
    }

    /**
     * Selects up to {@code i} snapshot sequence ids of {@code str} in descending order and
     * returns the last row of that result as a string, or an empty option when there is no row.
     */
    private IO<Option<String>> getOldestSnapshot(String str, int i) {
        return (IO) query("\n        select sequence_id\n        from peloton.snapshots\n        where persistence_id = ?\n        order by sequence_id desc\n        limit ?\n      ", ScalaRunTime$.MODULE$.genericWrapArray(new Object[]{str.toString(), BoxesRunTime.boxToInteger(i)})).map(row -> {
            return row.getUuid("sequence_id").toString();
        }).compile(Compiler$.MODULE$.target(Compiler$Target$.MODULE$.forConcurrent(IO$.MODULE$.asyncForIO()))).last();
    }

    /**
     * Returns the prepared statement for {@code str}, preparing it through the session and
     * adding it to the cache when it is not cached yet.
     */
    private Stream<IO, PreparedStatement> prepare(String str) {
        return Stream$.MODULE$.eval(this.statementCache.modify(map -> {
            Option<PreparedStatement> cached = map.get(str);
            if (None$.MODULE$.equals(cached)) {
                // NOTE(review): the session call happens inside the modify function - confirm
                // that is acceptable for the AtomicCell in use.
                PreparedStatement prepare = this.cqlSession.prepare(str);
                return Tuple2$.MODULE$.apply(map.$plus(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension((String) Predef$.MODULE$.ArrowAssoc(str), prepare)), prepare);
            }
            if (!(cached instanceof Some)) {
                throw new MatchError(cached);
            }
            return Tuple2$.MODULE$.apply(map, ((Some<PreparedStatement>) cached).value());
        }));
    }

    /**
     * Prepares {@code str}, binds {@code seq} to it, executes it and discards the result.
     *
     * @param str CQL text
     * @param seq positional bind values
     */
    private IO<BoxedUnit> execute(String str, Seq<Object> seq) {
        return (IO) prepare(str).evalMap(preparedStatement -> {
            return IO$.MODULE$.apply(() -> {
                return execute$$anonfun$1$$anonfun$1(preparedStatement, seq);
            });
        }).evalMap(boundStatement -> {
            return IO$.MODULE$.apply(() -> {
                return execute$$anonfun$2$$anonfun$1(boundStatement);
            });
        }).compile(Compiler$.MODULE$.target(Compiler$Target$.MODULE$.forConcurrent(IO$.MODULE$.asyncForIO()))).drain();
    }

    /**
     * Prepares {@code str}, binds {@code seq} to it and streams the resulting rows from the
     * session's reactive execution (converted with a chunk size of 512).
     *
     * @param str CQL text
     * @param seq positional bind values
     */
    private Stream<IO, Row> query(String str, Seq<Object> seq) {
        return prepare(str).evalMap(preparedStatement -> {
            return IO$.MODULE$.apply(() -> {
                return query$$anonfun$1$$anonfun$1(preparedStatement, seq);
            });
        }).flatMap(boundStatement -> {
            return syntax$PublisherOps$.MODULE$.toStream$extension(syntax$.MODULE$.PublisherOps(FlowAdapters.toFlowPublisher(this.cqlSession.executeReactive(boundStatement))), 512, IO$.MODULE$.asyncForIO());
        }, NotGiven$.MODULE$.value());
    }

    /** Builds an {@link EncodedEvent} from the payload, timestamp and is_snapshot columns. */
    private static EncodedEvent toEncodedEvent(Row row) {
        return EncodedEvent$.MODULE$.apply(payloadBytes(row), row.getLong("timestamp"), row.getBoolean("is_snapshot"));
    }

    /**
     * Copies the readable bytes of the {@code payload} column into a new array.
     *
     * <p>{@code ByteBuffer.array()} is avoided on purpose: it exposes the whole backing array
     * regardless of the buffer's position and limit, and it throws for direct or read-only
     * buffers. The copy is taken from a duplicate so the row's buffer is left untouched.
     */
    private static byte[] payloadBytes(Row row) {
        ByteBuffer buffer = row.getBytesUnsafe("payload");
        byte[] bytes = new byte[buffer.remaining()];
        buffer.duplicate().get(bytes);
        return bytes;
    }

    /** Second step of {@link #create()}: creates the {@code eventstore} table. */
    private final IO create$$anonfun$1() {
        return execute("\n              create table if not exists peloton.eventstore (\n                persistence_id  text,\n                sequence_id     timeuuid,\n                is_snapshot     boolean,\n                timestamp       bigint,\n                payload         blob,\n\n                primary key (persistence_id, sequence_id)\n              ) with clustering order by (sequence_id asc)\n            ", ScalaRunTime$.MODULE$.genericWrapArray(new Object[0]));
    }

    /** Third step of {@link #create()}: creates the {@code snapshots} table. */
    private final IO create$$anonfun$2() {
        return execute("\n              create table if not exists peloton.snapshots (\n                persistence_id  text,\n                sequence_id     timeuuid,\n\n                primary key (persistence_id, sequence_id)\n              ) with clustering order by (sequence_id desc)\n            ", ScalaRunTime$.MODULE$.genericWrapArray(new Object[0]));
    }

    /** Second step of {@link #drop()}: drops the {@code snapshots} table. */
    private final IO drop$$anonfun$1() {
        return execute("drop table if exists peloton.snapshots", ScalaRunTime$.MODULE$.genericWrapArray(new Object[0]));
    }

    /**
     * Second step of {@link #clear()}: truncates the {@code snapshots} table. (It previously
     * truncated {@code eventstore} a second time, leaving stale snapshot ids behind.)
     */
    private final IO clear$$anonfun$1() {
        return execute("truncate table peloton.snapshots", ScalaRunTime$.MODULE$.genericWrapArray(new Object[0]));
    }

    /** Records {@code uuid} as a snapshot sequence id of persistence id {@code str}. */
    private final IO writeEncodedEvent$$anonfun$1(String str, UUID uuid) {
        return execute("\n        insert into peloton.snapshots (\n          persistence_id,\n          sequence_id\n        ) values (?, ?)\n        ", ScalaRunTime$.MODULE$.genericWrapArray(new Object[]{str.toString(), uuid}));
    }

    /** Deletes the snapshot records of {@code str} whose sequence id is below {@code str2}. */
    private final IO purge$$anonfun$1$$anonfun$1(String str, String str2) {
        return execute("\n                                    delete from peloton.snapshots\n                                    where persistence_id = ? and sequence_id < ?\n                                    ", ScalaRunTime$.MODULE$.genericWrapArray(new Object[]{str.toString(), UUID.fromString(str2)}));
    }

    /** Binds the positional values of {@code seq} to the statement (used by {@code execute}). */
    private static final BoundStatement execute$$anonfun$1$$anonfun$1(PreparedStatement preparedStatement, Seq seq) {
        return preparedStatement.bind((Object[]) Arrays$.MODULE$.seqToArray(seq, Object.class));
    }

    /** Executes the bound statement synchronously on the session. */
    private final ResultSet execute$$anonfun$2$$anonfun$1(BoundStatement boundStatement) {
        return this.cqlSession.execute(boundStatement);
    }

    /** Binds the positional values of {@code seq} to the statement (used by {@code query}). */
    private static final BoundStatement query$$anonfun$1$$anonfun$1(PreparedStatement preparedStatement, Seq seq) {
        return preparedStatement.bind((Object[]) Arrays$.MODULE$.seqToArray(seq, Object.class));
    }
}
