package com.networknt.tram.cdc.polling.connector;

import com.networknt.tram.cdc.mysql.connector.MessageWithDestination;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.sql.DataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/networknt/tram/cdc/polling/connector/PollingDao.class */
/**
 * JDBC data-access object for the polling connector: it reads rows whose
 * "published" column is {@code 0} from the table described by the
 * {@link MessagePollingDataProvider}, and later flags those rows as published
 * by setting the column to {@code 1}.
 *
 * <p>Every database call is routed through a retry wrapper that re-runs the
 * call after a {@link SQLException}, sleeping for the configured interval
 * between attempts, and gives up with a {@link RuntimeException} once the
 * configured number of retries is exhausted.
 *
 * <p>The type parameters are not referenced by the code in this class; they are
 * kept so that existing declarations of this type continue to compile.
 */
public class PollingDao<EVENT_BEAN, EVENT, ID> {
    private final Logger logger = LoggerFactory.getLogger(getClass());
    private final MessagePollingDataProvider pollingDataParser;
    private final DataSource dataSource;
    private int maxEventsPerPolling;
    private final int maxAttemptsForPolling;
    private final int pollingRetryIntervalInMilliseconds;

    /**
     * Creates a DAO bound to the given provider and data source.
     *
     * @param messagePollingDataProvider supplies table/column names and bean-to-message conversion
     * @param dataSource source of JDBC connections; one connection is borrowed per query
     * @param maxEventsPerPolling upper bound on rows fetched per poll; must be greater than 0
     * @param maxAttemptsForPolling number of retries allowed after the first failed attempt
     * @param pollingRetryIntervalInMilliseconds pause between retries, in milliseconds
     * @throws IllegalArgumentException if {@code maxEventsPerPolling} is not positive
     */
    public PollingDao(MessagePollingDataProvider messagePollingDataProvider, DataSource dataSource,
                      int maxEventsPerPolling, int maxAttemptsForPolling, int pollingRetryIntervalInMilliseconds) {
        requirePositiveBatchSize(maxEventsPerPolling);
        this.pollingDataParser = messagePollingDataProvider;
        this.dataSource = dataSource;
        this.maxEventsPerPolling = maxEventsPerPolling;
        this.maxAttemptsForPolling = maxAttemptsForPolling;
        this.pollingRetryIntervalInMilliseconds = pollingRetryIntervalInMilliseconds;
    }

    /** Returns the maximum number of rows fetched by a single poll. */
    public int getMaxEventsPerPolling() {
        return this.maxEventsPerPolling;
    }

    /**
     * Changes the maximum number of rows fetched by a single poll.
     *
     * @param maxEventsPerPolling new limit; must be greater than 0, as in the constructor
     * @throws IllegalArgumentException if the value is not positive
     */
    public void setMaxEventsPerPolling(int maxEventsPerPolling) {
        requirePositiveBatchSize(maxEventsPerPolling);
        this.maxEventsPerPolling = maxEventsPerPolling;
    }

    /**
     * Fetches up to {@link #getMaxEventsPerPolling()} unpublished rows, lowest id first,
     * and converts each of them through the data provider.
     *
     * @return the converted messages in ascending id order; empty when nothing is pending
     * @throws RuntimeException if the database stays unreachable after all retries
     */
    public List<MessageWithDestination> findEventsToPublish() {
        // The ORDER BY lives in an inline view on purpose: Oracle assigns ROWNUM before
        // sorting, so "WHERE ROWNUM <= ? ORDER BY id" in one query block would limit to an
        // arbitrary set of rows and only then sort them, letting newer rows overtake older ones.
        String query = String.format(
                "SELECT * FROM (SELECT * FROM %s WHERE %s = 0 ORDER BY %s ASC) WHERE ROWNUM <= ?",
                this.pollingDataParser.table(),
                this.pollingDataParser.publishedField(),
                this.pollingDataParser.idField());
        List<PublishedMessageBean> beans =
                handleConnectionLost(() -> handleFindQuery(query, this.maxEventsPerPolling));
        return beans.stream()
                .map(this.pollingDataParser::transformEventBeanToEvent)
                .collect(Collectors.toList());
    }

    /**
     * Runs the polling query and maps each row to a {@link PublishedMessageBean}.
     * All JDBC resources are released even when the query fails; the
     * {@link SQLException} is left to the retry wrapper, which logs it.
     */
    private List<PublishedMessageBean> handleFindQuery(String query, int limit) throws SQLException {
        this.logger.info("cdc polling query:{}", query);
        // Column names do not change between rows, so resolve them once.
        String idColumn = this.pollingDataParser.idField();
        String destinationColumn = this.pollingDataParser.destinationField();
        String headersColumn = this.pollingDataParser.headersField();
        String payloadColumn = this.pollingDataParser.payloadField();

        List<PublishedMessageBean> beans = new ArrayList<>();
        try (Connection connection = this.dataSource.getConnection();
             PreparedStatement statement = connection.prepareStatement(query)) {
            statement.setInt(1, limit);
            try (ResultSet rows = statement.executeQuery()) {
                while (rows.next()) {
                    beans.add(new PublishedMessageBean(
                            rows.getString(idColumn),
                            rows.getString(destinationColumn),
                            rows.getString(headersColumn),
                            rows.getString(payloadColumn)));
                }
            }
        }
        return beans;
    }

    /**
     * Flags the rows behind the given messages as published.
     *
     * @param list messages whose rows should be marked; a {@code null} or empty list is a no-op,
     *             because an empty {@code IN ()} clause is not valid SQL
     * @throws RuntimeException if the database stays unreachable after all retries
     */
    public void markEventsAsPublished(List<MessageWithDestination> list) {
        if (list == null || list.isEmpty()) {
            return;
        }
        List<String> ids = list.stream()
                .map(this.pollingDataParser::getId)
                .collect(Collectors.toList());
        String update = String.format("UPDATE %s SET %s = 1 WHERE %s in (%s)",
                this.pollingDataParser.table(),
                this.pollingDataParser.publishedField(),
                this.pollingDataParser.idField(),
                preparePlaceHolders(ids.size()));
        handleConnectionLost(() -> Integer.valueOf(handleUpdatePublished(update, ids)));
    }

    /**
     * Executes the "mark as published" update, binding one id per placeholder.
     *
     * @return the row count reported by the driver
     */
    private int handleUpdatePublished(String update, List<String> ids) throws SQLException {
        this.logger.info("mark Events As Published query:{}", update);
        try (Connection connection = this.dataSource.getConnection();
             PreparedStatement statement = connection.prepareStatement(update)) {
            setValues(statement, ids.toArray());
            int updatedRows = statement.executeUpdate();
            this.logger.info("mark Events As Published result:{}", updatedRows);
            return updatedRows;
        }
    }

    /**
     * Runs {@code callable}, retrying after each {@link SQLException} until it succeeds or
     * {@code maxAttemptsForPolling} retries have been used up.
     *
     * @return whatever the callable returns on its first successful run
     * @throws RuntimeException wrapping the last {@link SQLException} once retries are exhausted,
     *                          wrapping any other exception immediately, or when the thread is
     *                          interrupted while waiting for the next retry
     */
    private <T> T handleConnectionLost(Callable<T> callable) {
        int failedAttempts = 0;
        while (true) {
            try {
                T result = callable.call();
                if (failedAttempts > 0) {
                    this.logger.info("Reconnected to database");
                }
                return result;
            } catch (SQLException e) {
                if (failedAttempts >= this.maxAttemptsForPolling) {
                    this.logger.error(String.format("Could not access database %s - giving up after %s retries",
                            e.getMessage(), Integer.valueOf(failedAttempts)), e);
                    throw new RuntimeException(e);
                }
                failedAttempts++;
                this.logger.error(String.format("Could not access database %s - retrying in %s milliseconds",
                        e.getMessage(), Integer.valueOf(this.pollingRetryIntervalInMilliseconds)), e);
                try {
                    Thread.sleep(this.pollingRetryIntervalInMilliseconds);
                } catch (InterruptedException interrupted) {
                    // Restore the flag and stop retrying: continuing would make every later
                    // sleep fail immediately and turn the back-off into a busy loop.
                    Thread.currentThread().interrupt();
                    RuntimeException aborted =
                            new RuntimeException("Interrupted while waiting to retry database access", e);
                    aborted.addSuppressed(interrupted);
                    throw aborted;
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Builds a comma-separated list of JDBC placeholders, e.g. {@code "?,?,?"} for 3.
     *
     * @param i number of placeholders; 0 yields an empty string
     * @return the placeholder list without surrounding parentheses
     * @throws IllegalArgumentException if {@code i} is negative
     */
    public static String preparePlaceHolders(int i) {
        return String.join(",", Collections.nCopies(i, "?"));
    }

    /**
     * Binds the given values to the statement's parameters in order, starting at index 1.
     *
     * @param preparedStatement statement whose parameters are set
     * @param objArr values to bind; the n-th value goes to parameter n+1
     * @throws SQLException if the driver rejects a parameter
     */
    public static void setValues(PreparedStatement preparedStatement, Object... objArr) throws SQLException {
        for (int index = 0; index < objArr.length; index++) {
            preparedStatement.setObject(index + 1, objArr[index]);
        }
    }

    /** Shared guard for the constructor and the setter. */
    private static void requirePositiveBatchSize(int maxEventsPerPolling) {
        if (maxEventsPerPolling <= 0) {
            throw new IllegalArgumentException("Max events per polling parameter should be greater than 0.");
        }
    }
}
