package dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.subscription;

import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.eventstream.AggregateType;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.subscription.jdbi.AggregateTypeArgumentFactory;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.subscription.jdbi.AggregateTypeColumnMapper;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.subscription.jdbi.SubscriberIdArgumentFactory;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.subscription.jdbi.SubscriberIdColumnMapper;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.types.GlobalEventOrder;
import dk.cloudcreate.essentials.components.foundation.postgresql.PostgresqlUtil;
import dk.cloudcreate.essentials.components.foundation.transaction.jdbi.HandleAwareUnitOfWorkFactory;
import dk.cloudcreate.essentials.components.foundation.types.SubscriberId;
import dk.cloudcreate.essentials.shared.FailFast;
import dk.cloudcreate.essentials.shared.MessageFormatter;
import java.time.Clock;
import java.time.OffsetDateTime;
import java.util.Arrays;
import java.util.Collection;
import java.util.Locale;
import java.util.Optional;
import org.jdbi.v3.core.Jdbi;
import org.jdbi.v3.core.statement.PreparedBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:dk/cloudcreate/essentials/components/eventsourced/eventstore/postgresql/subscription/PostgresqlDurableSubscriptionRepository.class */
/**
 * PostgreSQL backed {@link DurableSubscriptionRepository}.
 * <p>
 * Resume points are stored in a single table (default name
 * {@value #DEFAULT_DURABLE_SUBSCRIPTIONS_TABLE_NAME}) with one row per
 * {@code (subscriber_id, aggregate_type)} pair. The table is created on construction if it
 * doesn't already exist. All database access runs through the supplied
 * {@link HandleAwareUnitOfWorkFactory}.
 */
public final class PostgresqlDurableSubscriptionRepository implements DurableSubscriptionRepository {
    private static final Logger log = LoggerFactory.getLogger(PostgresqlDurableSubscriptionRepository.class);

    /** Table name used when none is supplied to the constructor. */
    public static final String DEFAULT_DURABLE_SUBSCRIPTIONS_TABLE_NAME = "durable_subscriptions";

    private final Jdbi jdbi;
    private final String durableSubscriptionsTableName;
    private final HandleAwareUnitOfWorkFactory<?> unitOfWorkFactory;

    /**
     * Creates a repository that uses the {@link #DEFAULT_DURABLE_SUBSCRIPTIONS_TABLE_NAME} table.
     *
     * @param jdbi                         the Jdbi instance on which the argument factories/column mappers are registered
     * @param handleAwareUnitOfWorkFactory the factory providing the unit of work (and thereby the handle) for every statement
     */
    public PostgresqlDurableSubscriptionRepository(Jdbi jdbi, HandleAwareUnitOfWorkFactory<?> handleAwareUnitOfWorkFactory) {
        this(jdbi, handleAwareUnitOfWorkFactory, DEFAULT_DURABLE_SUBSCRIPTIONS_TABLE_NAME);
    }

    /**
     * Creates a repository that uses a custom table name.
     * <p>
     * The name is lower-cased, validated with
     * {@link PostgresqlUtil#checkIsValidTableOrColumnName(String)} and the table is created if it
     * doesn't exist.
     *
     * @param jdbi                         the Jdbi instance on which the argument factories/column mappers are registered
     * @param handleAwareUnitOfWorkFactory the factory providing the unit of work (and thereby the handle) for every statement
     * @param str                          the name of the durable subscriptions table (stored lower-cased)
     */
    public PostgresqlDurableSubscriptionRepository(Jdbi jdbi, HandleAwareUnitOfWorkFactory<?> handleAwareUnitOfWorkFactory, String str) {
        this.jdbi = FailFast.requireNonNull(jdbi, "No Jdbi instance provided");
        this.unitOfWorkFactory = FailFast.requireNonNull(handleAwareUnitOfWorkFactory, "No unitOfWorkFactory instance provided");
        // Locale.ROOT keeps the normalisation independent of the JVM default locale
        // (e.g. a Turkish default locale would otherwise turn 'I' into a dotless 'ı').
        this.durableSubscriptionsTableName = FailFast.requireNonNull(str, "No durableSubscriptionsTableName provided")
                                                     .toLowerCase(Locale.ROOT);
        PostgresqlUtil.checkIsValidTableOrColumnName(this.durableSubscriptionsTableName);
        log.info("Using durableSubscriptionsTableName: '{}'", this.durableSubscriptionsTableName);

        jdbi.registerArgument(new AggregateTypeArgumentFactory());
        jdbi.registerColumnMapper(new AggregateTypeColumnMapper());
        jdbi.registerArgument(new SubscriberIdArgumentFactory());
        jdbi.registerColumnMapper(new SubscriberIdColumnMapper());

        final String tableName = this.durableSubscriptionsTableName;
        handleAwareUnitOfWorkFactory.usingUnitOfWork(unitOfWork -> {
            unitOfWork.handle().execute("CREATE TABLE IF NOT EXISTS " + tableName + " (\n"
                                                + "subscriber_id TEXT NOT NULL,\n"
                                                + "aggregate_type TEXT NOT NULL,\n"
                                                + "resume_from_and_including_global_eventorder bigint,\n"
                                                + "last_updated TIMESTAMP WITH TIME ZONE,\n"
                                                + "PRIMARY KEY (subscriber_id, aggregate_type))");
            // Log the normalised name, i.e. the name of the table that was actually created
            log.info("Ensured '{}' table exists", tableName);
        });
    }

    /**
     * Inserts a new resume point for the given subscriber and aggregate type.
     *
     * @param subscriberId     the subscriber the resume point belongs to
     * @param aggregateType    the aggregate type the resume point belongs to
     * @param globalEventOrder the global event order to resume from (and including)
     * @return the newly created resume point, time-stamped with the current UTC time
     * @throws IllegalStateException if a resume point already exists for the subscriber/aggregate type
     *                               (the insert affected zero rows)
     */
    @Override // dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.subscription.DurableSubscriptionRepository
    public SubscriptionResumePoint createResumePoint(SubscriberId subscriberId, AggregateType aggregateType, GlobalEventOrder globalEventOrder) {
        FailFast.requireNonNull(subscriberId, "No subscriberId value provided");
        FailFast.requireNonNull(aggregateType, "No forAggregateType value provided");
        FailFast.requireNonNull(globalEventOrder, "No onFirstSubscriptionSubscribeFromAndIncludingGlobalOrder value provided");
        return this.unitOfWorkFactory.withUnitOfWork(unitOfWork -> {
            OffsetDateTime now = OffsetDateTime.now(Clock.systemUTC());
            int rowsInserted = unitOfWork.handle()
                                         .createUpdate("INSERT INTO " + this.durableSubscriptionsTableName + " (\n"
                                                               + "aggregate_type, subscriber_id, resume_from_and_including_global_eventorder, last_updated)\n"
                                                               + "VALUES (:aggregate_type, :subscriber_id, :resume_from_and_including_global_eventorder, :last_updated) ON CONFLICT DO NOTHING")
                                         .bind("aggregate_type", aggregateType)
                                         .bind("subscriber_id", subscriberId)
                                         .bind("resume_from_and_including_global_eventorder", globalEventOrder)
                                         .bind("last_updated", now)
                                         .execute();
            // ON CONFLICT DO NOTHING turns a primary key clash into "0 rows inserted"
            if (rowsInserted == 0) {
                throw new IllegalStateException(MessageFormatter.msg("Cannot createResumePoint for subscriberId '{}' and aggregateType '{}' since a ResumePoint already exists",
                                                                     subscriberId, aggregateType));
            }
            SubscriptionResumePoint created = new SubscriptionResumePoint(subscriberId, aggregateType, globalEventOrder, now);
            log.debug("Created {}", created);
            return created;
        });
    }

    /**
     * Deletes the resume point for the given subscriber and aggregate type.
     * A missing resume point is not an error; the outcome is only logged.
     *
     * @param subscriberId  the subscriber whose resume point should be deleted
     * @param aggregateType the aggregate type whose resume point should be deleted
     */
    @Override // dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.subscription.DurableSubscriptionRepository
    public void deleteResumePoint(SubscriberId subscriberId, AggregateType aggregateType) {
        FailFast.requireNonNull(aggregateType, "No aggregateType value provided");
        FailFast.requireNonNull(subscriberId, "No subscriberId value provided");
        int rowsDeleted = this.unitOfWorkFactory.withUnitOfWork(unitOfWork ->
                unitOfWork.handle()
                          .createUpdate("DELETE FROM " + this.durableSubscriptionsTableName + " WHERE aggregate_type = :aggregate_type AND subscriber_id = :subscriber_id")
                          .bind("aggregate_type", aggregateType)
                          .bind("subscriber_id", subscriberId)
                          .execute());
        if (rowsDeleted == 1) {
            log.debug("Deleted ResumePoint for subscriberId: '{}' and aggregateType '{}'", subscriberId, aggregateType);
        } else {
            log.debug("Didn't find ResumePoint to Delete for subscriberId: '{}' and aggregateType '{}'", subscriberId, aggregateType);
        }
    }

    /**
     * Looks up the resume point for the given subscriber and aggregate type.
     *
     * @param subscriberId  the subscriber to look up
     * @param aggregateType the aggregate type to look up
     * @return the stored resume point, or {@link Optional#empty()} if no row matches
     */
    @Override // dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.subscription.DurableSubscriptionRepository
    public Optional<SubscriptionResumePoint> getResumePoint(SubscriberId subscriberId, AggregateType aggregateType) {
        FailFast.requireNonNull(aggregateType, "No forAggregateType value provided");
        FailFast.requireNonNull(subscriberId, "No subscriberId value provided");
        Optional<SubscriptionResumePoint> resumePoint = this.unitOfWorkFactory.withUnitOfWork(unitOfWork ->
                unitOfWork.handle()
                          .createQuery("SELECT resume_from_and_including_global_eventorder, last_updated FROM " + this.durableSubscriptionsTableName + " WHERE aggregate_type = :aggregate_type AND subscriber_id = :subscriber_id")
                          .bind("aggregate_type", aggregateType)
                          .bind("subscriber_id", subscriberId)
                          .map((resultSet, statementContext) ->
                                       // NOTE(review): the column is nullable in the DDL and ResultSet#getLong yields 0 for SQL NULL
                                       new SubscriptionResumePoint(subscriberId,
                                                                   aggregateType,
                                                                   GlobalEventOrder.of(resultSet.getLong("resume_from_and_including_global_eventorder")),
                                                                   resultSet.getObject("last_updated", OffsetDateTime.class)))
                          .findOne());
        if (resumePoint.isPresent()) {
            log.debug("Found {}", resumePoint.get());
        } else {
            log.debug("Didn't find ResumePoint for subscriberId: '{}' and aggregateType '{}'", subscriberId, aggregateType);
        }
        return resumePoint;
    }

    /**
     * Persists all resume points in the collection that report {@code isChanged()}, using a single
     * batched UPDATE inside one unit of work. After the batch has executed, the changed resume
     * points get their last-updated timestamp set to the timestamp written to the database.
     * Unchanged resume points are skipped.
     *
     * @param collection the resume points to save; an empty collection is a no-op
     */
    @Override // dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.subscription.DurableSubscriptionRepository
    public void saveResumePoints(Collection<SubscriptionResumePoint> collection) {
        FailFast.requireNonNull(collection, "No resumePoints provided");
        if (collection.isEmpty()) {
            log.trace("No resumePoints to save");
            return;
        }

        log.trace("Received {} ResumePoints to save: {}", collection.size(), collection);
        this.unitOfWorkFactory.usingUnitOfWork(unitOfWork -> {
            OffsetDateTime now = OffsetDateTime.now(Clock.systemUTC());
            PreparedBatch batch = unitOfWork.handle()
                                            .prepareBatch("UPDATE " + this.durableSubscriptionsTableName + " SET resume_from_and_including_global_eventorder = :resume_from_and_including_global_eventorder, last_updated = :last_updated  WHERE aggregate_type = :aggregate_type AND subscriber_id = :subscriber_id");
            for (SubscriptionResumePoint resumePoint : collection) {
                if (resumePoint.isChanged()) {
                    log.debug("Saving {}", resumePoint);
                    batch.bind("aggregate_type", resumePoint.getAggregateType())
                         .bind("subscriber_id", resumePoint.getSubscriberId())
                         .bind("resume_from_and_including_global_eventorder", resumePoint.getResumeFromAndIncluding())
                         .bind("last_updated", now)
                         .add();
                } else {
                    log.debug("Wont save Unchanged {}", resumePoint);
                }
            }

            // Capture the batch size before executing, it's only used for logging below
            int batchSize = batch.size();
            int updatedRows = Arrays.stream(batch.execute()).sum();

            // Only touch the in-memory timestamps once the batch has executed without throwing
            for (SubscriptionResumePoint resumePoint : collection) {
                if (resumePoint.isChanged()) {
                    resumePoint.setLastUpdated(now);
                }
            }

            if (log.isTraceEnabled()) {
                log.trace("Saved {} resumePoints out of {} resulting in {} updated rows: {}", batchSize, collection.size(), updatedRows, collection);
            } else {
                log.debug("Saved {} resumePoints out of {} resulting in {} updated rows", batchSize, collection.size(), updatedRows);
            }
        });
    }
}
