package com.github.jonasrutishauser.transactional.event.core.store;

import com.github.jonasrutishauser.transactional.event.api.Configuration;
import com.github.jonasrutishauser.transactional.event.api.Events;
import com.github.jonasrutishauser.transactional.event.core.PendingEvent;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.enterprise.concurrent.LastExecution;
import jakarta.enterprise.concurrent.ManagedScheduledExecutorService;
import jakarta.enterprise.concurrent.Trigger;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.context.Initialized;
import jakarta.enterprise.event.Observes;
import jakarta.enterprise.event.TransactionPhase;
import jakarta.inject.Inject;
import java.time.Instant;
import java.util.Date;
import java.util.Iterator;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.eclipse.microprofile.metrics.annotation.Gauge;

/**
 * Application-scoped {@link Dispatcher} implementation.
 *
 * <p>Events are handed to the {@link Worker} on the {@code @Events} executor in two ways:
 * directly after a successful transaction ({@link #processDirect(EventsPublished)}), and from
 * a polling task ({@link #schedule()}) that acquires event ids from the
 * {@link PendingEventStore}. The delay between two polls is kept in {@code intervalSeconds}
 * and is either reset to zero or doubled (up to the configured maximum) after each poll.
 */
@ApplicationScoped
class DispatcherImpl implements Dispatcher {
    private static final Logger LOGGER = LogManager.getLogger();

    private final Configuration configuration;
    /**
     * Injected {@link Dispatcher} through which this class invokes its own interface methods.
     * NOTE(review): presumably the container's reference to this very bean - confirm.
     */
    private final Dispatcher dispatcher;
    private final ManagedScheduledExecutorService executor;
    private final PendingEventStore store;
    private final Worker worker;

    /** Number of tasks submitted by {@link #schedule()} that have not finished running yet. */
    private final AtomicInteger dispatchedRunning = new AtomicInteger();
    /** Delay in seconds the polling trigger adds to "now" for the next lookup. */
    private volatile int intervalSeconds = 30;
    /** Handle of the polling task created in {@link #startup(Object)}; {@code null} when none. */
    private ScheduledFuture<?> scheduled;

    /** Creates an instance without any collaborators (all of them are {@code null}). */
    DispatcherImpl() {
        this(null, null, null, null, null);
    }

    /**
     * Injection constructor.
     *
     * @param configuration source of the dispatch intervals and the concurrency limit
     * @param dispatcher dispatcher used for all calls to {@link Dispatcher} methods
     * @param managedScheduledExecutorService executor running the worker and polling tasks
     * @param pendingEventStore store from which event ids are acquired
     * @param worker worker that processes a single event id
     */
    @Inject
    DispatcherImpl(Configuration configuration, Dispatcher dispatcher, @Events ManagedScheduledExecutorService managedScheduledExecutorService, PendingEventStore pendingEventStore, Worker worker) {
        this.configuration = configuration;
        this.dispatcher = dispatcher;
        this.executor = managedScheduledExecutorService;
        this.store = pendingEventStore;
        this.worker = worker;
    }

    /** Replaces the default interval with the configured initial dispatch interval. */
    @PostConstruct
    void initIntervalSeconds() {
        intervalSeconds = configuration.getInitialDispatchInterval();
    }

    /**
     * Observes published events after the transaction succeeded and forwards them to
     * {@link Dispatcher#processDirect(EventsPublished)} on the injected dispatcher.
     *
     * @param eventsPublished the events published in the completed transaction
     */
    void directDispatch(@Observes(during = TransactionPhase.AFTER_SUCCESS) EventsPublished eventsPublished) {
        dispatcher.processDirect(eventsPublished);
    }

    /**
     * Submits one processor task per published event to the executor. These tasks are not
     * counted in {@code dispatchedRunning}.
     *
     * <p>If the executor rejects a task, a warning is logged and the remaining events of this
     * call are not submitted.
     *
     * @param eventsPublished the events to process
     */
    @Override
    public void processDirect(EventsPublished eventsPublished) {
        try {
            for (Iterator<PendingEvent> pending = eventsPublished.getEvents().iterator(); pending.hasNext();) {
                Runnable task = dispatcher.processor(pending.next().getId());
                executor.execute(task);
            }
        } catch (RejectedExecutionException e) {
            LOGGER.warn("Failed to submit events for processing: {}", e.getMessage());
        }
    }

    /**
     * Creates the task that processes a single event.
     *
     * @param str id of the event handed to {@link Worker#process}
     * @return a task which resets the polling interval to zero when the worker returns
     *         {@code false}
     */
    @Override
    public Runnable processor(String str) {
        return () -> {
            if (!worker.process(str)) {
                // the next trigger evaluation then schedules the poll without delay
                intervalSeconds = 0;
            }
        };
    }

    /**
     * Starts the polling task once the application scope is initialized.
     *
     * @param obj the initialization event payload (unused)
     */
    void startup(@Initialized(ApplicationScoped.class) @Observes Object obj) {
        Trigger pollingTrigger = new Trigger() {
            /**
             * Next poll is "now" plus the all-in-use interval (milliseconds) while no
             * dispatch slot is free, otherwise "now" plus the current interval (seconds).
             */
            @Override
            public Date getNextRunTime(LastExecution lastExecution, Date date) {
                boolean allInUse = freeCapacity() <= 0;
                Instant now = Instant.now();
                Instant next;
                if (allInUse) {
                    next = now.plusMillis(configuration.getAllInUseInterval());
                } else {
                    next = now.plusSeconds(intervalSeconds);
                }
                return Date.from(next);
            }

            /** Never skips a run. */
            @Override
            public boolean skipRun(LastExecution lastExecution, Date date) {
                return false;
            }
        };
        scheduled = executor.schedule(dispatcher::schedule, pollingTrigger);
    }

    /** Cancels the polling task (without interrupting a running poll) and forgets its handle. */
    @PreDestroy
    void stop() {
        if (scheduled == null) {
            return;
        }
        scheduled.cancel(false);
        scheduled = null;
    }

    /**
     * Acquires event ids from the store in batches and submits a counted processor task for
     * each, until the store returns an empty batch or the executor rejects a task.
     *
     * <p>Afterwards the polling interval is set to zero if anything was acquired or no dispatch
     * slot is free; otherwise it is doubled (at least one second) and capped at the configured
     * maximum dispatch interval.
     */
    @Override
    public synchronized void schedule() {
        boolean dispatchedAny = false;
        int capacity = freeCapacity();
        try {
            for (Set<String> batch = store.aquire(capacity); !batch.isEmpty(); batch = store.aquire(capacity)) {
                dispatchedAny = true;
                for (String eventId : batch) {
                    executeCounting(dispatcher.processor(eventId));
                }
                // tasks may have finished (or been added) meanwhile, so re-evaluate the limit
                capacity = freeCapacity();
            }
        } catch (RejectedExecutionException e) {
            LOGGER.warn("Failed to dispatch events: {}", e.getMessage());
        }
        if (!dispatchedAny && capacity > 0) {
            int ceiling = configuration.getMaxDispatchInterval();
            int backedOff = Math.max(intervalSeconds * 2, 1);
            intervalSeconds = Math.min(ceiling, backedOff);
        } else {
            intervalSeconds = 0;
        }
    }

    /**
     * @return number of tasks submitted by {@link #schedule()} that are still queued or running
     */
    @Gauge(name = "com.github.jonasrutishauser.transaction.event.dispatched.processing", description = "Number of dispatched events being processed", unit = "none", absolute = true)
    public int getDispatchedRunning() {
        return dispatchedRunning.get();
    }

    /**
     * @return the current polling interval in seconds
     */
    @Gauge(name = "com.github.jonasrutishauser.transaction.event.dispatch.interval", description = "Interval between lookups for events to process", unit = "seconds", absolute = true)
    public int getIntervalSeconds() {
        return intervalSeconds;
    }

    /** Configured maximum of concurrent dispatches minus the tasks currently counted. */
    private int freeCapacity() {
        return configuration.getMaxConcurrentDispatching() - dispatchedRunning.get();
    }

    /**
     * Submits the task to the executor while tracking it in {@code dispatchedRunning}: the
     * counter is incremented before submission and decremented when the task ends, or
     * immediately if the executor rejects it (the rejection is rethrown).
     */
    private void executeCounting(Runnable task) {
        dispatchedRunning.incrementAndGet();
        Runnable tracked = () -> {
            try {
                task.run();
            } finally {
                dispatchedRunning.decrementAndGet();
            }
        };
        try {
            executor.execute(tracked);
        } catch (RejectedExecutionException e) {
            dispatchedRunning.decrementAndGet();
            throw e;
        }
    }
}
