package io.kestra.jdbc;

import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.runners.Worker;
import io.kestra.core.runners.WorkerInstance;
import io.kestra.core.runners.WorkerJob;
import io.kestra.core.runners.WorkerTask;
import io.kestra.core.runners.WorkerTaskRunning;
import io.kestra.core.runners.WorkerTrigger;
import io.kestra.core.runners.WorkerTriggerRunning;
import io.kestra.core.server.Service;
import io.kestra.core.server.ServiceRegistry;
import io.kestra.core.utils.Either;
import io.kestra.jdbc.repository.AbstractJdbcWorkerJobRunningRepository;
import io.kestra.jdbc.runner.JdbcQueue;
import io.micronaut.context.ApplicationContext;
import io.micronaut.inject.qualifiers.Qualifiers;
import jakarta.inject.Singleton;
import java.io.Closeable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
/* loaded from: input_file:io/kestra/jdbc/JdbcWorkerJobQueueService.class */
public class JdbcWorkerJobQueueService implements Closeable {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(JdbcWorkerJobQueueService.class);
    private final JdbcQueue<WorkerJob> workerTaskQueue;
    private final AbstractJdbcWorkerJobRunningRepository jdbcWorkerJobRunningRepository;
    private final ServiceRegistry serviceRegistry;
    private final AtomicReference<Runnable> disposable = new AtomicReference<>();
    private final AtomicBoolean isStopped = new AtomicBoolean(false);

    public JdbcWorkerJobQueueService(ApplicationContext applicationContext) {
        this.workerTaskQueue = (JdbcQueue) applicationContext.getBean(QueueInterface.class, Qualifiers.byName("workerJobQueue"));
        this.serviceRegistry = (ServiceRegistry) applicationContext.getBean(ServiceRegistry.class);
        this.jdbcWorkerJobRunningRepository = (AbstractJdbcWorkerJobRunningRepository) applicationContext.getBean(AbstractJdbcWorkerJobRunningRepository.class);
    }

    public Runnable receive(String str, Class<?> cls, Consumer<Either<WorkerJob, DeserializationException>> consumer) {
        this.disposable.set(this.workerTaskQueue.receiveTransaction(str, cls, (dSLContext, list) -> {
            Worker unwrap = this.serviceRegistry.waitForServiceAndGet(Service.ServiceType.WORKER).unwrap();
            WorkerInstance workerInstance = new WorkerInstance(unwrap.getId(), unwrap.getWorkerGroup());
            list.forEach(either -> {
                WorkerTaskRunning of;
                if (either.isRight()) {
                    log.error("Unable to deserialize a worker job: {}", ((DeserializationException) either.getRight()).getMessage());
                    return;
                }
                WorkerTask workerTask = (WorkerJob) either.getLeft();
                if (workerTask instanceof WorkerTask) {
                    of = WorkerTaskRunning.of(workerTask, workerInstance, 0);
                } else {
                    if (!(workerTask instanceof WorkerTrigger)) {
                        throw new IllegalArgumentException("Message is of type " + String.valueOf(workerTask.getClass()) + " which should never occurs");
                    }
                    of = WorkerTriggerRunning.of((WorkerTrigger) workerTask, workerInstance, 0);
                }
                this.jdbcWorkerJobRunningRepository.save(of, dSLContext);
                if (log.isTraceEnabled()) {
                    log.trace("Sending a workerJobRunning: {}", of);
                }
            });
            list.forEach(consumer);
        }));
        return this.disposable.get();
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        Runnable runnable;
        if (this.isStopped.compareAndSet(true, false) && (runnable = this.disposable.get()) != null) {
            runnable.run();
            this.disposable.set(null);
        }
    }
}
