package com.github.linyuzai.event.rabbitmq.subscriber;

import com.github.linyuzai.event.core.context.EventContext;
import com.github.linyuzai.event.core.error.EventErrorHandler;
import com.github.linyuzai.event.core.subscriber.Subscription;
import com.github.linyuzai.event.rabbitmq.binding.RabbitBinding;
import com.github.linyuzai.event.rabbitmq.endpoint.RabbitEventEndpoint;
import com.github.linyuzai.event.rabbitmq.exception.RabbitEventException;
import com.rabbitmq.client.Channel;
import java.util.Iterator;
import java.util.Objects;
import java.util.function.Consumer;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.BatchMessageListener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareBatchMessageListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.boot.autoconfigure.amqp.RabbitProperties;

/* loaded from: input_file:com/github/linyuzai/event/rabbitmq/subscriber/AbstractRabbitEventSubscriber.class */
/**
 * Skeleton for RabbitMQ subscribers that consume through a Spring AMQP
 * {@link MessageListenerContainer}.
 *
 * <p>{@link #subscribeRabbit} builds a {@link MessageListener}, asks the subclass for a
 * container, invokes the {@link #binding(RabbitBinding)} hook, starts the container and wraps
 * it in a {@link RabbitSubscription}. Subclasses must supply the container and may override
 * the other methods to customise bindings, listener creation or payload extraction.
 */
public abstract class AbstractRabbitEventSubscriber extends RabbitEventSubscriber {

    /**
     * Creates the listener and its container, declares bindings, and starts consuming.
     *
     * @param rabbitEventEndpoint endpoint supplying the admin and listener properties
     * @param eventContext        context passed through to listener creation and error handling
     * @param consumer            receives the payload of every delivered message
     * @return a subscription wrapping the started container
     */
    @Override
    public Subscription subscribeRabbit(RabbitEventEndpoint rabbitEventEndpoint, EventContext eventContext, Consumer<Object> consumer) {
        MessageListener listener = createMessageListener(rabbitEventEndpoint, eventContext, consumer);
        MessageListenerContainer container = createMessageListenerContainer(rabbitEventEndpoint, eventContext, listener);
        // The subclass may already have installed a listener; only fill the gap.
        if (container.getMessageListener() == null) {
            container.setupMessageListener(listener);
        }
        // Bindings are declared before the container starts consuming.
        binding(new RabbitBinding(rabbitEventEndpoint.getAdmin()));
        container.start();
        return new RabbitSubscription(container);
    }

    /**
     * Hook invoked before the container is started. The default implementation does nothing.
     *
     * @param rabbitBinding binding helper built from the endpoint's admin
     */
    public void binding(RabbitBinding rabbitBinding) {
    }

    /**
     * Creates the container that will deliver messages to {@code messageListener}.
     *
     * @param rabbitEventEndpoint endpoint being subscribed to
     * @param eventContext        current event context
     * @param messageListener     listener produced by {@link #createMessageListener}; it is
     *                            installed by {@link #subscribeRabbit} if the returned container
     *                            has no listener yet
     * @return the container to start
     */
    public abstract MessageListenerContainer createMessageListenerContainer(RabbitEventEndpoint rabbitEventEndpoint, EventContext eventContext, MessageListener messageListener);

    /**
     * Builds a listener from the endpoint's configured container type: acknowledge mode and
     * batch flag are read from the SIMPLE settings, or the acknowledge mode alone (never
     * batched) from the DIRECT settings.
     *
     * @param rabbitEventEndpoint endpoint whose listener properties are inspected
     * @param eventContext        current event context
     * @param consumer            receives the payload of every delivered message
     * @return the listener matching the configuration
     * @throws RabbitEventException if the container type is neither SIMPLE nor DIRECT
     */
    public MessageListener createMessageListener(RabbitEventEndpoint rabbitEventEndpoint, EventContext eventContext, Consumer<Object> consumer) {
        RabbitProperties.Listener listenerProperties = rabbitEventEndpoint.getProperties().getListener();
        RabbitProperties.ContainerType type = listenerProperties.getType();
        if (type == RabbitProperties.ContainerType.SIMPLE) {
            RabbitProperties.SimpleContainer simple = listenerProperties.getSimple();
            return createMessageListener(simple.getAcknowledgeMode(), simple.isConsumerBatchEnabled(), rabbitEventEndpoint, eventContext, consumer);
        }
        if (type == RabbitProperties.ContainerType.DIRECT) {
            return createMessageListener(listenerProperties.getDirect().getAcknowledgeMode(), false, rabbitEventEndpoint, eventContext, consumer);
        }
        throw new RabbitEventException("Unsupported listener type: " + type);
    }

    /**
     * Builds one of four listener flavours depending on acknowledge mode and batching.
     *
     * <p>With {@link AcknowledgeMode#MANUAL} the listener is channel-aware and acknowledges each
     * message after the consumer returns normally. For any other mode no acknowledgement is
     * issued by this class.
     *
     * @param acknowledgeMode     configured acknowledge mode
     * @param batch               whether the container delivers messages in batches
     * @param rabbitEventEndpoint endpoint passed to the error handler
     * @param eventContext        context from which the {@link EventErrorHandler} is looked up
     * @param consumer            receives the payload of every delivered message
     * @return the listener to install on the container
     */
    protected MessageListener createMessageListener(AcknowledgeMode acknowledgeMode, boolean batch, RabbitEventEndpoint rabbitEventEndpoint, EventContext eventContext, Consumer<Object> consumer) {
        EventErrorHandler eventErrorHandler = (EventErrorHandler) eventContext.get(EventErrorHandler.class);
        if (acknowledgeMode == AcknowledgeMode.MANUAL) {
            if (batch) {
                // Explicit cast: the lambda must target the channel-aware batch interface,
                // not MessageListener's single-argument onMessage.
                return (ChannelAwareBatchMessageListener) (messages, channel) -> {
                    for (Message message : messages) {
                        handleMessage(message, rabbitEventEndpoint, eventContext,
                                singleAck(channel, message, eventErrorHandler, rabbitEventEndpoint, eventContext), consumer);
                    }
                };
            }
            return (ChannelAwareMessageListener) (message, channel) ->
                    handleMessage(message, rabbitEventEndpoint, eventContext,
                            singleAck(channel, message, eventErrorHandler, rabbitEventEndpoint, eventContext), consumer);
        }
        if (batch) {
            return (BatchMessageListener) messages -> {
                for (Message message : messages) {
                    handleMessage(message, rabbitEventEndpoint, eventContext, null, consumer);
                }
            };
        }
        return message -> handleMessage(message, rabbitEventEndpoint, eventContext, null, consumer);
    }

    /**
     * Returns an action that acknowledges exactly one delivery and routes any failure
     * (including a {@code null} channel) to the error handler.
     *
     * <p>The ack is issued with {@code multiple=false} so that an earlier delivery whose
     * consumer failed, and which was therefore never acknowledged, is not implicitly
     * confirmed by a later successful one.
     */
    private static Runnable singleAck(Channel channel, Message message, EventErrorHandler eventErrorHandler, RabbitEventEndpoint rabbitEventEndpoint, EventContext eventContext) {
        return () -> {
            try {
                Objects.requireNonNull(channel).basicAck(message.getMessageProperties().getDeliveryTag(), false);
            } catch (Throwable th) {
                eventErrorHandler.onError(th, rabbitEventEndpoint, eventContext);
            }
        };
    }

    /**
     * Passes the message payload to the consumer and, if that succeeds, runs the optional
     * follow-up action. Anything thrown by either step is handed to the
     * {@link EventErrorHandler} found in the context instead of propagating.
     *
     * @param message             delivered message
     * @param rabbitEventEndpoint endpoint passed to the error handler
     * @param eventContext        context from which the error handler is looked up
     * @param runnable            action to run after successful consumption; may be {@code null}
     * @param consumer            receives the result of {@link #getPayload}
     */
    public void handleMessage(Message message, RabbitEventEndpoint rabbitEventEndpoint, EventContext eventContext, Runnable runnable, Consumer<Object> consumer) {
        EventErrorHandler eventErrorHandler = (EventErrorHandler) eventContext.get(EventErrorHandler.class);
        try {
            consumer.accept(getPayload(message, rabbitEventEndpoint, eventContext));
            if (runnable != null) {
                runnable.run();
            }
        } catch (Throwable th) {
            eventErrorHandler.onError(th, rabbitEventEndpoint, eventContext);
        }
    }

    /**
     * Extracts the object handed to the consumer. The default returns the raw message body.
     *
     * @param message             delivered message
     * @param rabbitEventEndpoint endpoint the message arrived on (unused by default)
     * @param eventContext        current event context (unused by default)
     * @return the value of {@link Message#getBody()}
     */
    public Object getPayload(Message message, RabbitEventEndpoint rabbitEventEndpoint, EventContext eventContext) {
        return message.getBody();
    }
}
