package io.es4j.core.tasks;

import io.es4j.Aggregate;
import io.es4j.core.objects.AggregateState;
import io.es4j.core.objects.LoadAggregate;
import io.es4j.core.objects.OffsetKey;
import io.es4j.core.objects.StateProjectionWrapper;
import io.es4j.infrastructure.EventStore;
import io.es4j.infrastructure.OffsetStore;
import io.es4j.infrastructure.models.EventStreamBuilder;
import io.es4j.infrastructure.proxy.AggregateEventBusPoxy;
import io.es4j.task.CronTask;
import io.es4j.task.CronTaskConfiguration;
import io.es4j.task.CronTaskConfigurationBuilder;
import io.es4j.task.LockLevel;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.tuples.Tuple2;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

/* loaded from: input_file:io/es4j/core/tasks/StateProjectionPoller.class */
public class StateProjectionPoller<T extends Aggregate> implements CronTask {
    private final StateProjectionWrapper<T> stateProjectionWrapper;
    private final AggregateEventBusPoxy<T> proxy;
    private final EventStore eventStore;
    private final OffsetStore offsetStore;
    private final Class<T> aggregateClass;

    public StateProjectionPoller(Class<T> cls, StateProjectionWrapper<T> stateProjectionWrapper, AggregateEventBusPoxy<T> aggregateEventBusPoxy, EventStore eventStore, OffsetStore offsetStore) {
        this.aggregateClass = cls;
        this.stateProjectionWrapper = stateProjectionWrapper;
        this.proxy = aggregateEventBusPoxy;
        this.eventStore = eventStore;
        this.offsetStore = offsetStore;
    }

    public Uni<Void> performTask() {
        this.stateProjectionWrapper.logger().debug("Polling events");
        return this.offsetStore.get(new OffsetKey(this.stateProjectionWrapper.asyncStateTransfer().getClass().getName(), "default")).flatMap(offset -> {
            this.stateProjectionWrapper.logger().debug("Journal idOffset at {}", offset.idOffSet());
            return this.eventStore.fetch(EventStreamBuilder.builder().offset(offset.idOffSet()).batchSize(5000).build());
        }).flatMap(list -> {
            Set keySet = ((Map) list.stream().collect(Collectors.groupingBy(event -> {
                return Tuple2.of(event.aggregateId(), event.tenantId());
            }))).keySet();
            this.stateProjectionWrapper.logger().debug("Updating {} IDs : {}", this.aggregateClass.getSimpleName(), keySet);
            return Multi.createFrom().iterable(keySet).onItem().transformToUniAndMerge(tuple2 -> {
                Uni<AggregateState<T>> proxyCommand = this.proxy.proxyCommand(new LoadAggregate((String) tuple2.getItem1(), (String) tuple2.getItem2(), null, null));
                StateProjectionWrapper<T> stateProjectionWrapper = this.stateProjectionWrapper;
                Objects.requireNonNull(stateProjectionWrapper);
                return proxyCommand.flatMap(stateProjectionWrapper::update);
            }).collect().asList().replaceWithVoid();
        }).replaceWithVoid();
    }

    public CronTaskConfiguration configuration() {
        return CronTaskConfigurationBuilder.builder().lockLevel(LockLevel.CLUSTER_WIDE).cron(this.stateProjectionWrapper.asyncStateTransfer().pollingPolicy()).build();
    }
}
