package dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.gap;

import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.PostgresqlEventStore;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.eventstream.AggregateType;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.eventstream.PersistedEvent;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.persistence.AggregateEventStreamConfiguration;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.subscription.jdbi.AggregateTypeArgumentFactory;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.subscription.jdbi.AggregateTypeColumnMapper;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.subscription.jdbi.SubscriberIdArgumentFactory;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.subscription.jdbi.SubscriberIdColumnMapper;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.transaction.EventStoreUnitOfWork;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.transaction.EventStoreUnitOfWorkFactory;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.types.GlobalEventOrder;
import dk.cloudcreate.essentials.components.foundation.postgresql.PostgresqlUtil;
import dk.cloudcreate.essentials.components.foundation.types.SubscriberId;
import dk.cloudcreate.essentials.shared.FailFast;
import dk.cloudcreate.essentials.shared.functional.tuple.Pair;
import dk.cloudcreate.essentials.types.LongRange;
import java.time.Clock;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.time.temporal.ChronoUnit;
import java.time.temporal.Temporal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.jdbi.v3.core.statement.PreparedBatch;
import org.jdbi.v3.core.statement.Query;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:dk/cloudcreate/essentials/components/eventsourced/eventstore/postgresql/gap/PostgresqlEventStreamGapHandler.class */
public final class PostgresqlEventStreamGapHandler<CONFIG extends AggregateEventStreamConfiguration> implements EventStreamGapHandler<CONFIG> {
    /** Name of the table that stores transient gaps per subscriber and aggregate type. */
    public static final String TRANSIENT_SUBSCRIBER_GAPS_TABLE_NAME = "transient_subscriber_gaps";
    /** Name of the index created on the transient gaps table. */
    private static final String TRANSIENT_SUBSCRIBER_GAPS_INDEX_NAME = "transient_subscriber_gaps_index";
    /** Name of the table that stores permanent gaps per aggregate type. */
    public static final String PERMANENT_GAPS_TABLE_NAME = "permanent_gaps";
    /** Event store supplied at construction time (not referenced elsewhere in this class). */
    private final PostgresqlEventStore<CONFIG> postgresqlEventStore;
    /** Source of the units of work (and thereby Jdbi handles) used for all gap table access. */
    private final EventStoreUnitOfWorkFactory<?> unitOfWorkFactory;
    /** Decides which known transient gaps a query should include. */
    private final ResolveTransientGapsToIncludeInQueryStrategy resolveTransientGapsToIncludeInQueryStrategy;
    /** Decides which known transient gaps should be promoted to permanent gaps. */
    private final ResolveTransientGapsToPermanentGapsPromotionStrategy resolveTransientGapsToPermanentGapsPromotionStrategy;
    /** Minimum age, in seconds, of a subscriber's cached transient gaps before they are reloaded from storage. */
    private final long refreshTransientGapsFromStorageEverySeconds;
    private static final Logger log = LoggerFactory.getLogger(PostgresqlEventStreamGapHandler.class);
    /** Shared immutable empty list returned when there are no gaps to report. */
    public static final List<GlobalEventOrder> NO_GAPS = List.of();

    /* loaded from: input_file:dk/cloudcreate/essentials/components/eventsourced/eventstore/postgresql/gap/PostgresqlEventStreamGapHandler$PostgresqlSubscriptionGapHandler.class */
    private class PostgresqlSubscriptionGapHandler implements SubscriptionGapHandler {
        /** The subscriber whose transient gaps this handler tracks. */
        private final SubscriberId subscriberId;
        /** When the transient gaps were last reloaded from storage; {@code null} until the first reload. */
        private OffsetDateTime transientGapsLastRefreshedFromStorage;
        /** In-memory copy of the transient gaps (gap + first-discovered timestamp), keyed by aggregate type. */
        private final ConcurrentMap<AggregateType, List<Pair<GlobalEventOrder, OffsetDateTime>>> allTransientGaps = new ConcurrentHashMap<>();

        /**
         * Creates a gap handler for a single subscriber.
         *
         * @param subscriberId the subscriber to track gaps for; must not be {@code null}
         */
        public PostgresqlSubscriptionGapHandler(SubscriberId subscriberId) {
            this.subscriberId = FailFast.requireNonNull(subscriberId, "No subscriberId provided");
        }

        /**
         * @return the subscriber this gap handler was created for
         */
        @Override
        public SubscriberId subscriberId() {
            return subscriberId;
        }

        /**
         * Resolves which of the known transient gaps should be included in a query.
         *
         * @param aggregateType the aggregate type being queried; must not be {@code null}
         * @param longRange     the global order range of the query; must not be {@code null}
         * @return {@link PostgresqlEventStreamGapHandler#NO_GAPS} when no transient gaps are known,
         * otherwise whatever the configured include-in-query strategy selects
         */
        @Override
        public List<GlobalEventOrder> findTransientGapsToIncludeInQuery(AggregateType aggregateType, LongRange longRange) {
            FailFast.requireNonNull(aggregateType, "No aggregateType provided");
            FailFast.requireNonNull(longRange, "No globalOrderQueryRange provided");

            // Invoked only for its side effect: it populates/refreshes the cached gaps inside a unit of work
            getTransientGapsFor(aggregateType);

            List<Pair<GlobalEventOrder, OffsetDateTime>> knownTransientGaps = allTransientGaps.get(aggregateType);
            if (knownTransientGaps.isEmpty()) {
                return NO_GAPS;
            }
            // The strategy only gets a read-only view of the cached gaps
            return resolveTransientGapsToIncludeInQueryStrategy.resolveTransientGaps(aggregateType,
                                                                                     longRange,
                                                                                     Collections.unmodifiableList(knownTransientGaps));
        }

        /**
         * Reconciles the gap bookkeeping with the outcome of a query:
         * <ol>
         *     <li>transient gaps that were part of the query and for which an event was returned are deleted</li>
         *     <li>global orders between the start of the query range and the highest returned global order
         *     that have no event, and are not already permanent gaps, are recorded as new transient gaps</li>
         *     <li>the transient gaps selected by the promotion strategy are promoted to permanent gaps</li>
         * </ol>
         * Requires an active unit of work.
         *
         * @param aggregateType the aggregate type that was queried; must not be {@code null}
         * @param longRange     the global order range that was queried; must not be {@code null}
         * @param list          the events the query returned; must not be {@code null}
         * @param list2         the transient gaps that were included in the query; must not be {@code null}
         */
        @Override
        public void reconcileGaps(AggregateType aggregateType, LongRange longRange, List<PersistedEvent> list, List<GlobalEventOrder> list2) {
            FailFast.requireNonNull(aggregateType, "No aggregateType provided");
            FailFast.requireNonNull(longRange, "No globalOrderQueryRange provided");
            FailFast.requireNonNull(list, "No persistedEvents provided");
            FailFast.requireNonNull(list2, "No transientGaps provided");
            PostgresqlEventStreamGapHandler.log.debug("[{}] Reconciling '{}' Gaps for query with globalOrderQueryRange: {},  persistedEvents size: {} and transientGaps size: {}",
                                                      subscriberId, aggregateType, longRange, list.size(), list2.size());

            // 1. Transient gaps for which an event showed up are resolved
            List<GlobalEventOrder> resolvedTransientGaps = list.stream()
                                                               .map(PersistedEvent::globalEventOrder)
                                                               .filter(list2::contains)
                                                               .collect(Collectors.toList());
            if (!list2.isEmpty() && !list.isEmpty()) {
                deleteTransientGaps(aggregateType, resolvedTransientGaps);
            }

            // 2. Detect new transient gaps
            if (!list.isEmpty()) {
                List<Long> persistedGlobalOrders = list.stream()
                                                       .map(persistedEvent -> persistedEvent.globalEventOrder().longValue())
                                                       .collect(Collectors.toList());
                // Hash based copy so the range scan below doesn't do a linear search per candidate
                Collection<Long> persistedGlobalOrderLookup = persistedGlobalOrders.stream().collect(Collectors.toSet());
                long highestPersistedGlobalOrder = Collections.max(persistedGlobalOrders);

                // Must be mutable: permanent gaps are removed from it below
                List<GlobalEventOrder> newTransientGaps = LongRange.between(longRange.fromInclusive, highestPersistedGlobalOrder)
                                                                   .stream()
                                                                   .boxed()
                                                                   .filter(globalOrder -> !persistedGlobalOrderLookup.contains(globalOrder))
                                                                   .map(GlobalEventOrder::of)
                                                                   .collect(Collectors.toCollection(ArrayList::new));

                // The stream is backed by a database query, so make sure it is always closed
                List<GlobalEventOrder> alreadyPermanentGaps;
                try (Stream<GlobalEventOrder> permanentGaps = getPermanentGapsFor(aggregateType)) {
                    alreadyPermanentGaps = permanentGaps.filter(newTransientGaps::contains)
                                                        .collect(Collectors.toList());
                }
                if (!alreadyPermanentGaps.isEmpty()) {
                    PostgresqlEventStreamGapHandler.log.debug("[{}] Removed {} permanent gaps among the newly discovered transient gaps for {}: {}",
                                                              subscriberId, alreadyPermanentGaps.size(), aggregateType, alreadyPermanentGaps);
                    newTransientGaps.removeAll(alreadyPermanentGaps);
                }
                if (PostgresqlEventStreamGapHandler.log.isDebugEnabled()) {
                    PostgresqlEventStreamGapHandler.log.debug("[{}] Detected {} New Transient '{}' gaps: {} based on persisted events: {}",
                                                              subscriberId, newTransientGaps.size(), aggregateType, newTransientGaps, persistedGlobalOrders);
                }
                addNewTransientGaps(aggregateType, newTransientGaps);
            }

            // 3. Promote transient gaps to permanent gaps
            PostgresqlEventStreamGapHandler.log.trace("[{}] Looking for Transient '{}' gaps that can be promoted to Permanent Gaps. All Transient Gaps: {}",
                                                      subscriberId, aggregateType, allTransientGaps);
            // Nothing may be cached yet for this aggregate type (e.g. no events were returned and the cache
            // was never loaded) - hand the strategy an empty list instead of null
            List<Pair<GlobalEventOrder, OffsetDateTime>> knownTransientGaps = allTransientGaps.getOrDefault(aggregateType, Collections.emptyList());
            promoteTransientGapsToPermanentGaps(aggregateType,
                                                resolveTransientGapsToPermanentGapsPromotionStrategy.resolveTransientGapsReadyToBePromotedToPermanentGaps(aggregateType, knownTransientGaps));
        }

        /**
         * Moves the given gaps from the transient gaps bookkeeping to the permanent gaps table.
         * Does nothing for an empty list. Requires an active unit of work.
         *
         * @param aggregateType the aggregate type the gaps belong to
         * @param gapsToPromote the transient gaps to promote
         */
        private void promoteTransientGapsToPermanentGaps(AggregateType aggregateType, List<GlobalEventOrder> gapsToPromote) {
            if (gapsToPromote.isEmpty()) {
                return;
            }

            EventStoreUnitOfWork unitOfWork = unitOfWorkFactory.getRequiredUnitOfWork();
            deleteTransientGaps(aggregateType, gapsToPromote);

            OffsetDateTime addedTimestamp = now();
            PreparedBatch insertBatch = unitOfWork.handle()
                                                  .prepareBatch("INSERT INTO permanent_gaps\n" +
                                                                "(aggregate_type, gap_global_event_order, added_timestamp) VALUES (:aggregate_type, :gap_global_event_order, :added_timestamp) ON CONFLICT DO NOTHING");
            for (GlobalEventOrder gap : gapsToPromote) {
                insertBatch.bind("aggregate_type", aggregateType)
                           .bind("gap_global_event_order", gap)
                           .bind("added_timestamp", addedTimestamp)
                           .add();
            }

            // Rows skipped by ON CONFLICT DO NOTHING make the total lower than the number of gaps
            int rowsInserted = Arrays.stream(insertBatch.execute()).sum();
            if (rowsInserted != gapsToPromote.size()) {
                log.debug("[{}] Promoted {} out of {} Transient '{}' Gaps to be Permanent Gaps: {}",
                          subscriberId, rowsInserted, gapsToPromote.size(), aggregateType, gapsToPromote);
            } else {
                log.debug("[{}] Promoted {} Transient '{}' Gaps to be Permanent Gaps: {}",
                          subscriberId, gapsToPromote.size(), aggregateType, gapsToPromote);
            }
        }

        /**
         * Records newly discovered transient gaps, both in the in-memory cache and in the
         * transient gaps table. Does nothing for an empty list. Requires an active unit of work.
         *
         * @param aggregateType    the aggregate type the gaps belong to
         * @param newTransientGaps the gaps to add; all of them get the same first-discovered timestamp
         */
        private void addNewTransientGaps(AggregateType aggregateType, List<GlobalEventOrder> newTransientGaps) {
            if (newTransientGaps.isEmpty()) {
                return;
            }

            EventStoreUnitOfWork unitOfWork = unitOfWorkFactory.getRequiredUnitOfWork();
            OffsetDateTime firstDiscovered = now();

            internalGetTransientGapsFor(aggregateType).addAll(newTransientGaps.stream()
                                                                              .map(gap -> Pair.of(gap, firstDiscovered))
                                                                              .collect(Collectors.toList()));

            PreparedBatch insertBatch = unitOfWork.handle()
                                                  .prepareBatch("INSERT INTO transient_subscriber_gaps\n" +
                                                                "(subscriber_id, aggregate_type, gap_global_event_order, first_discovered) VALUES (:subscriber_id, :aggregate_type, :gap_global_event_order, :first_discovered) ON CONFLICT DO NOTHING");
            for (GlobalEventOrder gap : newTransientGaps) {
                insertBatch.bind("subscriber_id", subscriberId)
                           .bind("aggregate_type", aggregateType)
                           .bind("gap_global_event_order", gap)
                           .bind("first_discovered", firstDiscovered)
                           .add();
            }

            // Rows skipped by ON CONFLICT DO NOTHING make the total lower than the number of gaps
            int rowsInserted = Arrays.stream(insertBatch.execute()).sum();
            if (rowsInserted == newTransientGaps.size()) {
                log.debug("[{}] Added {} New Transient '{}' Gaps {}\nAll Transient '{}' Gaps: {}",
                          subscriberId, newTransientGaps.size(), aggregateType, newTransientGaps, aggregateType, allTransientGaps);
            } else {
                // The message announces the gaps that were to be added, so log those (not the whole gap cache)
                log.warn("[{}] Added {} out of {} new Transient '{}' Gaps. Do you have multiple instances of the same subscriber '{}' running without using exclusive subscriptions?\nNew Transient Gaps to add: {}",
                         subscriberId, rowsInserted, newTransientGaps.size(), aggregateType, subscriberId, newTransientGaps);
            }
        }

        /**
         * Removes the given gaps from this subscriber's transient gaps, both in the in-memory
         * cache and in the transient gaps table. Does nothing for an empty list.
         * Requires an active unit of work.
         *
         * @param aggregateType         the aggregate type the gaps belong to
         * @param resolvedTransientGaps the transient gaps to remove
         */
        private void deleteTransientGaps(AggregateType aggregateType, List<GlobalEventOrder> resolvedTransientGaps) {
            if (resolvedTransientGaps.isEmpty()) {
                return;
            }

            EventStoreUnitOfWork unitOfWork = unitOfWorkFactory.getRequiredUnitOfWork();

            // Kept as a mutable list since new transient gaps are appended to the cached list later on
            List<Pair<GlobalEventOrder, OffsetDateTime>> remainingGaps = internalGetTransientGapsFor(aggregateType).stream()
                                                                                                                   .filter(gap -> !resolvedTransientGaps.contains(gap._1))
                                                                                                                   .collect(Collectors.toCollection(ArrayList::new));
            allTransientGaps.put(aggregateType, remainingGaps);

            // Transient gaps are tracked per subscriber: the delete must be restricted to this subscriber,
            // otherwise other subscribers silently lose gaps they have not yet resolved themselves
            int rowsDeleted = unitOfWork.handle()
                                        .createUpdate("DELETE FROM transient_subscriber_gaps\n" +
                                                      "    WHERE subscriber_id = :subscriber_id and aggregate_type = :aggregate_type and gap_global_event_order IN (<resolveTransientGaps>)")
                                        .bind("subscriber_id", subscriberId)
                                        .bind("aggregate_type", aggregateType)
                                        .bindList("resolveTransientGaps", resolvedTransientGaps)
                                        .execute();

            if (rowsDeleted != resolvedTransientGaps.size()) {
                log.warn("[{}] Wanted to delete {} resolved Transient '{}' gaps, but was only able to delete {} transient gaps.\nDo you have multiple instances of the same subscriber '{}' running without using exclusive subscriptions?\nResolved Transient Gaps to delete: {}",
                         subscriberId, resolvedTransientGaps.size(), aggregateType, rowsDeleted, subscriberId, resolvedTransientGaps);
            } else {
                log.debug("[{}] Deleted {} resolved Transient '{}' gaps. Resolved Transient Gaps deleted: {}\nAll Transient '{}' Gaps: {}",
                          subscriberId, resolvedTransientGaps.size(), aggregateType, resolvedTransientGaps, aggregateType, allTransientGaps);
            }
        }

        /**
         * Returns the cached transient gaps for the aggregate type, reloading them from storage first
         * when nothing is cached, the cached list is empty, no reload has happened yet, or the last
         * reload is at least the configured refresh interval ago. A reload requires an active unit of work.
         *
         * @param aggregateType the aggregate type to look up; must not be {@code null}
         * @return the cached (never {@code null}) list of gaps with their first-discovered timestamps
         */
        private List<Pair<GlobalEventOrder, OffsetDateTime>> internalGetTransientGapsFor(AggregateType aggregateType) {
            FailFast.requireNonNull(aggregateType, "No aggregateType provided");

            List<Pair<GlobalEventOrder, OffsetDateTime>> cachedGaps = allTransientGaps.get(aggregateType);
            boolean reloadFromStorage = cachedGaps == null
                    || cachedGaps.isEmpty()
                    || transientGapsLastRefreshedFromStorage == null
                    || ChronoUnit.SECONDS.between(transientGapsLastRefreshedFromStorage, now()) >= refreshTransientGapsFromStorageEverySeconds;

            if (reloadFromStorage) {
                // The refresh timestamp is updated before the query is executed
                transientGapsLastRefreshedFromStorage = now();
                List<Pair<GlobalEventOrder, OffsetDateTime>> storedGaps =
                        unitOfWorkFactory.getRequiredUnitOfWork()
                                         .handle()
                                         .createQuery("SELECT gap_global_event_order, first_discovered FROM transient_subscriber_gaps\n" +
                                                      "    WHERE aggregate_type = :aggregate_type and subscriber_id = :subscriber_id\n" +
                                                      "    ORDER BY gap_global_event_order ASC")
                                         .bind("aggregate_type", aggregateType)
                                         .bind("subscriber_id", subscriberId)
                                         .map((resultSet, statementContext) -> Pair.of(GlobalEventOrder.of(resultSet.getLong("gap_global_event_order")),
                                                                                       resultSet.getObject("first_discovered", OffsetDateTime.class)))
                                         .list();
                allTransientGaps.put(aggregateType, storedGaps);
            }

            return allTransientGaps.computeIfAbsent(aggregateType, missingAggregateType -> new ArrayList<>());
        }

        /**
         * Deletes all of this subscriber's transient gaps for the given aggregate type, in storage
         * as well as in the in-memory cache.
         *
         * @param aggregateType the aggregate type to reset; must not be {@code null}
         * @return the global event orders of the transient gaps that were deleted from storage
         */
        @Override
        public List<GlobalEventOrder> resetTransientGapsFor(AggregateType aggregateType) {
            List<GlobalEventOrder> removedGaps = unitOfWorkFactory.withUnitOfWork(unitOfWork ->
                    unitOfWork.handle()
                              .createQuery("DELETE FROM transient_subscriber_gaps\n" +
                                           "    WHERE aggregate_type = :aggregate_type and subscriber_id = :subscriber_id\n" +
                                           "    RETURNING gap_global_event_order")
                              .bind("aggregate_type", FailFast.requireNonNull(aggregateType, "No aggregateType provided"))
                              .bind("subscriber_id", subscriberId)
                              .mapTo(GlobalEventOrder.class)
                              .list());
            // Drop the cached gaps as well, otherwise the deleted gaps keep being reported (and included
            // in queries) until the next refresh from storage. The next lookup reloads from storage.
            allTransientGaps.remove(aggregateType);
            return removedGaps;
        }

        /**
         * Looks up this subscriber's transient gaps for the given aggregate type within a unit of work.
         *
         * @param aggregateType the aggregate type to look up; must not be {@code null}
         * @return a new list with the global event orders of the known transient gaps
         */
        @Override
        public List<GlobalEventOrder> getTransientGapsFor(AggregateType aggregateType) {
            return unitOfWorkFactory.withUnitOfWork(unitOfWork -> {
                List<Pair<GlobalEventOrder, OffsetDateTime>> knownGaps = internalGetTransientGapsFor(aggregateType);
                return knownGaps.stream()
                                .map(Pair::_1)
                                .collect(Collectors.toList());
            });
        }

        /**
         * Streams the permanent gaps of the given aggregate type in ascending global event order.
         * Requires an active unit of work; the stream is produced directly from the database query.
         *
         * @param aggregateType the aggregate type to look up; must not be {@code null}
         * @return a stream of the permanent gaps' global event orders
         */
        @Override
        public Stream<GlobalEventOrder> getPermanentGapsFor(AggregateType aggregateType) {
            Query permanentGapsQuery = unitOfWorkFactory.getRequiredUnitOfWork()
                                                        .handle()
                                                        .createQuery("SELECT gap_global_event_order FROM permanent_gaps\n" +
                                                                     "    WHERE aggregate_type = :aggregate_type\n" +
                                                                     "    ORDER BY gap_global_event_order ASC");
            return permanentGapsQuery.bind("aggregate_type", FailFast.requireNonNull(aggregateType, "No aggregateType provided"))
                                     .mapTo(GlobalEventOrder.class)
                                     .stream();
        }

        /**
         * Two gap handlers are equal when they are of this exact class and track the same subscriber.
         *
         * @param obj the object to compare with; may be {@code null}
         * @return {@code true} if {@code obj} is a gap handler for the same subscriber id
         */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            // equals(null) must return false rather than throw
            if (obj == null || !obj.getClass().equals(PostgresqlSubscriptionGapHandler.class)) {
                return false;
            }
            return subscriberId.equals(((PostgresqlSubscriptionGapHandler) obj).subscriberId);
        }

        /**
         * @return a hash code derived solely from the subscriber id
         */
        @Override
        public int hashCode() {
            return Objects.hash(subscriberId);
        }

        /**
         * @return a description containing the subscriber id, the time of the last refresh from
         * storage and the cached transient gaps
         */
        @Override
        public String toString() {
            StringBuilder description = new StringBuilder("PostgresqlSubscriptionGapHandler{");
            description.append("subscriberId=").append(subscriberId);
            description.append(", transientGapsLastRefreshedFromStorage=").append(transientGapsLastRefreshedFromStorage);
            description.append(", transientGaps(#").append(allTransientGaps.size()).append(")=").append(allTransientGaps);
            description.append("}");
            return description.toString();
        }
    }

    /**
     * Strategy that selects which of a subscriber's known transient gaps a query should include.
     */
    @FunctionalInterface
    public interface ResolveTransientGapsToIncludeInQueryStrategy {
        /**
         * @param aggregateType the aggregate type being queried
         * @param longRange     the global order range of the query
         * @param list          the known transient gaps, each paired with a timestamp (the gap handler in this
         *                      file passes an unmodifiable, non-empty list with the first-discovered timestamps)
         * @return the global event orders of the transient gaps to include in the query
         */
        List<GlobalEventOrder> resolveTransientGaps(AggregateType aggregateType, LongRange longRange, List<Pair<GlobalEventOrder, OffsetDateTime>> list);
    }

    /**
     * Strategy that selects which of a subscriber's known transient gaps should be promoted to permanent gaps.
     */
    @FunctionalInterface
    public interface ResolveTransientGapsToPermanentGapsPromotionStrategy {
        /**
         * @param aggregateType the aggregate type the gaps belong to
         * @param list          the known transient gaps, each paired with the timestamp it was first discovered
         * @return the global event orders of the transient gaps that should be promoted to permanent gaps
         */
        List<GlobalEventOrder> resolveTransientGapsReadyToBePromotedToPermanentGaps(AggregateType aggregateType, List<Pair<GlobalEventOrder, OffsetDateTime>> list);

        /**
         * Creates a strategy that promotes every transient gap that was first discovered more than
         * the given number of seconds ago.
         *
         * @param i the permanent gap threshold in seconds; must be &gt; 0
         * @return the threshold based promotion strategy
         */
        static ResolveTransientGapsToPermanentGapsPromotionStrategy thresholdBased(int i) {
            FailFast.requireTrue(i > 0, "permanentGapThresholdInSeconds must be > 0");
            final long permanentGapThresholdInSeconds = i;
            return (aggregateType, transientGaps) -> {
                // A single reference time per invocation, so all gaps are measured against the same instant
                OffsetDateTime now = PostgresqlEventStreamGapHandler.now();
                return transientGaps.stream()
                                    .filter(transientGap -> {
                                        long secondsSinceFirstDiscovered = ChronoUnit.SECONDS.between(transientGap._2, now);
                                        if (PostgresqlEventStreamGapHandler.log.isTraceEnabled()) {
                                            PostgresqlEventStreamGapHandler.log.trace("{} seconds since '{}' Transient Gap with GlobalOrder {} was first discovered",
                                                                                      secondsSinceFirstDiscovered, aggregateType, transientGap._1);
                                        }
                                        return secondsSinceFirstDiscovered > permanentGapThresholdInSeconds;
                                    })
                                    .map(Pair::_1)
                                    .collect(Collectors.toList());
            };
        }
    }

    /**
     * Creates a gap handler with the default settings: transient gaps are refreshed from storage
     * every 60 seconds, at most the first 2 known transient gaps are included in a query, and
     * transient gaps are promoted using {@code thresholdBased(120)}.
     *
     * @param postgresqlEventStore        the event store; must not be {@code null}
     * @param eventStoreUnitOfWorkFactory the unit of work factory; must not be {@code null}
     */
    public PostgresqlEventStreamGapHandler(PostgresqlEventStore<CONFIG> postgresqlEventStore, EventStoreUnitOfWorkFactory<?> eventStoreUnitOfWorkFactory) {
        this(postgresqlEventStore,
             eventStoreUnitOfWorkFactory,
             Duration.ofSeconds(60L),
             (aggregateType, globalOrderQueryRange, transientGaps) ->
                     transientGaps.isEmpty()
                     ? NO_GAPS
                     : transientGaps.stream()
                                    .limit(2)
                                    .map(Pair::_1)
                                    .collect(Collectors.toList()),
             ResolveTransientGapsToPermanentGapsPromotionStrategy.thresholdBased(120));
    }

    public PostgresqlEventStreamGapHandler(PostgresqlEventStore<CONFIG> postgresqlEventStore, EventStoreUnitOfWorkFactory<?> eventStoreUnitOfWorkFactory, Duration duration, ResolveTransientGapsToIncludeInQueryStrategy resolveTransientGapsToIncludeInQueryStrategy, ResolveTransientGapsToPermanentGapsPromotionStrategy resolveTransientGapsToPermanentGapsPromotionStrategy) {
        this.postgresqlEventStore = (PostgresqlEventStore) FailFast.requireNonNull(postgresqlEventStore, "No postgresqlEventStore provided");
        this.unitOfWorkFactory = (EventStoreUnitOfWorkFactory) FailFast.requireNonNull(eventStoreUnitOfWorkFactory, "No unitOfWorkFactory provided");
        this.refreshTransientGapsFromStorageEverySeconds = ((Duration) FailFast.requireNonNull(duration, "No refreshTransientGapsFromStorageInterval provided")).toSeconds();
        this.resolveTransientGapsToIncludeInQueryStrategy = (ResolveTransientGapsToIncludeInQueryStrategy) FailFast.requireNonNull(resolveTransientGapsToIncludeInQueryStrategy, "No resolveTransientGapsToIncludeInQuery provided");
        this.resolveTransientGapsToPermanentGapsPromotionStrategy = (ResolveTransientGapsToPermanentGapsPromotionStrategy) FailFast.requireNonNull(resolveTransientGapsToPermanentGapsPromotionStrategy, "No resolveTransientGapsToPermanentGapsPromotionStrategy provided");
        createGapHandlingTablesAndIndexes();
    }

    /**
     * Validates the table/index names, registers the Jdbi argument factories and column mappers for
     * {@link AggregateType} and {@link SubscriberId}, and creates the transient gaps table, its index
     * and the permanent gaps table if they don't already exist. Runs in its own unit of work.
     */
    private void createGapHandlingTablesAndIndexes() {
        PostgresqlUtil.checkIsValidTableOrColumnName(TRANSIENT_SUBSCRIBER_GAPS_TABLE_NAME);
        PostgresqlUtil.checkIsValidTableOrColumnName(TRANSIENT_SUBSCRIBER_GAPS_INDEX_NAME);
        PostgresqlUtil.checkIsValidTableOrColumnName(PERMANENT_GAPS_TABLE_NAME);

        unitOfWorkFactory.usingUnitOfWork(unitOfWork -> {
            unitOfWork.handle()
                      .getJdbi()
                      .registerArgument(new AggregateTypeArgumentFactory())
                      .registerColumnMapper(new AggregateTypeColumnMapper())
                      .registerArgument(new SubscriberIdArgumentFactory())
                      .registerColumnMapper(new SubscriberIdColumnMapper());

            // DDL statements report an update count of 0 whether or not the object was created, so the
            // count cannot tell "created" from "already existed" - log a neutral message instead
            unitOfWork.handle().execute("CREATE TABLE IF NOT EXISTS " + TRANSIENT_SUBSCRIBER_GAPS_TABLE_NAME + " (\n" +
                                        "   subscriber_id text NOT NULL,\n" +
                                        "   aggregate_type text NOT NULL,\n" +
                                        "   gap_global_event_order bigint NOT NULL\n" +
                                        ",   first_discovered TIMESTAMP WITH TIME ZONE NOT NULL\n" +
                                        ",   PRIMARY KEY (subscriber_id, aggregate_type, gap_global_event_order)\n" +
                                        ")");
            log.info("Ensured table exists: '{}'", TRANSIENT_SUBSCRIBER_GAPS_TABLE_NAME);

            unitOfWork.handle().execute("CREATE INDEX IF NOT EXISTS " + TRANSIENT_SUBSCRIBER_GAPS_INDEX_NAME + " ON \n" +
                                        TRANSIENT_SUBSCRIBER_GAPS_TABLE_NAME + "(subscriber_id, aggregate_type)");
            log.info("Ensured index exists: '{}'", TRANSIENT_SUBSCRIBER_GAPS_INDEX_NAME);

            unitOfWork.handle().execute("CREATE TABLE IF NOT EXISTS " + PERMANENT_GAPS_TABLE_NAME + " (\n" +
                                        "   aggregate_type text NOT NULL,\n" +
                                        "   gap_global_event_order bigint NOT NULL\n" +
                                        ",   added_timestamp TIMESTAMP WITH TIME ZONE NOT NULL,   PRIMARY KEY (aggregate_type, gap_global_event_order)\n" +
                                        ")");
            log.info("Ensured table exists: '{}'", PERMANENT_GAPS_TABLE_NAME);
        });
    }

    /**
     * Creates a new gap handler dedicated to the given subscriber.
     *
     * @param subscriberId the subscriber to create the gap handler for; must not be {@code null}
     * @return a new, subscriber specific gap handler
     */
    @Override
    public SubscriptionGapHandler gapHandlerFor(SubscriberId subscriberId) {
        SubscriptionGapHandler subscriberGapHandler = new PostgresqlSubscriptionGapHandler(subscriberId);
        return subscriberGapHandler;
    }

    /**
     * Deletes all permanent gaps of the given aggregate type.
     *
     * @param aggregateType the aggregate type to reset; must not be {@code null}
     * @return the global event orders of the permanent gaps that were deleted
     */
    @Override
    public List<GlobalEventOrder> resetPermanentGapsFor(AggregateType aggregateType) {
        List<GlobalEventOrder> removedGaps = unitOfWorkFactory.withUnitOfWork(unitOfWork ->
                unitOfWork.handle()
                          .createQuery("DELETE FROM permanent_gaps WHERE aggregate_type = :aggregate_type RETURNING gap_global_event_order")
                          .bind("aggregate_type", FailFast.requireNonNull(aggregateType, "No aggregateType provided"))
                          .mapTo(GlobalEventOrder.class)
                          .list());
        log.info("[{}] Removed {} Permanent Gap(s) with GlobalEventOrder: {}",
                 aggregateType, removedGaps.size(), removedGaps);
        return removedGaps;
    }

    /**
     * Deletes the permanent gaps of the given aggregate type whose global event order lies within
     * the given range. For an open range only the lower bound is applied.
     *
     * @param aggregateType the aggregate type to reset; must not be {@code null}
     * @param longRange     the range of global event orders to reset; must not be {@code null}
     * @return the global event orders of the permanent gaps that were deleted
     */
    @Override
    public List<GlobalEventOrder> resetPermanentGapsFor(AggregateType aggregateType, LongRange longRange) {
        FailFast.requireNonNull(longRange, "No resetForThisSpecificGlobalEventOrdersRange provided");

        List<GlobalEventOrder> removedGaps = unitOfWorkFactory.withUnitOfWork(unitOfWork -> {
            boolean openRange = longRange.isOpenRange();
            // The upper bound predicate is only part of the statement for closed ranges
            String sql = "DELETE FROM permanent_gaps WHERE aggregate_type = :aggregate_type \n"
                    + " AND gap_global_event_order >= :gap_global_event_order_from_and_including"
                    + (openRange ? "" : " AND gap_global_event_order <= :gap_global_event_order_to_and_including")
                    + "  RETURNING gap_global_event_order";

            Query deleteQuery = unitOfWork.handle()
                                          .createQuery(sql)
                                          .bind("aggregate_type", FailFast.requireNonNull(aggregateType, "No aggregateType provided"));
            deleteQuery.bind("gap_global_event_order_from_and_including", longRange.fromInclusive);
            if (!openRange) {
                deleteQuery.bind("gap_global_event_order_to_and_including", longRange.toInclusive);
            }
            return deleteQuery.mapTo(GlobalEventOrder.class).list();
        });

        log.info("[{}] Removed {} Permanent Gap(s), according to reset range {}, with GlobalEventOrder: {}",
                 aggregateType, removedGaps.size(), longRange, removedGaps);
        return removedGaps;
    }

    /**
     * Deletes the permanent gaps of the given aggregate type that match the given global event orders.
     *
     * @param aggregateType the aggregate type to reset; must not be {@code null} when {@code list} is non-empty
     * @param list          the global event orders to reset; must not be {@code null}. An empty list is
     *                      returned as-is without touching storage
     * @return the global event orders of the permanent gaps that were deleted
     */
    @Override
    public List<GlobalEventOrder> resetPermanentGapsFor(AggregateType aggregateType, List<GlobalEventOrder> list) {
        FailFast.requireNonNull(list, "No resetForTheseSpecificGlobalEventOrders provided");
        if (list.isEmpty()) {
            return list;
        }

        List<GlobalEventOrder> removedGaps = unitOfWorkFactory.withUnitOfWork(unitOfWork ->
                unitOfWork.handle()
                          .createQuery("DELETE FROM permanent_gaps WHERE aggregate_type = :aggregate_type \n" +
                                       " AND gap_global_event_order IN (<globalEventOrders>)\n" +
                                       "  RETURNING gap_global_event_order")
                          .bind("aggregate_type", FailFast.requireNonNull(aggregateType, "No aggregateType provided"))
                          .bindList("globalEventOrders", list)
                          .mapTo(GlobalEventOrder.class)
                          .list());

        log.info("[{}] Removed {} Permanent Gap(s), according to reset list {}, with GlobalEventOrder: {}",
                 aggregateType, removedGaps.size(), list, removedGaps);
        return removedGaps;
    }

    /**
     * Returns the permanent gaps registered for the given aggregate type.
     *
     * @param aggregateType the aggregate type to look up; must not be {@code null}
     * @return a stream of the permanent gaps' global event orders
     */
    @Override
    public Stream<GlobalEventOrder> getPermanentGapsFor(AggregateType aggregateType) {
        Stream<Long> permanentGapOrders = getPermanentGapsAsLongFor(aggregateType);
        return permanentGapOrders.map(GlobalEventOrder::of);
    }

    /**
     * Reads the permanent gaps of the given aggregate type as raw {@code long} values.
     * <p>
     * The rows are fully read inside the unit of work and the returned stream is backed by that
     * in-memory list. A lazy database stream must not escape the unit of work, since it would be
     * consumed after the unit of work that owns the underlying result set has completed.
     *
     * @param aggregateType the aggregate type to look up; must not be {@code null}
     * @return a stream over the global event orders of the permanent gaps
     */
    private Stream<Long> getPermanentGapsAsLongFor(AggregateType aggregateType) {
        List<Long> permanentGaps = unitOfWorkFactory.withUnitOfWork(unitOfWork ->
                unitOfWork.handle()
                          .createQuery("SELECT gap_global_event_order FROM permanent_gaps WHERE aggregate_type = :aggregate_type")
                          .bind("aggregate_type", FailFast.requireNonNull(aggregateType, "No aggregateType provided"))
                          .mapTo(Long.class)
                          .list());
        return permanentGaps.stream();
    }

    /**
     * @return the current date-time as read from the system UTC clock
     */
    private static OffsetDateTime now() {
        Clock utcClock = Clock.systemUTC();
        return OffsetDateTime.now(utcClock);
    }
}
