package io.es4j.core.tasks;

import io.es4j.AsyncProjection;
import io.es4j.core.objects.AggregateEvent;
import io.es4j.core.objects.Offset;
import io.es4j.core.objects.OffsetKey;
import io.es4j.infrastructure.EventStore;
import io.es4j.infrastructure.OffsetStore;
import io.es4j.infrastructure.misc.EventParser;
import io.es4j.infrastructure.models.Event;
import io.es4j.infrastructure.models.EventStream;
import io.es4j.infrastructure.models.EventStreamBuilder;
import io.es4j.task.CronTask;
import io.es4j.task.CronTaskConfiguration;
import io.es4j.task.CronTaskConfigurationBuilder;
import io.es4j.task.LockLevel;
import io.smallrye.mutiny.Uni;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/es4j/core/tasks/EventProjectionPoller.class */
public class EventProjectionPoller implements CronTask {
    private static final Logger logger = LoggerFactory.getLogger(EventProjectionPoller.class);
    private final AsyncProjection asyncProjection;
    private final EventStore eventStore;
    private final OffsetStore offsetStore;

    public EventProjectionPoller(AsyncProjection asyncProjection, EventStore eventStore, OffsetStore offsetStore) {
        this.asyncProjection = asyncProjection;
        this.eventStore = eventStore;
        this.offsetStore = offsetStore;
    }

    public Uni<Void> performTask() {
        return this.offsetStore.get(getOffset()).flatMap(offset -> {
            return this.eventStore.fetch(streamStatement(this.asyncProjection, offset)).flatMap(list -> {
                return this.asyncProjection.apply(parseEvents(list)).flatMap(r7 -> {
                    return this.offsetStore.put(offset.updateOffset(list));
                });
            });
        }).onFailure().invoke(th -> {
            logger.error("Unable to update projection {}", this.asyncProjection.getClass().getName(), th);
        }).replaceWithVoid();
    }

    private OffsetKey getOffset() {
        return new OffsetKey(this.asyncProjection.getClass().getName(), "default");
    }

    private static EventStream streamStatement(AsyncProjection asyncProjection, Offset offset) {
        AtomicReference atomicReference = new AtomicReference();
        asyncProjection.filter().ifPresentOrElse(eventJournalFilter -> {
            atomicReference.set(EventStreamBuilder.builder().eventTypes(eventJournalFilter.eventTypes()).tenantId(eventJournalFilter.tenant()).offset(offset.idOffSet()).batchSize(1000).tags(eventJournalFilter.tags()).build());
        }, () -> {
            atomicReference.set(EventStreamBuilder.builder().offset(offset.idOffSet()).batchSize(1000).build());
        });
        return (EventStream) atomicReference.get();
    }

    private List<AggregateEvent> parseEvents(List<Event> list) {
        return list.stream().map(event -> {
            return new AggregateEvent(event.aggregateId(), event.tenantId(), event.journalOffset(), event.eventVersion(), EventParser.getEvent(event.eventType(), event.event()));
        }).toList();
    }

    public CronTaskConfiguration configuration() {
        return CronTaskConfigurationBuilder.builder().lockLevel(LockLevel.CLUSTER_WIDE).cron(this.asyncProjection.pollingPolicy()).build();
    }
}
