/*
 * Decompiled with CFR 0.152.
 */
package com.github.jonasrutishauser.transactional.event.core.store;

import com.github.jonasrutishauser.transactional.event.api.Configuration;
import com.github.jonasrutishauser.transactional.event.api.Events;
import com.github.jonasrutishauser.transactional.event.core.PendingEvent;
import com.github.jonasrutishauser.transactional.event.core.store.EventsPublished;
import com.github.jonasrutishauser.transactional.event.core.store.PendingEventStore;
import com.github.jonasrutishauser.transactional.event.core.store.Scheduler;
import com.github.jonasrutishauser.transactional.event.core.store.Worker;
import java.time.Instant;
import java.util.Date;
import java.util.Set;
import javax.annotation.PostConstruct;
import javax.enterprise.concurrent.ContextService;
import javax.enterprise.concurrent.LastExecution;
import javax.enterprise.concurrent.ManagedScheduledExecutorService;
import javax.enterprise.concurrent.Trigger;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.context.Initialized;
import javax.enterprise.context.control.ActivateRequestContext;
import javax.enterprise.event.Observes;
import javax.enterprise.event.TransactionPhase;
import javax.enterprise.inject.TransientReference;
import javax.inject.Inject;

@ApplicationScoped
class Dispatcher
implements Scheduler {
    private final Configuration configuration;
    private final Scheduler scheduler;
    private final ManagedScheduledExecutorService executor;
    private final PendingEventStore store;
    private final Worker worker;
    private volatile int intervalSeconds = 30;

    Dispatcher() {
        this.configuration = null;
        this.scheduler = null;
        this.executor = null;
        this.store = null;
        this.worker = null;
    }

    @Inject
    Dispatcher(Configuration configuration, Scheduler scheduler, @Events ManagedScheduledExecutorService executor, PendingEventStore store, Worker worker, @Events @TransientReference ContextService contextService) {
        this.configuration = configuration;
        this.scheduler = (Scheduler)contextService.createContextualProxy((Object)scheduler, Scheduler.class);
        this.executor = executor;
        this.store = store;
        this.worker = (Worker)contextService.createContextualProxy((Object)worker, Worker.class);
    }

    @PostConstruct
    void initIntervalSeconds() {
        this.intervalSeconds = this.configuration.getInitialDispatchInterval();
    }

    void directDispatch(@Observes(during=TransactionPhase.AFTER_SUCCESS) EventsPublished events) {
        for (PendingEvent event : events.getEvents()) {
            this.executor.execute(this.processor(event.getId()));
        }
    }

    private Runnable processor(String eventId) {
        return () -> {
            if (!this.worker.process(eventId)) {
                this.intervalSeconds = 1;
            }
        };
    }

    void startup(@Observes @Initialized(value=ApplicationScoped.class) Object event) {
        this.executor.schedule(this.scheduler::schedule, new Trigger(){

            public Date getNextRunTime(LastExecution lastExecutionInfo, Date taskScheduledTime) {
                return Date.from(Instant.now().plusSeconds(Dispatcher.this.intervalSeconds));
            }

            public boolean skipRun(LastExecution lastExecutionInfo, Date scheduledRunTime) {
                return false;
            }
        });
    }

    @Override
    @ActivateRequestContext
    public synchronized void schedule() {
        boolean processed = false;
        Set<String> events = this.store.aquire();
        while (!events.isEmpty()) {
            processed = true;
            events.stream().map(this::processor).forEach(arg_0 -> this.executor.execute(arg_0));
            events = this.store.aquire();
        }
        this.intervalSeconds = processed ? 0 : Math.min(this.configuration.getMaxDispatchInterval(), Math.max(this.intervalSeconds * 2, 1));
    }
}

