package io.kestra.jdbc.runner;

import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.executions.MetricEntry;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.LogRepositoryInterface;
import io.kestra.core.repositories.MetricRepositoryInterface;
import io.kestra.core.repositories.SaveRepositoryInterface;
import io.kestra.core.runners.Indexer;
import io.kestra.core.runners.IndexerInterface;
import io.kestra.core.server.Service;
import io.kestra.core.server.ServiceStateChangeEvent;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.ListUtils;
import io.micronaut.context.event.ApplicationEventPublisher;
import jakarta.annotation.PreDestroy;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
@JdbcRunnerEnabled
/* loaded from: input_file:io/kestra/jdbc/runner/JdbcIndexer.class */
public class JdbcIndexer implements IndexerInterface {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(JdbcIndexer.class);
    private final LogRepositoryInterface logRepository;
    private final JdbcQueue<LogEntry> logQueue;
    private final MetricRepositoryInterface metricRepository;
    private final JdbcQueue<MetricEntry> metricQueue;
    private final MetricRegistry metricRegistry;
    private final List<Runnable> receiveCancellations = new ArrayList();
    private final String id = IdUtils.create();
    private final AtomicReference<Service.ServiceState> state = new AtomicReference<>();
    private final ApplicationEventPublisher<ServiceStateChangeEvent> eventPublisher;

    @Inject
    public JdbcIndexer(LogRepositoryInterface logRepositoryInterface, @Named("workerTaskLogQueue") QueueInterface<LogEntry> queueInterface, MetricRepositoryInterface metricRepositoryInterface, @Named("workerTaskMetricQueue") QueueInterface<MetricEntry> queueInterface2, MetricRegistry metricRegistry, ApplicationEventPublisher<ServiceStateChangeEvent> applicationEventPublisher) {
        this.logRepository = logRepositoryInterface;
        this.logQueue = (JdbcQueue) queueInterface;
        this.metricRepository = metricRepositoryInterface;
        this.metricQueue = (JdbcQueue) queueInterface2;
        this.metricRegistry = metricRegistry;
        this.eventPublisher = applicationEventPublisher;
        setState(Service.ServiceState.CREATED);
    }

    public void run() {
        log.debug("Starting the indexer");
        startQueues();
        setState(Service.ServiceState.RUNNING);
    }

    protected void startQueues() {
        sendBatch(this.logQueue, this.logRepository);
        sendBatch(this.metricQueue, this.metricRepository);
    }

    protected <T> void sendBatch(JdbcQueue<T> jdbcQueue, SaveRepositoryInterface<T> saveRepositoryInterface) {
        this.receiveCancellations.addFirst(jdbcQueue.receiveBatch(Indexer.class, list -> {
            list.stream().filter(either -> {
                return either.isRight();
            }).forEach(either2 -> {
                log.error("unable to deserialize an item: {}", ((DeserializationException) either2.getRight()).getMessage());
            });
            List list = list.stream().filter(either3 -> {
                return either3.isLeft();
            }).map(either4 -> {
                return either4.getLeft();
            }).toList();
            if (ListUtils.isEmpty(list)) {
                return;
            }
            String name = list.getFirst().getClass().getName();
            this.metricRegistry.counter("indexer.request.count", new String[]{"type", name}).increment();
            this.metricRegistry.counter("indexer.message.in.count", new String[]{"type", name}).increment(list.size());
            this.metricRegistry.timer("indexer.request.duration", new String[]{"type", name}).record(() -> {
                this.metricRegistry.counter("indexer.message.out.count", new String[]{"type", name}).increment(saveRepositoryInterface.saveBatch(list));
            });
        }));
    }

    private void setState(Service.ServiceState serviceState) {
        this.state.set(serviceState);
        this.eventPublisher.publishEvent(new ServiceStateChangeEvent(this));
    }

    public String getId() {
        return this.id;
    }

    public Service.ServiceType getType() {
        return Service.ServiceType.INDEXER;
    }

    public Service.ServiceState getState() {
        return this.state.get();
    }

    @PreDestroy
    public void close() {
        setState(Service.ServiceState.TERMINATING);
        this.receiveCancellations.forEach((v0) -> {
            v0.run();
        });
        try {
            stopQueue();
            setState(Service.ServiceState.TERMINATED_GRACEFULLY);
        } catch (IOException e) {
            log.error("Failed to close the queue", e);
            setState(Service.ServiceState.TERMINATED_FORCED);
        }
    }

    protected void stopQueue() throws IOException {
        this.logQueue.close();
        this.metricQueue.close();
    }
}
