package io.pipelite.spi.endpoint;

import io.pipelite.spi.context.AbstractService;
import io.pipelite.spi.flow.ExceptionHandler;
import io.pipelite.spi.flow.concurrent.DefaultThreadFactory;
import io.pipelite.spi.flow.exchange.Exchange;
import io.pipelite.spi.flow.exchange.ExchangeFactory;
import io.pipelite.spi.flow.exchange.ExchangeFactoryAware;
import io.pipelite.spi.flow.exchange.FlowNode;
import io.pipelite.spi.flow.process.ExchangePostProcessor;
import io.pipelite.spi.flow.process.ExchangePreProcessor;
import java.util.concurrent.ThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pipelite/spi/endpoint/EventDrivenConsumerService.class */
public class EventDrivenConsumerService extends AbstractService implements Consumer, ExchangeFactoryAware {
    private static final String WORKER_PREFIX = "EDC";
    private final EventDrivenConsumer eventDrivenConsumer;
    protected ExchangeFactory exchangeFactory;
    private Thread receiveTask;
    private Thread poisonPillTask;
    private final Logger logger = LoggerFactory.getLogger(getClass());
    protected final ThreadFactory threadFactory = new DefaultThreadFactory(WORKER_PREFIX);

    /* loaded from: input_file:io/pipelite/spi/endpoint/EventDrivenConsumerService$EventDrivenConsumerPoisonPillTask.class */
    private final class EventDrivenConsumerPoisonPillTask implements Runnable {
        private EventDrivenConsumerPoisonPillTask() {
        }

        @Override // java.lang.Runnable
        public void run() {
            EventDrivenConsumerService.this.eventDrivenConsumer.consume(EventDrivenConsumerService.this.exchangeFactory.createExchange(EventDrivenConsumer.POISON_PILL));
        }
    }

    /* loaded from: input_file:io/pipelite/spi/endpoint/EventDrivenConsumerService$EventDrivenConsumerReceiveTask.class */
    private final class EventDrivenConsumerReceiveTask implements Runnable {
        private EventDrivenConsumerReceiveTask() {
        }

        @Override // java.lang.Runnable
        public void run() {
            if (!EventDrivenConsumerService.this.isRunAllowed()) {
                throw new IllegalStateException("Service run not allowed, status is " + EventDrivenConsumerService.this.getStatus());
            }
            while (EventDrivenConsumerService.this.isRunAllowed() && EventDrivenConsumerService.this.eventDrivenConsumer.receive() >= 1) {
            }
        }
    }

    public EventDrivenConsumerService(EventDrivenConsumer eventDrivenConsumer) {
        this.eventDrivenConsumer = eventDrivenConsumer;
    }

    @Override // io.pipelite.spi.context.AbstractService
    public void doStart() {
        if (this.receiveTask == null) {
            this.receiveTask = this.threadFactory.newThread(new EventDrivenConsumerReceiveTask());
        }
        this.receiveTask.start();
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("EventDrivenConsumerReceiveTask successfully started");
        }
    }

    @Override // io.pipelite.spi.context.AbstractService
    public void doStop() {
        if (this.poisonPillTask == null) {
            this.poisonPillTask = this.threadFactory.newThread(new EventDrivenConsumerPoisonPillTask());
        }
        this.poisonPillTask.start();
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("EventDrivenWorker successfully started");
        }
    }

    @Override // io.pipelite.spi.flow.exchange.ExchangeFactoryAware
    public void setExchangeFactory(ExchangeFactory exchangeFactory) {
        this.exchangeFactory = exchangeFactory;
    }

    @Override // io.pipelite.spi.endpoint.Consumer
    public Endpoint getEndpoint() {
        return this.eventDrivenConsumer.getEndpoint();
    }

    @Override // io.pipelite.spi.endpoint.Consumer
    public void consume(Exchange exchange) {
        this.eventDrivenConsumer.consume(exchange);
    }

    @Override // io.pipelite.spi.flow.exchange.FlowNode
    public void process(Exchange exchange) {
        this.eventDrivenConsumer.process(exchange);
    }

    @Override // io.pipelite.spi.flow.exchange.FlowNode
    public void setFlowName(String str) {
        this.eventDrivenConsumer.setFlowName(str);
    }

    @Override // io.pipelite.spi.flow.exchange.FlowNode
    public void setProcessorName(String str) {
        this.eventDrivenConsumer.setProcessorName(str);
    }

    @Override // io.pipelite.spi.flow.exchange.FlowNode
    public void setSourceEndpointResource(String str) {
        this.eventDrivenConsumer.setSourceEndpointResource(str);
    }

    @Override // io.pipelite.spi.flow.exchange.FlowNode
    public void setNext(FlowNode flowNode) {
        this.eventDrivenConsumer.setNext(flowNode);
    }

    @Override // io.pipelite.spi.flow.exchange.FlowNode
    public boolean hasNext() {
        return this.eventDrivenConsumer.hasNext();
    }

    @Override // io.pipelite.spi.endpoint.Consumer, io.pipelite.spi.flow.exchange.FlowNode
    public void setExceptionHandler(ExceptionHandler exceptionHandler) {
        this.eventDrivenConsumer.setExceptionHandler(exceptionHandler);
    }

    @Override // io.pipelite.spi.endpoint.Consumer, io.pipelite.spi.flow.exchange.FlowNode
    public void tag(String str) {
        this.eventDrivenConsumer.tag(str);
    }

    @Override // io.pipelite.spi.flow.exchange.FlowNode
    public void addExchangePreProcessor(ExchangePreProcessor exchangePreProcessor) {
        this.eventDrivenConsumer.addExchangePreProcessor(exchangePreProcessor);
    }

    @Override // io.pipelite.spi.flow.exchange.FlowNode
    public void addExchangePostProcessor(ExchangePostProcessor exchangePostProcessor) {
        this.eventDrivenConsumer.addExchangePostProcessor(exchangePostProcessor);
    }
}
