package io.pipelite.spi.endpoint;

import io.pipelite.spi.context.AbstractService;
import io.pipelite.spi.flow.ExceptionHandler;
import io.pipelite.spi.flow.exchange.Exchange;
import io.pipelite.spi.flow.exchange.FlowNode;
import io.pipelite.spi.flow.process.ExchangePostProcessor;
import io.pipelite.spi.flow.process.ExchangePreProcessor;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Objects;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/* loaded from: input_file:io/pipelite/spi/endpoint/ScheduledPollingConsumerService.class */
public class ScheduledPollingConsumerService extends AbstractService implements PollingConsumer {
    protected static final String INITIAL_DELAY_PROPERTY_NAME = "initialDelay";
    protected static final String PERIOD_PROPERTY_NAME = "period";
    protected static final String TIME_UNIT_PROPERTY_NAME = "timeUnit";
    protected final PollingConsumer pollingConsumer;
    protected final ScheduledExecutorService consumerPool;
    private final Collection<ScheduledFuture<?>> scheduledWorkers;

    public ScheduledPollingConsumerService(PollingConsumer pollingConsumer, ScheduledExecutorService scheduledExecutorService) {
        Objects.requireNonNull(pollingConsumer, "pollingConsumer is required and cannot be null");
        Objects.requireNonNull(scheduledExecutorService, "consumerPool is required and cannot be null");
        this.pollingConsumer = pollingConsumer;
        this.consumerPool = scheduledExecutorService;
        this.scheduledWorkers = new ArrayList();
    }

    @Override // io.pipelite.spi.context.AbstractService
    public void doStart() {
        EndpointProperties properties = this.pollingConsumer.getEndpoint().getProperties();
        Long asLongOrDefault = properties.getAsLongOrDefault(PERIOD_PROPERTY_NAME, 1000L);
        Long asLongOrDefault2 = properties.getAsLongOrDefault(INITIAL_DELAY_PROPERTY_NAME, 0L);
        addScheduledWorker(this.consumerPool.scheduleAtFixedRate(() -> {
            Exchange receive = this.pollingConsumer.receive();
            if (receive != null) {
                this.pollingConsumer.process(receive);
            }
        }, asLongOrDefault2.longValue(), asLongOrDefault.longValue(), TimeUnit.valueOf(properties.getOrDefault(TIME_UNIT_PROPERTY_NAME, TimeUnit.MILLISECONDS.name()))));
    }

    protected final void addScheduledWorker(ScheduledFuture<?> scheduledFuture) {
        this.scheduledWorkers.add(scheduledFuture);
    }

    @Override // io.pipelite.spi.context.AbstractService
    public void doStop() {
        this.scheduledWorkers.forEach(scheduledFuture -> {
            scheduledFuture.cancel(true);
        });
    }

    @Override // io.pipelite.spi.endpoint.Consumer
    public Endpoint getEndpoint() {
        return this.pollingConsumer.getEndpoint();
    }

    @Override // io.pipelite.spi.endpoint.Consumer
    public void consume(Exchange exchange) {
        this.pollingConsumer.consume(exchange);
    }

    @Override // io.pipelite.spi.flow.exchange.FlowNode
    public void process(Exchange exchange) {
        this.pollingConsumer.process(exchange);
    }

    @Override // io.pipelite.spi.flow.exchange.FlowNode
    public void setFlowName(String str) {
        this.pollingConsumer.setFlowName(str);
    }

    @Override // io.pipelite.spi.flow.exchange.FlowNode
    public void setSourceEndpointResource(String str) {
        this.pollingConsumer.setSourceEndpointResource(str);
    }

    @Override // io.pipelite.spi.flow.exchange.FlowNode
    public void setProcessorName(String str) {
        this.pollingConsumer.setProcessorName(str);
    }

    @Override // io.pipelite.spi.flow.exchange.FlowNode
    public void setNext(FlowNode flowNode) {
        this.pollingConsumer.setNext(flowNode);
    }

    @Override // io.pipelite.spi.flow.exchange.FlowNode
    public boolean hasNext() {
        return this.pollingConsumer.hasNext();
    }

    @Override // io.pipelite.spi.endpoint.Consumer, io.pipelite.spi.flow.exchange.FlowNode
    public void tag(String str) {
        this.pollingConsumer.tag(str);
    }

    @Override // io.pipelite.spi.flow.exchange.FlowNode
    public void addExchangePreProcessor(ExchangePreProcessor exchangePreProcessor) {
        this.pollingConsumer.addExchangePreProcessor(exchangePreProcessor);
    }

    @Override // io.pipelite.spi.flow.exchange.FlowNode
    public void addExchangePostProcessor(ExchangePostProcessor exchangePostProcessor) {
        this.pollingConsumer.addExchangePostProcessor(exchangePostProcessor);
    }

    @Override // io.pipelite.spi.endpoint.PollingConsumer
    public Exchange receive() {
        Exchange receive;
        synchronized (this) {
            receive = this.pollingConsumer.receive();
        }
        return receive;
    }

    @Override // io.pipelite.spi.endpoint.PollingConsumer
    public Exchange receive(long j) {
        return this.pollingConsumer.receive(j);
    }

    @Override // io.pipelite.spi.endpoint.PollingConsumer
    public Exchange receiveNoWait() {
        return this.pollingConsumer.receiveNoWait();
    }

    @Override // io.pipelite.spi.endpoint.Consumer, io.pipelite.spi.flow.exchange.FlowNode
    public void setExceptionHandler(ExceptionHandler exceptionHandler) {
        this.pollingConsumer.setExceptionHandler(exceptionHandler);
    }
}
