package io.kestra.jdbc.runner;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.CaseFormat;
import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.queues.MessageTooBigException;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.queues.QueueService;
import io.kestra.core.utils.Either;
import io.kestra.core.utils.ExecutorsUtils;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.Rethrow;
import io.kestra.jdbc.JdbcMapper;
import io.kestra.jdbc.JdbcTableConfigs;
import io.kestra.jdbc.JooqDSLContextWrapper;
import io.kestra.jdbc.repository.AbstractJdbcRepository;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.annotation.ConfigurationProperties;
import io.micronaut.transaction.exceptions.CannotCreateTransactionException;
import java.io.IOException;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.time.chrono.ChronoZonedDateTime;
import java.time.temporal.TemporalAmount;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;
import lombok.Generated;
import org.jooq.Condition;
import org.jooq.DSLContext;
import org.jooq.Field;
import org.jooq.JSONB;
import org.jooq.Record;
import org.jooq.Result;
import org.jooq.SelectConditionStep;
import org.jooq.SelectForStep;
import org.jooq.Table;
import org.jooq.impl.DSL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/kestra/jdbc/runner/JdbcQueue.class */
public abstract class JdbcQueue<T> implements QueueInterface<T> {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(JdbcQueue.class);
    private static final int MAX_ASYNC_THREADS = Runtime.getRuntime().availableProcessors() * 2;
    protected static final ObjectMapper MAPPER = JdbcMapper.of();
    private final ExecutorService poolExecutor;
    private final ExecutorService asyncPoolExecutor;
    protected final QueueService queueService;
    protected final Class<T> cls;
    protected final JooqDSLContextWrapper dslContextWrapper;
    protected final Configuration configuration;
    protected final MessageProtectionConfiguration messageProtectionConfiguration;
    private final MetricRegistry metricRegistry;
    protected final Table<Record> table;
    protected final JdbcQueueIndexer jdbcQueueIndexer;
    private final AtomicBoolean isClosed = new AtomicBoolean(false);

    @ConfigurationProperties("kestra.jdbc.queues")
    /* loaded from: input_file:io/kestra/jdbc/runner/JdbcQueue$Configuration.class */
    public static class Configuration {
        Duration minPollInterval = Duration.ofMillis(100);
        Duration maxPollInterval = Duration.ofMillis(500);
        Duration pollSwitchInterval = Duration.ofSeconds(30);
        Integer pollSize = 100;

        @Generated
        public Duration getMinPollInterval() {
            return this.minPollInterval;
        }

        @Generated
        public Duration getMaxPollInterval() {
            return this.maxPollInterval;
        }

        @Generated
        public Duration getPollSwitchInterval() {
            return this.pollSwitchInterval;
        }

        @Generated
        public Integer getPollSize() {
            return this.pollSize;
        }
    }

    public JdbcQueue(Class<T> cls, ApplicationContext applicationContext) {
        ExecutorsUtils executorsUtils = (ExecutorsUtils) applicationContext.getBean(ExecutorsUtils.class);
        this.poolExecutor = executorsUtils.cachedThreadPool("jdbc-queue-" + cls.getSimpleName());
        this.asyncPoolExecutor = executorsUtils.maxCachedThreadPool(MAX_ASYNC_THREADS, "jdbc-queue-async-" + cls.getSimpleName());
        this.queueService = (QueueService) applicationContext.getBean(QueueService.class);
        this.cls = cls;
        this.dslContextWrapper = (JooqDSLContextWrapper) applicationContext.getBean(JooqDSLContextWrapper.class);
        this.configuration = (Configuration) applicationContext.getBean(Configuration.class);
        this.messageProtectionConfiguration = (MessageProtectionConfiguration) applicationContext.getBean(MessageProtectionConfiguration.class);
        this.metricRegistry = (MetricRegistry) applicationContext.getBean(MetricRegistry.class);
        this.table = DSL.table(((JdbcTableConfigs) applicationContext.getBean(JdbcTableConfigs.class)).tableConfig("queues").table());
        this.jdbcQueueIndexer = (JdbcQueueIndexer) applicationContext.getBean(JdbcQueueIndexer.class);
    }

    protected Map<Field<Object>, Object> produceFields(String str, String str2, T t) throws QueueException {
        try {
            byte[] writeValueAsBytes = MAPPER.writeValueAsBytes(t);
            if (this.messageProtectionConfiguration.enabled && writeValueAsBytes.length >= this.messageProtectionConfiguration.limit.intValue()) {
                this.metricRegistry.counter("queue.big_message.count", new String[]{"class_name", this.cls.getName()}).increment();
                if (!(t instanceof Execution) || !((Execution) t).getState().isTerminated()) {
                    throw new MessageTooBigException("Message of size " + writeValueAsBytes.length + " has exceeded the configured limit of " + this.messageProtectionConfiguration.limit);
                }
            }
            HashMap hashMap = new HashMap();
            hashMap.put(AbstractJdbcRepository.field("type"), this.cls.getName());
            hashMap.put(AbstractJdbcRepository.field("key"), str2 != null ? str2 : IdUtils.create());
            hashMap.put(AbstractJdbcRepository.field("value"), JSONB.valueOf(new String(writeValueAsBytes)));
            if (str != null) {
                hashMap.put(AbstractJdbcRepository.field("consumer_group"), str);
            }
            return hashMap;
        } catch (JsonProcessingException e) {
            throw new QueueException("Unable to serialize the message", e);
        }
    }

    private void produce(String str, String str2, T t, Boolean bool) throws QueueException {
        if (log.isTraceEnabled()) {
            log.trace("New message: topic '{}', value {}", this.cls.getName(), t);
        }
        Map<Field<Object>, Object> produceFields = produceFields(str, str2, t);
        this.dslContextWrapper.transaction(configuration -> {
            DSLContext using = DSL.using(configuration);
            if (!bool.booleanValue()) {
                this.jdbcQueueIndexer.accept(using, t);
            }
            using.insertInto(this.table).set(produceFields).execute();
        });
    }

    public void emitOnly(String str, T t) throws QueueException {
        produce(str, this.queueService.key(t), t, true);
    }

    public void emit(String str, T t) throws QueueException {
        produce(str, this.queueService.key(t), t, false);
    }

    public void emitAsync(String str, T t) throws QueueException {
        this.asyncPoolExecutor.submit(Rethrow.throwRunnable(() -> {
            emit(str, t);
        }));
    }

    public void delete(String str, T t) throws QueueException {
        this.dslContextWrapper.transaction(configuration -> {
            DSL.using(configuration).delete(this.table).where(AbstractJdbcRepository.field("key").eq(this.queueService.key(t))).execute();
        });
    }

    protected Result<Record> receiveFetch(DSLContext dSLContext, String str, Integer num) {
        return receiveFetch(dSLContext, str, num, true);
    }

    protected Result<Record> receiveFetch(DSLContext dSLContext, String str, Integer num, boolean z) {
        SelectConditionStep where = dSLContext.select(AbstractJdbcRepository.field("value"), AbstractJdbcRepository.field("offset")).from(this.table).where(buildTypeCondition(this.cls.getName()));
        if (num.intValue() != 0) {
            where = where.and(AbstractJdbcRepository.field("offset").gt(num));
        }
        SelectForStep limit = (str != null ? where.and(AbstractJdbcRepository.field("consumer_group").eq(str)) : where.and(AbstractJdbcRepository.field("consumer_group").isNull())).orderBy(AbstractJdbcRepository.field("offset").asc()).limit(this.configuration.getPollSize());
        SelectForStep selectForStep = limit;
        if (z) {
            selectForStep = limit.forUpdate().skipLocked();
        }
        return (Result) selectForStep.fetchMany().getFirst();
    }

    protected Result<Record> receiveFetch(DSLContext dSLContext, String str, String str2) {
        return receiveFetch(dSLContext, str, str2, true);
    }

    protected abstract Result<Record> receiveFetch(DSLContext dSLContext, String str, String str2, boolean z);

    protected abstract void updateGroupOffsets(DSLContext dSLContext, String str, String str2, List<Integer> list);

    protected abstract Condition buildTypeCondition(String str);

    public Runnable receive(String str, Consumer<Either<T, DeserializationException>> consumer, boolean z) {
        AtomicInteger atomicInteger = new AtomicInteger();
        this.dslContextWrapper.transaction(configuration -> {
            SelectConditionStep where = DSL.using(configuration).select(DSL.max(AbstractJdbcRepository.field("offset")).as("max")).from(this.table).where(buildTypeCondition(this.cls.getName()));
            Integer num = (Integer) (str != null ? where.and(AbstractJdbcRepository.field("consumer_group").eq(str)) : where.and(AbstractJdbcRepository.field("consumer_group").isNull())).fetchAny("max", Integer.class);
            if (num != null) {
                atomicInteger.set(num.intValue());
            }
        });
        return poll(() -> {
            Result<Record> result = (Result) this.dslContextWrapper.transactionResult(configuration2 -> {
                Result<Record> receiveFetch = receiveFetch(DSL.using(configuration2), str, Integer.valueOf(atomicInteger.get()), z);
                if (!receiveFetch.isEmpty()) {
                    atomicInteger.set(((Integer) receiveFetch.map(record -> {
                        return (Integer) record.get("offset", Integer.class);
                    }).getLast()).intValue());
                }
                return receiveFetch;
            });
            send(result, consumer);
            return Integer.valueOf(result.size());
        });
    }

    public Runnable receive(String str, Class<?> cls, Consumer<Either<T, DeserializationException>> consumer, boolean z) {
        return receiveImpl(str, cls, (dSLContext, list) -> {
            list.forEach(consumer);
        }, false, z);
    }

    public Runnable receiveBatch(Class<?> cls, Consumer<List<Either<T, DeserializationException>>> consumer) {
        return receiveBatch(null, cls, consumer);
    }

    public Runnable receiveBatch(String str, Class<?> cls, Consumer<List<Either<T, DeserializationException>>> consumer) {
        return receiveBatch(str, cls, consumer, true);
    }

    public Runnable receiveBatch(String str, Class<?> cls, Consumer<List<Either<T, DeserializationException>>> consumer, boolean z) {
        return receiveImpl(str, cls, (dSLContext, list) -> {
            consumer.accept(list);
        }, false, z);
    }

    public Runnable receiveTransaction(String str, Class<?> cls, BiConsumer<DSLContext, List<Either<T, DeserializationException>>> biConsumer) {
        return receiveImpl(str, cls, biConsumer, true, true);
    }

    public Runnable receiveImpl(String str, Class<?> cls, BiConsumer<DSLContext, List<Either<T, DeserializationException>>> biConsumer, Boolean bool, boolean z) {
        String queueName = queueName(cls);
        return poll(() -> {
            Result<Record> result = (Result) this.dslContextWrapper.transactionResult(configuration -> {
                DSLContext using = DSL.using(configuration);
                Result<Record> receiveFetch = receiveFetch(using, str, queueName, z);
                if (!receiveFetch.isEmpty()) {
                    if (bool.booleanValue()) {
                        biConsumer.accept(using, map(receiveFetch));
                    }
                    updateGroupOffsets(using, str, queueName, receiveFetch.map(record -> {
                        return (Integer) record.get("offset", Integer.class);
                    }));
                }
                return receiveFetch;
            });
            if (!bool.booleanValue()) {
                biConsumer.accept(null, map(result));
            }
            return Integer.valueOf(result.size());
        });
    }

    protected String queueName(Class<?> cls) {
        return CaseFormat.UPPER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, cls.getSimpleName());
    }

    protected Runnable poll(Supplier<Integer> supplier) {
        AtomicBoolean atomicBoolean = new AtomicBoolean(true);
        AtomicLong atomicLong = new AtomicLong(this.configuration.getMaxPollInterval().toMillis());
        AtomicReference atomicReference = new AtomicReference(ZonedDateTime.now());
        this.poolExecutor.execute(() -> {
            while (atomicBoolean.get() && !this.isClosed.get()) {
                try {
                    if (((Integer) supplier.get()).intValue() > 0) {
                        atomicReference.set(ZonedDateTime.now());
                    }
                    atomicLong.set(((ZonedDateTime) atomicReference.get()).plus((TemporalAmount) this.configuration.getPollSwitchInterval()).compareTo((ChronoZonedDateTime<?>) ZonedDateTime.now()) < 0 ? this.configuration.getMaxPollInterval().toMillis() : this.configuration.getMinPollInterval().toMillis());
                } catch (CannotCreateTransactionException e) {
                    if (log.isDebugEnabled()) {
                        log.debug("Can't poll on receive", e);
                    }
                }
                try {
                    Thread.sleep(atomicLong.get());
                } catch (InterruptedException e2) {
                    throw new RuntimeException(e2);
                }
            }
        });
        return () -> {
            atomicBoolean.set(false);
        };
    }

    protected List<Either<T, DeserializationException>> map(Result<Record> result) {
        return result.map(record -> {
            try {
                return Either.left(MAPPER.readValue((String) record.get("value", String.class), this.cls));
            } catch (JsonProcessingException e) {
                return Either.right(new DeserializationException(e, (String) record.get("value", String.class)));
            }
        });
    }

    protected void send(Result<Record> result, Consumer<Either<T, DeserializationException>> consumer) {
        map(result).forEach(consumer);
    }

    public void close() throws IOException {
        if (this.isClosed.compareAndSet(false, true)) {
            this.poolExecutor.shutdown();
            this.asyncPoolExecutor.shutdown();
        }
    }
}
