package com.netflix.conductor.postgres.dao;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Uninterruptibles;
import com.netflix.conductor.core.events.queue.Message;
import com.netflix.conductor.dao.QueueDAO;
import com.netflix.conductor.postgres.util.ExecutorsUtil;
import com.netflix.conductor.postgres.util.Query;
import jakarta.annotation.PreDestroy;
import java.sql.Connection;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.sql.DataSource;
import org.springframework.retry.support.RetryTemplate;

/* loaded from: input_file:com/netflix/conductor/postgres/dao/PostgresQueueDAO.class */
public class PostgresQueueDAO extends PostgresBaseDAO implements QueueDAO {
    private static final Long UNACK_SCHEDULE_MS = 60000L;
    private final ScheduledExecutorService scheduledExecutorService;

    /* loaded from: input_file:com/netflix/conductor/postgres/dao/PostgresQueueDAO$QueueMessage.class */
    private class QueueMessage {
        public String queueName;
        public String messageId;

        private QueueMessage() {
        }
    }

    public PostgresQueueDAO(RetryTemplate retryTemplate, ObjectMapper objectMapper, DataSource dataSource) {
        super(retryTemplate, objectMapper, dataSource);
        this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(ExecutorsUtil.newNamedThreadFactory("postgres-queue-"));
        this.scheduledExecutorService.scheduleAtFixedRate(this::processAllUnacks, UNACK_SCHEDULE_MS.longValue(), UNACK_SCHEDULE_MS.longValue(), TimeUnit.MILLISECONDS);
        this.logger.debug("{} is ready to serve", PostgresQueueDAO.class.getName());
    }

    @PreDestroy
    public void destroy() {
        try {
            this.scheduledExecutorService.shutdown();
            if (this.scheduledExecutorService.awaitTermination(30L, TimeUnit.SECONDS)) {
                this.logger.debug("tasks completed, shutting down");
            } else {
                this.logger.warn("Forcing shutdown after waiting for 30 seconds");
                this.scheduledExecutorService.shutdownNow();
            }
        } catch (InterruptedException e) {
            this.logger.warn("Shutdown interrupted, invoking shutdownNow on scheduledExecutorService for processAllUnacks", e);
            this.scheduledExecutorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    public void push(String str, String str2, long j) {
        push(str, str2, 0, j);
    }

    public void push(String str, String str2, int i, long j) {
        withTransaction(connection -> {
            pushMessage(connection, str, str2, null, Integer.valueOf(i), j);
        });
    }

    public void push(String str, List<Message> list) {
        withTransaction(connection -> {
            list.forEach(message -> {
                pushMessage(connection, str, message.getId(), message.getPayload(), Integer.valueOf(message.getPriority()), 0L);
            });
        });
    }

    public boolean pushIfNotExists(String str, String str2, long j) {
        return pushIfNotExists(str, str2, 0, j);
    }

    public boolean pushIfNotExists(String str, String str2, int i, long j) {
        return ((Boolean) getWithRetriedTransactions(connection -> {
            if (existsMessage(connection, str, str2)) {
                return false;
            }
            pushMessage(connection, str, str2, null, Integer.valueOf(i), j);
            return true;
        })).booleanValue();
    }

    public List<String> pop(String str, int i, int i2) {
        return (List) pollMessages(str, i, i2).stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList());
    }

    /* JADX WARN: Code restructure failed: missing block: B:20:0x009b, code lost:
    
        return r0;
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public java.util.List<com.netflix.conductor.core.events.queue.Message> pollMessages(java.lang.String r8, int r9, int r10) {
        /*
            r7 = this;
            r0 = r10
            r1 = 1
            if (r0 >= r1) goto L27
            r0 = r7
            r1 = r7
            r2 = r8
            r3 = r9
            r4 = r10
            java.util.List<com.netflix.conductor.core.events.queue.Message> r1 = (v4) -> { // com.netflix.conductor.postgres.util.TransactionalFunction.apply(java.sql.Connection):java.lang.Object
                return r1.lambda$pollMessages$4(r2, r3, r4, v4);
            }
            java.lang.Object r0 = r0.getWithTransactionWithOutErrorPropagation(r1)
            java.util.List r0 = (java.util.List) r0
            r11 = r0
            r0 = r11
            if (r0 != 0) goto L24
            java.util.ArrayList r0 = new java.util.ArrayList
            r1 = r0
            r1.<init>()
            return r0
        L24:
            r0 = r11
            return r0
        L27:
            long r0 = java.lang.System.currentTimeMillis()
            r11 = r0
            java.util.ArrayList r0 = new java.util.ArrayList
            r1 = r0
            r1.<init>()
            r13 = r0
        L35:
            r0 = r7
            r1 = r7
            r2 = r8
            r3 = r9
            r4 = r13
            r5 = r10
            java.util.List<com.netflix.conductor.core.events.queue.Message> r1 = (v5) -> { // com.netflix.conductor.postgres.util.TransactionalFunction.apply(java.sql.Connection):java.lang.Object
                return r1.lambda$pollMessages$5(r2, r3, r4, r5, v5);
            }
            java.lang.Object r0 = r0.getWithTransactionWithOutErrorPropagation(r1)
            java.util.List r0 = (java.util.List) r0
            r14 = r0
            r0 = r14
            if (r0 != 0) goto L78
            r0 = r7
            org.slf4j.Logger r0 = r0.logger
            java.lang.String r1 = "Unable to poll {} messages from {} due to tx conflict, only {} popped"
            r2 = 3
            java.lang.Object[] r2 = new java.lang.Object[r2]
            r3 = r2
            r4 = 0
            r5 = r9
            java.lang.Integer r5 = java.lang.Integer.valueOf(r5)
            r3[r4] = r5
            r3 = r2
            r4 = 1
            r5 = r8
            r3[r4] = r5
            r3 = r2
            r4 = 2
            r5 = r13
            int r5 = r5.size()
            java.lang.Integer r5 = java.lang.Integer.valueOf(r5)
            r3[r4] = r5
            r0.warn(r1, r2)
            r0 = r13
            return r0
        L78:
            r0 = r13
            r1 = r14
            boolean r0 = r0.addAll(r1)
            r0 = r13
            int r0 = r0.size()
            r1 = r9
            if (r0 >= r1) goto L99
            long r0 = java.lang.System.currentTimeMillis()
            r1 = r11
            long r0 = r0 - r1
            r1 = r10
            long r1 = (long) r1
            int r0 = (r0 > r1 ? 1 : (r0 == r1 ? 0 : -1))
            if (r0 <= 0) goto L9c
        L99:
            r0 = r13
            return r0
        L9c:
            r0 = 100
            java.util.concurrent.TimeUnit r1 = java.util.concurrent.TimeUnit.MILLISECONDS
            com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly(r0, r1)
            goto L35
        */
        throw new UnsupportedOperationException("Method not decompiled: com.netflix.conductor.postgres.dao.PostgresQueueDAO.pollMessages(java.lang.String, int, int):java.util.List");
    }

    public void remove(String str, String str2) {
        withTransaction(connection -> {
            removeMessage(connection, str, str2);
        });
    }

    public int getSize(String str) {
        return ((Integer) queryWithTransaction("SELECT COUNT(*) FROM queue_message WHERE queue_name = ?", query -> {
            return Integer.valueOf(Long.valueOf(query.addParameter(str).executeCount()).intValue());
        })).intValue();
    }

    public boolean ack(String str, String str2) {
        return ((Boolean) getWithRetriedTransactions(connection -> {
            return Boolean.valueOf(removeMessage(connection, str, str2));
        })).booleanValue();
    }

    public boolean setUnackTimeout(String str, String str2, long j) {
        long j2 = j / 1000;
        return ((Integer) queryWithTransaction("UPDATE queue_message SET offset_time_seconds = ?, deliver_on = (current_timestamp + (? ||' seconds')::interval) WHERE queue_name = ? AND message_id = ?", query -> {
            return Integer.valueOf(query.addParameter(j2).addParameter(j2).addParameter(str).addParameter(str2).executeUpdate());
        })).intValue() == 1;
    }

    public void flush(String str) {
        executeWithTransaction("DELETE FROM queue_message WHERE queue_name = ?", query -> {
            query.addParameter(str).executeDelete();
        });
    }

    public Map<String, Long> queuesDetail() {
        return (Map) queryWithTransaction("SELECT queue_name, (SELECT count(*) FROM queue_message WHERE popped = false AND queue_name = q.queue_name) AS size FROM queue q FOR SHARE SKIP LOCKED", query -> {
            return (Map) query.executeAndFetch(resultSet -> {
                HashMap newHashMap = Maps.newHashMap();
                while (resultSet.next()) {
                    newHashMap.put(resultSet.getString("queue_name"), Long.valueOf(resultSet.getLong("size")));
                }
                return newHashMap;
            });
        });
    }

    public Map<String, Map<String, Map<String, Long>>> queuesDetailVerbose() {
        return (Map) queryWithTransaction("SELECT queue_name, \n       (SELECT count(*) FROM queue_message WHERE popped = false AND queue_name = q.queue_name) AS size,\n       (SELECT count(*) FROM queue_message WHERE popped = true AND queue_name = q.queue_name) AS uacked \nFROM queue q FOR SHARE SKIP LOCKED", query -> {
            return (Map) query.executeAndFetch(resultSet -> {
                HashMap newHashMap = Maps.newHashMap();
                while (resultSet.next()) {
                    newHashMap.put(resultSet.getString("queue_name"), ImmutableMap.of("a", ImmutableMap.of("size", Long.valueOf(resultSet.getLong("size")), "uacked", Long.valueOf(resultSet.getLong("uacked")))));
                }
                return newHashMap;
            });
        });
    }

    public void processAllUnacks() {
        this.logger.trace("processAllUnacks started");
        getWithRetriedTransactions(connection -> {
            List<QueueMessage> list = (List) query(connection, "SELECT queue_name, message_id FROM queue_message WHERE popped = true AND (deliver_on + (60 ||' seconds')::interval)  <  current_timestamp limit 1000 FOR UPDATE SKIP LOCKED", query -> {
                return (List) query.executeAndFetch(resultSet -> {
                    ArrayList arrayList = new ArrayList();
                    while (resultSet.next()) {
                        QueueMessage queueMessage = new QueueMessage();
                        queueMessage.queueName = resultSet.getString("queue_name");
                        queueMessage.messageId = resultSet.getString("message_id");
                        arrayList.add(queueMessage);
                    }
                    return arrayList;
                });
            });
            if (list.size() == 0) {
                return 0;
            }
            HashMap hashMap = new HashMap();
            for (QueueMessage queueMessage : list) {
                if (!hashMap.containsKey(queueMessage.queueName)) {
                    hashMap.put(queueMessage.queueName, new ArrayList());
                }
                ((List) hashMap.get(queueMessage.queueName)).add(queueMessage.messageId);
            }
            int i = 0;
            for (String str : hashMap.keySet()) {
                Integer num = 0;
                try {
                    List list2 = (List) hashMap.get(str);
                    num = (Integer) query(connection, String.format("UPDATE queue_message SET popped = false WHERE queue_name = ? and message_id IN (%s)", Query.generateInBindings(list2.size())), query2 -> {
                        return Integer.valueOf(query2.addParameter(str).addParameters(list2).executeUpdate());
                    });
                } catch (Exception e) {
                    e.printStackTrace();
                }
                i += num.intValue();
                this.logger.debug("Unacked {} messages from all queues", num);
            }
            if (i > 0) {
                this.logger.debug("Unacked {} messages from all queues", Integer.valueOf(i));
            }
            return Integer.valueOf(i);
        });
    }

    public void processUnacks(String str) {
        executeWithTransaction("UPDATE queue_message SET popped = false WHERE queue_name = ? AND popped = true AND (current_timestamp - (60 ||' seconds')::interval)  > deliver_on", query -> {
            query.addParameter(str).executeUpdate();
        });
    }

    public boolean resetOffsetTime(String str, String str2) {
        long j = 0;
        return ((Boolean) queryWithTransaction("UPDATE queue_message SET offset_time_seconds = ?, deliver_on = (current_timestamp + (? ||' seconds')::interval) \nWHERE queue_name = ? AND message_id = ?", query -> {
            return Boolean.valueOf(query.addParameter(j).addParameter(j).addParameter(str).addParameter(str2).executeUpdate() == 1);
        })).booleanValue();
    }

    private boolean existsMessage(Connection connection, String str, String str2) {
        return ((Boolean) query(connection, "SELECT EXISTS(SELECT 1 FROM queue_message WHERE queue_name = ? AND message_id = ?) FOR SHARE", query -> {
            return Boolean.valueOf(query.addParameter(str).addParameter(str2).exists());
        })).booleanValue();
    }

    private void pushMessage(Connection connection, String str, String str2, String str3, Integer num, long j) {
        createQueueIfNotExists(connection, str);
        if (((Integer) query(connection, "UPDATE queue_message SET payload=?, deliver_on=(current_timestamp + (? ||' seconds')::interval) WHERE queue_name = ? AND message_id = ?", query -> {
            return Integer.valueOf(query.addParameter(str3).addParameter(j).addParameter(str).addParameter(str2).executeUpdate());
        })).intValue() == 0) {
            execute(connection, "INSERT INTO queue_message (deliver_on, queue_name, message_id, priority, offset_time_seconds, payload) VALUES ((current_timestamp + (? ||' seconds')::interval), ?,?,?,?,?) ON CONFLICT (queue_name,message_id) DO UPDATE SET payload=excluded.payload, deliver_on=excluded.deliver_on", query2 -> {
                query2.addParameter(j).addParameter(str).addParameter(str2).addParameter(num.intValue()).addParameter(j).addParameter(str3).executeUpdate();
            });
        }
    }

    private boolean removeMessage(Connection connection, String str, String str2) {
        return ((Boolean) query(connection, "DELETE FROM queue_message WHERE queue_name = ? AND message_id = ?", query -> {
            return Boolean.valueOf(query.addParameter(str).addParameter(str2).executeDelete());
        })).booleanValue();
    }

    private List<Message> peekMessages(Connection connection, String str, int i) {
        return i < 1 ? Collections.emptyList() : (List) query(connection, "SELECT message_id, priority, payload FROM queue_message WHERE queue_name = ? AND popped = false AND deliver_on <= (current_timestamp + (1000 ||' microseconds')::interval) ORDER BY priority DESC, deliver_on, created_on LIMIT ? FOR UPDATE SKIP LOCKED", query -> {
            return (List) query.addParameter(str).addParameter(i).executeAndFetch(resultSet -> {
                ArrayList arrayList = new ArrayList();
                while (resultSet.next()) {
                    Message message = new Message();
                    message.setId(resultSet.getString("message_id"));
                    message.setPriority(resultSet.getInt("priority"));
                    message.setPayload(resultSet.getString("payload"));
                    arrayList.add(message);
                }
                return arrayList;
            });
        });
    }

    private List<Message> popMessages(Connection connection, String str, int i, int i2) {
        List<Message> peekMessages = peekMessages(connection, str, i);
        if (peekMessages.isEmpty()) {
            return peekMessages;
        }
        ArrayList arrayList = new ArrayList();
        for (Message message : peekMessages) {
            if (((Integer) query(connection, "UPDATE queue_message SET popped = true WHERE queue_name = ? AND message_id = ? AND popped = false", query -> {
                return Integer.valueOf(query.addParameter(str).addParameter(message.getId()).executeUpdate());
            })).intValue() == 1) {
                arrayList.add(message);
            }
        }
        return arrayList;
    }

    public boolean containsMessage(String str, String str2) {
        return ((Boolean) getWithRetriedTransactions(connection -> {
            return Boolean.valueOf(existsMessage(connection, str, str2));
        })).booleanValue();
    }

    private void createQueueIfNotExists(Connection connection, String str) {
        this.logger.trace("Creating new queue '{}'", str);
        if (((Boolean) query(connection, "SELECT EXISTS(SELECT 1 FROM queue WHERE queue_name = ?) FOR SHARE", query -> {
            return Boolean.valueOf(query.addParameter(str).exists());
        })).booleanValue()) {
            return;
        }
        execute(connection, "INSERT INTO queue (queue_name) VALUES (?) ON CONFLICT (queue_name) DO NOTHING", query2 -> {
            query2.addParameter(str).executeUpdate();
        });
    }
}
