package com.networknt.tram.cdc.polling.connector;

import com.networknt.eventuate.server.common.CdcProcessor;
import com.networknt.tram.cdc.mysql.connector.MessageWithDestination;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/networknt/tram/cdc/polling/connector/PollingCdcProcessor.class */
/**
 * {@link CdcProcessor} that discovers events by polling a {@link PollingDao} on a fixed
 * interval from a dedicated background thread.
 *
 * <p>Each polling cycle asks the DAO for the events that still have to be published, hands
 * every one of them to the consumer supplied to {@link #start(Consumer)}, and then asks the
 * DAO to mark that batch as published. The thread pauses for the configured interval between
 * cycles, including after a cycle that failed, so a persistent failure cannot turn into a
 * tight retry loop.
 *
 * @param <EVENT_BEAN> type parameter forwarded to {@link PollingDao}
 * @param <EVENT> type of event delivered to the consumer
 * @param <ID> type parameter forwarded to {@link PollingDao}
 */
public class PollingCdcProcessor<EVENT_BEAN, EVENT, ID> implements CdcProcessor<EVENT> {
    private final Logger logger = LoggerFactory.getLogger(getClass());

    /** {@code true} while the polling loop is supposed to keep running. */
    private final AtomicBoolean watcherRunning = new AtomicBoolean(false);

    // Deliberately kept as a raw type: the polling loop treats the DAO's result as a list of
    // MessageWithDestination, and PollingDao's own method signatures are not visible here.
    // NOTE(review): tighten the generics once PollingDao's signatures are confirmed.
    @SuppressWarnings("rawtypes")
    private final PollingDao pollingDao;

    private final int pollingIntervalInMilliseconds;

    /**
     * Creates a processor that is idle until {@link #start(Consumer)} is called.
     *
     * @param pollingDao DAO used to find pending events and to mark them as published
     * @param pollingIntervalInMilliseconds pause between two polling cycles, in milliseconds;
     *     a negative value is treated as no pause
     */
    public PollingCdcProcessor(PollingDao<EVENT_BEAN, EVENT, ID> pollingDao, int pollingIntervalInMilliseconds) {
        this.pollingDao = pollingDao;
        this.pollingIntervalInMilliseconds = pollingIntervalInMilliseconds;
    }

    /**
     * Starts the background polling thread.
     *
     * <p>If the processor is already running the call is ignored (with a warning), so that two
     * threads never poll the same DAO and publish the same events twice.
     *
     * @param consumer receives every event returned by the DAO; must not be {@code null}
     * @throws NullPointerException if {@code consumer} is {@code null}
     */
    public void start(final Consumer<EVENT> consumer) {
        // Fail at the call site instead of logging a NullPointerException on every cycle.
        Objects.requireNonNull(consumer, "consumer");
        if (!watcherRunning.compareAndSet(false, true)) {
            logger.warn("Polling CDC processor is already running; ignoring start request");
            return;
        }
        new Thread(() -> pollLoop(consumer), "polling-cdc-processor").start();
    }

    /**
     * Asks the polling thread to stop. The thread notices the request the next time it checks
     * the running flag, i.e. after the current cycle and pause have finished.
     */
    public void stop() {
        this.watcherRunning.set(false);
    }

    /** Runs polling cycles, separated by the configured pause, until stopped or interrupted. */
    private void pollLoop(Consumer<EVENT> consumer) {
        while (watcherRunning.get()) {
            try {
                pollOnce(consumer);
            } catch (Exception e) {
                // A failed cycle must not kill the poller; log it and retry after the pause.
                logger.error(e.getMessage(), e);
            }
            try {
                // Thread.sleep rejects negative values, so clamp to "no pause".
                Thread.sleep(Math.max(0, pollingIntervalInMilliseconds));
            } catch (InterruptedException e) {
                // Preserve the interrupt for whoever owns the thread and shut down cleanly;
                // looping on would make every following sleep fail immediately.
                Thread.currentThread().interrupt();
                logger.error(e.getMessage(), e);
                watcherRunning.set(false);
                return;
            }
        }
    }

    /** Performs one cycle: fetch pending events, deliver them, then mark them as published. */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private void pollOnce(Consumer<EVENT> consumer) {
        List<MessageWithDestination> eventsToPublish = pollingDao.findEventsToPublish();
        // The list is typed by MessageWithDestination while the consumer is typed by EVENT;
        // bridge the two through the raw type (unchecked).
        Consumer rawConsumer = consumer;
        eventsToPublish.forEach(rawConsumer::accept);
        if (!eventsToPublish.isEmpty()) {
            pollingDao.markEventsAsPublished(eventsToPublish);
        }
    }
}
