package io.pipelite.spi.endpoint;

import io.pipelite.spi.flow.exchange.Exchange;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pipelite/spi/endpoint/EventDrivenConsumer.class */
public class EventDrivenConsumer extends DefaultConsumer {
    private final Logger sysLogger;
    public static final Object POISON_PILL = new Object();
    private static final int DEFAULT_QUEUE_SIZE = 20;
    protected final BlockingQueue<PriorityExchange> queue;
    private final AtomicLong exchangeCount;

    /* loaded from: input_file:io/pipelite/spi/endpoint/EventDrivenConsumer$PriorityExchange.class */
    public static class PriorityExchange implements Comparable<PriorityExchange> {
        private final Exchange exchange;
        private final long priority;

        public static PriorityExchange withMaxPriority(Exchange exchange) {
            return new PriorityExchange(exchange, -2147483648L);
        }

        public static PriorityExchange withNormalPriority(Exchange exchange, long j) {
            return new PriorityExchange(exchange, j);
        }

        private PriorityExchange(Exchange exchange, long j) {
            this.exchange = exchange;
            this.priority = j;
        }

        public Exchange getExchange() {
            return this.exchange;
        }

        @Override // java.lang.Comparable
        public int compareTo(PriorityExchange priorityExchange) {
            return Long.compare(this.priority, priorityExchange.priority);
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            return Objects.equals(this.exchange, ((PriorityExchange) obj).exchange);
        }

        public int hashCode() {
            return Objects.hash(this.exchange);
        }
    }

    public EventDrivenConsumer(Endpoint endpoint) {
        this(endpoint, DEFAULT_QUEUE_SIZE);
    }

    public EventDrivenConsumer(Endpoint endpoint, int i) {
        super(endpoint);
        this.sysLogger = LoggerFactory.getLogger(getClass());
        this.queue = new PriorityBlockingQueue(i);
        this.exchangeCount = new AtomicLong(0L);
    }

    @Override // io.pipelite.spi.endpoint.DefaultConsumer, io.pipelite.spi.flow.exchange.FlowNode
    public void process(Exchange exchange) {
        long incrementAndGet = this.exchangeCount.incrementAndGet();
        try {
            preProcessExchange(exchange);
            this.queue.put(PriorityExchange.withNormalPriority(exchange, incrementAndGet));
            postProcessExchange(exchange);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (RuntimeException e2) {
            if (this.sysLogger.isErrorEnabled()) {
                this.sysLogger.error("{} - An underlying error occurred enqueueing Exchange #{}", new Object[]{this.tag, Long.valueOf(incrementAndGet), e2});
            }
            throw e2;
        }
    }

    public int receive() {
        try {
            if (!hasNext()) {
                throw new IllegalStateException("DefaultConsumer does not have a next FlowNode");
            }
            PriorityExchange take = this.queue.take();
            Exchange exchange = take.getExchange();
            synchronized (this) {
                if (this.tag != null && this.sysLogger.isTraceEnabled()) {
                    this.sysLogger.trace("{} - Exchange #{} extracted from queue, processing.", this.tag, Long.valueOf(take.priority));
                }
            }
            if (!POISON_PILL.equals(exchange.getInputPayloadAs(Object.class))) {
                super.process(exchange);
                return 1;
            }
            if (this.tag == null || !this.sysLogger.isTraceEnabled()) {
                return 0;
            }
            this.sysLogger.trace("{} - Poison pill acquired, terminating.", this.tag);
            return 0;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return 1;
        }
    }
}
