/*
 * Decompiled with CFR 0.152.
 */
package io.kestra.jdbc.runner;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.CaseFormat;
import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.queues.QueueService;
import io.kestra.core.utils.Either;
import io.kestra.core.utils.ExecutorsUtils;
import io.kestra.core.utils.IdUtils;
import io.kestra.jdbc.JdbcMapper;
import io.kestra.jdbc.JdbcTableConfigs;
import io.kestra.jdbc.JooqDSLContextWrapper;
import io.kestra.jdbc.repository.AbstractJdbcRepository;
import io.kestra.jdbc.runner.JdbcQueueIndexer;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.annotation.ConfigurationProperties;
import io.micronaut.transaction.exceptions.CannotCreateTransactionException;
import java.io.IOException;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;
import lombok.Generated;
import org.jooq.DSLContext;
import org.jooq.Field;
import org.jooq.JSONB;
import org.jooq.Record;
import org.jooq.Result;
import org.jooq.SelectField;
import org.jooq.Table;
import org.jooq.impl.DSL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class JdbcQueue<T>
implements QueueInterface<T> {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(JdbcQueue.class);
    protected static final ObjectMapper MAPPER = JdbcMapper.of();
    private final ExecutorService poolExecutor;
    protected final QueueService queueService;
    protected final Class<T> cls;
    protected final JooqDSLContextWrapper dslContextWrapper;
    protected final Configuration configuration;
    protected final Table<Record> table;
    protected final JdbcQueueIndexer jdbcQueueIndexer;
    protected volatile boolean isShutdown = false;

    public JdbcQueue(Class<T> cls, ApplicationContext applicationContext) {
        ExecutorsUtils executorsUtils = (ExecutorsUtils)applicationContext.getBean(ExecutorsUtils.class);
        this.poolExecutor = executorsUtils.cachedThreadPool("jdbc-queue-" + cls.getSimpleName());
        this.queueService = (QueueService)applicationContext.getBean(QueueService.class);
        this.cls = cls;
        this.dslContextWrapper = (JooqDSLContextWrapper)applicationContext.getBean(JooqDSLContextWrapper.class);
        this.configuration = (Configuration)applicationContext.getBean(Configuration.class);
        JdbcTableConfigs jdbcTableConfigs = (JdbcTableConfigs)applicationContext.getBean(JdbcTableConfigs.class);
        this.table = DSL.table((String)jdbcTableConfigs.tableConfig("queues").table());
        this.jdbcQueueIndexer = (JdbcQueueIndexer)applicationContext.getBean(JdbcQueueIndexer.class);
    }

    protected Map<Field<Object>, Object> produceFields(String consumerGroup, String key, T message) {
        HashMap<Field<Object>, Object> fields = new HashMap<Field<Object>, Object>();
        fields.put(AbstractJdbcRepository.field("type"), this.cls.getName());
        fields.put(AbstractJdbcRepository.field("key"), key != null ? key : IdUtils.create());
        fields.put(AbstractJdbcRepository.field("value"), JSONB.valueOf((String)MAPPER.writeValueAsString(message)));
        if (consumerGroup != null) {
            fields.put(AbstractJdbcRepository.field("consumer_group"), consumerGroup);
        }
        return fields;
    }

    private void produce(String consumerGroup, String key, T message, Boolean skipIndexer) {
        if (log.isTraceEnabled()) {
            log.trace("New message: topic '{}', value {}", (Object)this.cls.getName(), message);
        }
        this.dslContextWrapper.transaction(configuration -> {
            DSLContext context = DSL.using((org.jooq.Configuration)configuration);
            if (!skipIndexer.booleanValue()) {
                this.jdbcQueueIndexer.accept(context, message);
            }
            context.insertInto(this.table).set(this.produceFields(consumerGroup, key, message)).execute();
        });
    }

    public void emitOnly(String consumerGroup, T message) {
        this.produce(consumerGroup, this.queueService.key(message), message, true);
    }

    public void emit(String consumerGroup, T message) {
        this.produce(consumerGroup, this.queueService.key(message), message, false);
    }

    public void emitAsync(String consumerGroup, T message) throws QueueException {
        this.emit(consumerGroup, message);
    }

    public void delete(String consumerGroup, T message) throws QueueException {
        this.dslContextWrapper.transaction(configuration -> DSL.using((org.jooq.Configuration)configuration).delete(this.table).where(AbstractJdbcRepository.field("key").eq((Object)this.queueService.key(message))).execute());
    }

    protected abstract Result<Record> receiveFetch(DSLContext var1, String var2, Integer var3);

    protected abstract Result<Record> receiveFetch(DSLContext var1, String var2, String var3);

    protected abstract void updateGroupOffsets(DSLContext var1, String var2, String var3, List<Integer> var4);

    public Runnable receive(String consumerGroup, Consumer<Either<T, DeserializationException>> consumer) {
        AtomicInteger maxOffset = new AtomicInteger();
        this.dslContextWrapper.transaction(configuration -> {
            Integer integer = (Integer)DSL.using((org.jooq.Configuration)configuration).select((SelectField)DSL.max(AbstractJdbcRepository.field("offset")).as("max")).from(this.table).fetchAny("max", Integer.class);
            if (integer != null) {
                maxOffset.set(integer);
            }
        });
        return this.poll(() -> {
            Result fetch = (Result)this.dslContextWrapper.transactionResult(configuration -> {
                DSLContext ctx = DSL.using((org.jooq.Configuration)configuration);
                Result<Record> result = this.receiveFetch(ctx, consumerGroup, maxOffset.get());
                if (!result.isEmpty()) {
                    List offsets = result.map(record -> (Integer)record.get("offset", Integer.class));
                    maxOffset.set((Integer)offsets.get(offsets.size() - 1));
                }
                return result;
            });
            this.send((Result<Record>)fetch, consumer);
            return fetch.size();
        });
    }

    public Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<T, DeserializationException>> consumer) {
        return this.receiveImpl(consumerGroup, queueType, (dslContext, eithers) -> eithers.forEach(consumer), false);
    }

    public Runnable receiveTransaction(String consumerGroup, Class<?> queueType, BiConsumer<DSLContext, List<Either<T, DeserializationException>>> consumer) {
        return this.receiveImpl(consumerGroup, queueType, consumer, true);
    }

    public Runnable receiveImpl(String consumerGroup, Class<?> queueType, BiConsumer<DSLContext, List<Either<T, DeserializationException>>> consumer, Boolean inTransaction) {
        String queueName = this.queueName(queueType);
        return this.poll(() -> {
            Result fetch = (Result)this.dslContextWrapper.transactionResult(configuration -> {
                DSLContext ctx = DSL.using((org.jooq.Configuration)configuration);
                Result<Record> result = this.receiveFetch(ctx, consumerGroup, queueName);
                if (!result.isEmpty()) {
                    if (inTransaction.booleanValue()) {
                        consumer.accept(ctx, this.map(result));
                    }
                    this.updateGroupOffsets(ctx, consumerGroup, queueName, result.map(record -> (Integer)record.get("offset", Integer.class)));
                }
                return result;
            });
            if (!inTransaction.booleanValue()) {
                consumer.accept(null, this.map((Result<Record>)fetch));
            }
            return fetch.size();
        });
    }

    protected String queueName(Class<?> queueType) {
        return CaseFormat.UPPER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, queueType.getSimpleName());
    }

    protected Runnable poll(Supplier<Integer> runnable) {
        AtomicBoolean running = new AtomicBoolean(true);
        AtomicLong sleep = new AtomicLong(this.configuration.getMaxPollInterval().toMillis());
        AtomicReference<ZonedDateTime> lastPoll = new AtomicReference<ZonedDateTime>(ZonedDateTime.now());
        this.poolExecutor.execute(() -> {
            while (running.get() && !this.isShutdown) {
                block6: {
                    try {
                        Integer count = (Integer)runnable.get();
                        if (count > 0) {
                            lastPoll.set(ZonedDateTime.now());
                        }
                        sleep.set(((ZonedDateTime)lastPoll.get()).plus(this.configuration.getPollSwitchInterval()).compareTo(ZonedDateTime.now()) < 0 ? this.configuration.getMaxPollInterval().toMillis() : this.configuration.getMinPollInterval().toMillis());
                    }
                    catch (CannotCreateTransactionException e) {
                        if (!log.isDebugEnabled()) break block6;
                        log.debug("Can't poll on receive", (Throwable)e);
                    }
                }
                try {
                    Thread.sleep(sleep.get());
                }
                catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        return () -> running.set(false);
    }

    protected List<Either<T, DeserializationException>> map(Result<Record> fetch) {
        return fetch.map(record -> {
            try {
                return Either.left((Object)MAPPER.readValue((String)record.get("value", String.class), this.cls));
            }
            catch (JsonProcessingException e) {
                return Either.right((Object)((Object)new DeserializationException((IOException)((Object)e), (String)record.get("value", String.class))));
            }
        });
    }

    protected void send(Result<Record> fetch, Consumer<Either<T, DeserializationException>> consumer) {
        this.map(fetch).forEach(consumer);
    }

    public void pause() {
        this.isShutdown = true;
    }

    public void close() throws IOException {
        this.isShutdown = true;
        this.poolExecutor.shutdown();
    }

    @ConfigurationProperties(value="kestra.jdbc.queues")
    public static class Configuration {
        Duration minPollInterval = Duration.ofMillis(100L);
        Duration maxPollInterval = Duration.ofMillis(500L);
        Duration pollSwitchInterval = Duration.ofSeconds(30L);
        Integer pollSize = 100;

        @Generated
        public Duration getMinPollInterval() {
            return this.minPollInterval;
        }

        @Generated
        public Duration getMaxPollInterval() {
            return this.maxPollInterval;
        }

        @Generated
        public Duration getPollSwitchInterval() {
            return this.pollSwitchInterval;
        }

        @Generated
        public Integer getPollSize() {
            return this.pollSize;
        }
    }
}

