package org.axonframework.eventhandling;

import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.axonframework.common.stream.BlockingStream;
import org.axonframework.eventsourcing.eventstore.EventUtils;
import org.axonframework.eventsourcing.eventstore.TrackingEventStream;
import org.axonframework.eventsourcing.eventstore.TrackingToken;
import org.axonframework.monitoring.MessageMonitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/axonframework/eventhandling/SimpleEventBus.class */
/**
 * Event bus that hands published events to in-memory streams opened through
 * {@link #openStream2(TrackingToken)}.
 *
 * <p>Each opened stream owns a {@link LinkedBlockingQueue} bounded by the configured queue capacity.
 * {@link #afterCommit(List)} copies every event into the queue of each stream that is registered at
 * that moment; a stream stops receiving events once it has been closed.
 */
public class SimpleEventBus extends AbstractEventBus {
    private static final Logger logger = LoggerFactory.getLogger(SimpleEventBus.class);

    /** Queue capacity applied to streams when the no-arg constructor is used. */
    private static final int DEFAULT_QUEUE_CAPACITY = Integer.MAX_VALUE;

    /** Streams that are currently open and therefore receive events from {@link #afterCommit(List)}. */
    private final Collection<EventConsumer> eventStreams;

    /** Capacity of the queue created for every stream opened on this bus. */
    private final int queueCapacity;

    /**
     * Stream over the events handed to this bus, backed by a bounded blocking queue.
     *
     * <p>One element may be held in {@code peekEvent} after a successful
     * {@link #hasNextAvailable(int, TimeUnit)} or {@link #peek()}; {@link #nextAvailable()} returns and
     * clears it before touching the queue again.
     */
    /* JADX INFO: Access modifiers changed from: private */
    public class EventConsumer implements TrackingEventStream {
        private final BlockingQueue<TrackedEventMessage<?>> eventQueue;

        /** Element already taken from the queue but not yet returned by {@link #nextAvailable()}. */
        private TrackedEventMessage<?> peekEvent;

        /**
         * @param capacity maximum number of events the backing queue can hold
         */
        private EventConsumer(int capacity) {
            this.eventQueue = new LinkedBlockingQueue<>(capacity);
        }

        /**
         * Appends the given events to this stream's queue, in list order, each wrapped as a tracked
         * event message with a {@code null} tracking token.
         *
         * <p>{@code put} blocks while the queue is full. If the calling thread is interrupted, a
         * warning is logged, the interrupt flag is restored and the remaining events are not added.
         *
         * @param list the events to enqueue
         */
        /* JADX INFO: Access modifiers changed from: private */
        public void addEvents(List<? extends EventMessage<?>> list) {
            for (EventMessage<?> eventMessage : list) {
                try {
                    this.eventQueue.put(EventUtils.asTrackedEventMessage(eventMessage, null));
                } catch (InterruptedException e) {
                    logger.warn("Event producer thread was interrupted. Shutting down.", e);
                    Thread.currentThread().interrupt();
                    // With the interrupt flag set again, every further put() would fail immediately
                    // and only repeat the warning, so stop here.
                    return;
                }
            }
        }

        /**
         * Returns the next event without consuming it.
         *
         * @return the buffered event, or the one fetched by {@code hasNextAvailable()}; empty if
         *         neither yields an event
         */
        @Override // org.axonframework.common.stream.BlockingStream
        public Optional<TrackedEventMessage<?>> peek() {
            if (this.peekEvent == null && !hasNextAvailable()) {
                return Optional.empty();
            }
            return Optional.ofNullable(this.peekEvent);
        }

        /**
         * Checks whether an event is available, waiting up to the given timeout for one to arrive.
         * An event obtained while waiting is buffered for the next {@link #peek()} or
         * {@link #nextAvailable()} call.
         *
         * @param i        maximum time to wait, expressed in {@code timeUnit}
         * @param timeUnit unit of {@code i}
         * @return {@code true} if an event is buffered or arrived within the timeout; {@code false}
         *         on timeout or if the thread was interrupted (the interrupt flag is restored)
         */
        @Override // org.axonframework.common.stream.BlockingStream
        public boolean hasNextAvailable(int i, TimeUnit timeUnit) {
            if (this.peekEvent != null) {
                return true;
            }
            try {
                this.peekEvent = this.eventQueue.poll(i, timeUnit);
                return this.peekEvent != null;
            } catch (InterruptedException e) {
                logger.warn("Consumer thread was interrupted. Returning thread to event processor.", e);
                Thread.currentThread().interrupt();
                return false;
            }
        }

        /**
         * Returns the next event, blocking until one is available when nothing is buffered. Any
         * buffered event is cleared by this call.
         *
         * @return the next event, or {@code null} if the thread was interrupted while waiting (the
         *         interrupt flag is restored)
         */
        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.axonframework.common.stream.BlockingStream
        public TrackedEventMessage<?> nextAvailable() {
            try {
                return this.peekEvent == null ? this.eventQueue.take() : this.peekEvent;
            } catch (InterruptedException e) {
                logger.warn("Consumer thread was interrupted. Returning thread to event processor.", e);
                Thread.currentThread().interrupt();
                return null;
            } finally {
                // The buffered element, if any, has been handed out (or the wait was aborted).
                this.peekEvent = null;
            }
        }

        /**
         * Unregisters this stream from the enclosing bus so that it no longer receives events.
         */
        @Override // org.axonframework.common.stream.BlockingStream, java.lang.AutoCloseable
        public void close() {
            SimpleEventBus.this.eventStreams.remove(this);
        }
    }

    /**
     * Creates a bus using the superclass' no-arg constructor and a per-stream queue capacity of
     * {@link Integer#MAX_VALUE}.
     */
    public SimpleEventBus() {
        this.eventStreams = new CopyOnWriteArraySet<>();
        this.queueCapacity = DEFAULT_QUEUE_CAPACITY;
    }

    /**
     * Creates a bus with the given per-stream queue capacity.
     *
     * @param i              capacity of the queue created for each opened stream
     * @param messageMonitor monitor passed on to the superclass constructor
     */
    public SimpleEventBus(int i, MessageMonitor<? super EventMessage<?>> messageMonitor) {
        super(messageMonitor);
        this.eventStreams = new CopyOnWriteArraySet<>();
        this.queueCapacity = i;
    }

    /**
     * Hands the given events to every stream that is currently registered with this bus.
     *
     * @param list the events to distribute
     */
    @Override // org.axonframework.eventhandling.AbstractEventBus
    protected void afterCommit(List<? extends EventMessage<?>> list) {
        for (EventConsumer eventConsumer : this.eventStreams) {
            eventConsumer.addEvents(list);
        }
    }

    /**
     * Opens a new stream that receives the events handed to this bus from now on. The stream stays
     * registered until it is closed.
     *
     * <p>NOTE(review): the decompiler renamed this method from {@code openStream}; confirm the name
     * against the overridden declaration before compiling.
     *
     * @param trackingToken must be {@code null}
     * @return the newly registered stream
     * @throws UnsupportedOperationException if {@code trackingToken} is not {@code null}
     */
    @Override // org.axonframework.eventhandling.EventBus, org.axonframework.messaging.StreamableMessageSource
    /* renamed from: openStream */
    public BlockingStream<TrackedEventMessage<?>> openStream2(TrackingToken trackingToken) {
        if (trackingToken != null) {
            throw new UnsupportedOperationException("The simple event bus does not support non-null tracking tokens");
        }
        EventConsumer eventConsumer = new EventConsumer(this.queueCapacity);
        this.eventStreams.add(eventConsumer);
        return eventConsumer;
    }
}
