/*
 * Decompiled with CFR 0.152.
 */
package com.networknt.eventuate.cdc.polling;

import com.networknt.eventuate.cdc.polling.PollingDao;
import com.networknt.eventuate.server.common.CdcProcessor;
import com.networknt.eventuate.server.common.PublishedEvent;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link CdcProcessor} that discovers events by polling a {@link PollingDao}
 * at a fixed interval on a background thread.
 *
 * <p>Each polling cycle asks the DAO for the events to publish, hands every
 * event to the supplied consumer and, if there were any, asks the DAO to mark
 * them as published. A failure in any of those steps is logged and the cycle is
 * retried after the normal polling interval.
 *
 * @param <EVENT_BEAN> first type argument of the underlying {@link PollingDao}
 * @param <EVENT> type of the events handed to the consumer
 * @param <ID> third type argument of the underlying {@link PollingDao}
 */
public class PollingCdcProcessor<EVENT_BEAN, EVENT, ID>
implements CdcProcessor<EVENT> {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    // NOTE(review): kept as a raw type, as in the original. PollingDao's method
    // signatures are not visible from this file, so raw access is the only form
    // that is known to compile against it - tighten to
    // PollingDao<EVENT_BEAN, EVENT, ID> once the signatures are confirmed.
    @SuppressWarnings("rawtypes")
    private final PollingDao pollingDao;
    private final int pollingIntervalInMilliseconds;
    private final AtomicBoolean watcherRunning = new AtomicBoolean(false);

    /**
     * Creates a processor that polls the given DAO.
     *
     * @param pollingDao source of the events to publish
     * @param pollingIntervalInMilliseconds pause between two polling cycles, in milliseconds
     */
    public PollingCdcProcessor(PollingDao<EVENT_BEAN, EVENT, ID> pollingDao, int pollingIntervalInMilliseconds) {
        this.pollingDao = pollingDao;
        this.pollingIntervalInMilliseconds = pollingIntervalInMilliseconds;
    }

    /**
     * Starts a new background thread that polls until {@link #stop()} is called
     * or the thread is interrupted. Every call starts another thread.
     *
     * @param eventConsumer receives each event returned by the DAO
     */
    public void start(final Consumer<EVENT> eventConsumer) {
        this.watcherRunning.set(true);
        new Thread(() -> this.pollLoop(eventConsumer), "polling-cdc-processor").start();
    }

    /**
     * Asks the polling loop to terminate. The loop checks the flag once per
     * cycle, so it exits after the cycle (and pause) currently in progress.
     */
    public void stop() {
        this.watcherRunning.set(false);
    }

    /** Runs polling cycles until the running flag is cleared or the thread is interrupted. */
    private void pollLoop(Consumer<EVENT> eventConsumer) {
        while (this.watcherRunning.get()) {
            try {
                this.pollOnce(eventConsumer);
            }
            catch (Exception e) {
                this.logger.error("Polling for events to publish failed", (Throwable)e);
            }
            // The pause is deliberately outside the try block: a failing cycle
            // must also wait, otherwise a persistent error (e.g. an unreachable
            // database) turns this loop into a busy spin that floods the log.
            if (!this.pauseBetweenPolls()) {
                return;
            }
        }
    }

    /** Performs one cycle: fetch the events, deliver them, then mark them as published. */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private void pollOnce(Consumer<EVENT> eventConsumer) {
        // Unchecked conversion from the raw DAO; see the note on the field.
        List<EVENT> eventsToPublish = this.pollingDao.findEventsToPublish();
        eventsToPublish.forEach(eventConsumer);
        if (!eventsToPublish.isEmpty()) {
            this.pollingDao.markEventsAsPublished(eventsToPublish);
        }
    }

    /**
     * Sleeps for the configured polling interval.
     *
     * @return {@code true} if the pause completed, {@code false} if the thread
     *         was interrupted and the loop should end
     */
    private boolean pauseBetweenPolls() {
        try {
            // Thread.sleep rejects negative values; clamp so that a bad
            // configuration cannot kill the polling thread.
            Thread.sleep(Math.max(0, this.pollingIntervalInMilliseconds));
            return true;
        }
        catch (InterruptedException e) {
            // Restore the interrupt status for whoever owns this thread and stop polling.
            Thread.currentThread().interrupt();
            this.logger.warn("Polling thread interrupted, stopping", (Throwable)e);
            return false;
        }
    }
}

