package org.apache.james.events;

import com.google.common.base.Preconditions;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Predicate;
import org.apache.james.backends.rabbitmq.RabbitMQConfiguration;
import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
import org.apache.james.backends.rabbitmq.ReceiverProvider;
import org.apache.james.events.EventListener;
import org.apache.james.util.MDCBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.rabbitmq.AcknowledgableDelivery;
import reactor.rabbitmq.ConsumeOptions;
import reactor.rabbitmq.QueueSpecification;
import reactor.rabbitmq.Sender;
import reactor.util.retry.Retry;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/james/events/GroupRegistration.class */
/**
 * {@link Registration} binding one {@link EventListener.ReactiveEventListener} to the RabbitMQ
 * work queue of a {@link Group}.
 *
 * <p>{@link #start()} declares the group work queue and the retry exchange, then subscribes a
 * consumer on the queue. Each delivery with a non-null body is deserialized, delayed according to
 * its retry count, handed to the listener and finally acknowledged. When the processing pipeline
 * itself fails the delivery is negatively acknowledged without requeue.
 */
public class GroupRegistration implements Registration {
    private static final Logger LOGGER = LoggerFactory.getLogger(GroupRegistration.class);

    /** Name of the message header from which the retry count of a delivery is read. */
    static final String RETRY_COUNT = "retry-count";

    /** Retry count assumed when the header is absent or is not an {@link Integer}. */
    static final int DEFAULT_RETRY_COUNT = 0;

    /** Value passed to {@code ConsumeOptions.qos(...)} when consuming the work queue. */
    private static final int PREFETCH_COUNT = 10;

    /** Maximum number of deliveries processed concurrently by the consumer pipeline. */
    private static final int DELIVERY_CONCURRENCY = 10;

    private final NamingStrategy namingStrategy;
    private final ReactorRabbitMQChannelPool channelPool;
    private final EventListener.ReactiveEventListener listener;
    private final WorkQueueName queueName;
    private final Runnable unregisterGroup;
    private final EventSerializer eventSerializer;
    private final GroupConsumerRetry retryHandler;
    private final WaitDelayGenerator delayGenerator;
    private final Group group;
    private final RetryBackoffConfiguration retryBackoff;
    private final ListenerExecutor listenerExecutor;
    private final RabbitMQConfiguration configuration;
    private final ReceiverProvider receiverProvider;

    /** Subscription of the current work queue consumer; empty until {@link #start()} is called. */
    private Optional<Disposable> receiverSubscriber = Optional.empty();

    /** Name of a group work queue, built from a prefix and the group identifier. */
    public static class WorkQueueName {
        private final String prefix;
        private final Group group;

        /**
         * @param str prefix placed in front of the queue name
         * @param group group owning the queue, must not be {@code null}
         * @throws NullPointerException if {@code group} is {@code null}
         */
        public WorkQueueName(String str, Group group) {
            this.prefix = str;
            this.group = Preconditions.checkNotNull(group, "Group must be specified");
        }

        /**
         * @return the queue name, formatted as {@code <prefix>-workQueue-<group>}
         */
        public String asString() {
            return this.prefix + "-workQueue-" + this.group.asString();
        }
    }

    /**
     * Creates the registration. Nothing is declared on the broker and nothing is consumed until
     * {@link #start()} is invoked.
     *
     * @param namingStrategy provides the work queue name and the dead letter exchange name
     * @param reactorRabbitMQChannelPool used to declare the work queue
     * @param sender handed to the {@link GroupConsumerRetry} created for this group
     * @param receiverProvider creates the receiver used to consume the work queue
     * @param eventSerializer deserializes delivery bodies into events
     * @param reactiveEventListener listener executed for every consumed event
     * @param group group this registration belongs to
     * @param retryBackoffConfiguration backoff settings, used both for start-up retries and for
     *     the delay applied before re-running the listener
     * @param eventDeadLetters handed to the {@link GroupConsumerRetry} created for this group
     * @param runnable callback run at the end of {@link #unregister()}
     * @param listenerExecutor executes the listener
     * @param rabbitMQConfiguration provides the work queue arguments
     */
    public GroupRegistration(NamingStrategy namingStrategy, ReactorRabbitMQChannelPool reactorRabbitMQChannelPool, Sender sender, ReceiverProvider receiverProvider, EventSerializer eventSerializer, EventListener.ReactiveEventListener reactiveEventListener, Group group, RetryBackoffConfiguration retryBackoffConfiguration, EventDeadLetters eventDeadLetters, Runnable runnable, ListenerExecutor listenerExecutor, RabbitMQConfiguration rabbitMQConfiguration) {
        this.namingStrategy = namingStrategy;
        this.channelPool = reactorRabbitMQChannelPool;
        this.eventSerializer = eventSerializer;
        this.listener = reactiveEventListener;
        this.configuration = rabbitMQConfiguration;
        this.queueName = namingStrategy.workQueue(group);
        this.receiverProvider = receiverProvider;
        this.retryBackoff = retryBackoffConfiguration;
        this.listenerExecutor = listenerExecutor;
        this.unregisterGroup = runnable;
        this.retryHandler = new GroupConsumerRetry(namingStrategy, sender, group, retryBackoffConfiguration, eventDeadLetters, eventSerializer);
        this.delayGenerator = WaitDelayGenerator.of(retryBackoffConfiguration);
        this.group = group;
    }

    /**
     * Declares the work queue and the retry exchange, then starts consuming the work queue.
     *
     * <p>The whole sequence is retried with exponential backoff using the configured maximum
     * retries, first backoff and jitter factor. This call blocks until the consumer is subscribed
     * or the retries are exhausted.
     *
     * @return this registration
     */
    public GroupRegistration start() {
        // NOTE(review): Schedulers.elastic() is deprecated in recent Reactor releases in favour of
        // boundedElastic(); it is kept here to preserve the existing scheduling behaviour.
        Disposable subscription = createGroupWorkQueue()
            .then(this.retryHandler.createRetryExchange(this.queueName))
            .then(Mono.fromCallable(this::consumeWorkQueue))
            .retryWhen(Retry.backoff(this.retryBackoff.getMaxRetries(), this.retryBackoff.getFirstBackoff())
                .jitter(this.retryBackoff.getJitterFactor())
                .scheduler(Schedulers.elastic()))
            .block();
        this.receiverSubscriber = Optional.of(subscription);
        return this;
    }

    /**
     * Subscribes a new consumer on the work queue and then disposes the previous one, if any.
     *
     * <p>The new consumer is created before the old one is disposed, so both may briefly coexist.
     */
    public void restart() {
        Optional<Disposable> previousSubscriber = this.receiverSubscriber;
        this.receiverSubscriber = Optional.of(consumeWorkQueue());
        previousSubscriber.ifPresent(GroupRegistration::disposeIfActive);
    }

    /**
     * Declares the group work queue as durable, non-exclusive and non-auto-delete, with the dead
     * letter exchange supplied by the naming strategy.
     */
    private Mono<Void> createGroupWorkQueue() {
        return this.channelPool.createWorkQueue(
            QueueSpecification.queue(this.queueName.asString())
                .durable(true)
                .exclusive(false)
                .autoDelete(false)
                .arguments(this.configuration.workQueueArgumentsBuilder(false)
                    .deadLetter(this.namingStrategy.deadLetterExchange())
                    .build()));
    }

    /**
     * Creates a receiver, consumes the work queue with manual acknowledgement and dispatches every
     * delivery that has a non-null body to {@link #deliver(AcknowledgableDelivery)}.
     *
     * <p>The receiver is closed when the flux terminates or the subscription is disposed.
     *
     * @return the subscription of the consumer pipeline
     */
    private Disposable consumeWorkQueue() {
        return Flux.using(
                this.receiverProvider::createReceiver,
                receiver -> receiver.consumeManualAck(this.queueName.asString(), new ConsumeOptions().qos(PREFETCH_COUNT)),
                receiver -> receiver.close())
            .publishOn(Schedulers.parallel())
            .filter(delivery -> Objects.nonNull(delivery.getBody()))
            .flatMap(this::deliver, DELIVERY_CONCURRENCY)
            .subscribeOn(Schedulers.elastic())
            .subscribe();
    }

    /**
     * Processes one delivery: deserializes the event, waits for the delay matching its retry
     * count, runs the listener and acknowledges the delivery.
     *
     * <p>Any error raised by that pipeline is logged and the delivery is negatively acknowledged
     * without requeue; the returned mono then completes normally.
     */
    private Mono<Void> deliver(AcknowledgableDelivery acknowledgableDelivery) {
        byte[] body = acknowledgableDelivery.getBody();
        int retryCount = getRetryCount(acknowledgableDelivery);
        return deserializeEvent(body)
            .flatMap(event -> this.delayGenerator.delayIfHaveTo(retryCount)
                .flatMap(ignored -> runListenerReliably(retryCount, event))
                .then(Mono.<Void>fromRunnable(acknowledgableDelivery::ack).subscribeOn(Schedulers.elastic())))
            .onErrorResume(th -> {
                LOGGER.error("Unable to process delivery for group {}", this.group, th);
                // nack(false): do not requeue the failed delivery.
                return Mono.fromRunnable(() -> acknowledgableDelivery.nack(false))
                    .subscribeOn(Schedulers.elastic())
                    .then();
            });
    }

    /**
     * Runs the listener for {@code event}; when the listener fails, the error is handed to the
     * retry handler instead of being propagated.
     *
     * @param i retry count of the delivery carrying the event
     * @param event event to process
     * @return a mono completing once the listener, or the retry handling, has completed
     */
    public Mono<Void> runListenerReliably(int i, Event event) {
        return runListener(event)
            .onErrorResume(th -> this.retryHandler.handleRetry(event, i, th));
    }

    /** Deserializes a delivery body into an event on the parallel scheduler. */
    private Mono<Event> deserializeEvent(byte[] bArr) {
        return Mono.fromCallable(() -> this.eventSerializer.fromBytes(bArr))
            .subscribeOn(Schedulers.parallel());
    }

    /**
     * Submits {@code event} to the retry handler with the default retry count.
     *
     * @param event event to redeliver
     * @return the mono returned by the retry handler
     */
    public Mono<Void> reDeliver(Event event) {
        return this.retryHandler.retryOrStoreToDeadLetter(event, DEFAULT_RETRY_COUNT);
    }

    /** Executes the listener for {@code event} with the group name added to the MDC context. */
    private Mono<Void> runListener(Event event) {
        return this.listenerExecutor.execute(
            this.listener,
            MDCBuilder.create().addToContext("group", this.group.asString()),
            event);
    }

    /**
     * Reads the retry count from the {@value #RETRY_COUNT} header of the delivery.
     *
     * @return the header value when it is present and is an {@link Integer}, otherwise
     *     {@link #DEFAULT_RETRY_COUNT}
     */
    private int getRetryCount(AcknowledgableDelivery acknowledgableDelivery) {
        return Optional.ofNullable(acknowledgableDelivery.getProperties().getHeaders())
            .map(headers -> headers.get(RETRY_COUNT))
            .filter(Integer.class::isInstance)
            .map(Integer.class::cast)
            .orElse(DEFAULT_RETRY_COUNT);
    }

    /**
     * Disposes the current consumer subscription, if any and not already disposed, then runs the
     * unregister callback supplied at construction.
     */
    public void unregister() {
        this.receiverSubscriber.ifPresent(GroupRegistration::disposeIfActive);
        this.unregisterGroup.run();
    }

    /** Disposes {@code subscription} unless it is already disposed. */
    private static void disposeIfActive(Disposable subscription) {
        if (!subscription.isDisposed()) {
            subscription.dispose();
        }
    }
}
