package io.kestra.jdbc;

import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.runners.WorkerTriggerResult;
import io.kestra.core.utils.Either;
import io.kestra.jdbc.repository.AbstractJdbcWorkerJobRunningRepository;
import io.kestra.jdbc.runner.JdbcQueue;
import io.micronaut.context.ApplicationContext;
import io.micronaut.inject.qualifiers.Qualifiers;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.util.function.Consumer;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
/* loaded from: input_file:io/kestra/jdbc/JdbcWorkerTriggerResultQueueService.class */
public class JdbcWorkerTriggerResultQueueService {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(JdbcWorkerTriggerResultQueueService.class);
    private final JdbcQueue<WorkerTriggerResult> workerTriggerResultQueue;

    @Inject
    private AbstractJdbcWorkerJobRunningRepository jdbcWorkerJobRunningRepository;
    private Runnable queueStop;

    public JdbcWorkerTriggerResultQueueService(ApplicationContext applicationContext) {
        this.workerTriggerResultQueue = (JdbcQueue) applicationContext.getBean(QueueInterface.class, Qualifiers.byName("workerTriggerResultQueue"));
    }

    public Runnable receive(String str, Class<?> cls, Consumer<Either<WorkerTriggerResult, DeserializationException>> consumer) {
        this.queueStop = this.workerTriggerResultQueue.receiveTransaction(str, cls, (dSLContext, list) -> {
            list.forEach(either -> {
                if (either.isRight()) {
                    log.error("Unable to deserialize a worker job: {}", ((DeserializationException) either.getRight()).getMessage());
                } else {
                    this.jdbcWorkerJobRunningRepository.deleteByKey(((WorkerTriggerResult) either.getLeft()).getTriggerContext().uid());
                }
            });
            list.forEach(consumer);
        });
        return this.queueStop;
    }

    public void pause() {
        stopQueue();
    }

    private void stopQueue() {
        synchronized (this) {
            if (this.queueStop != null) {
                this.queueStop.run();
                this.queueStop = null;
            }
        }
    }

    public void cleanup() {
    }

    public void close() {
        stopQueue();
    }
}
