package org.apache.eventmesh.storage.rabbitmq.consumer;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import io.cloudevents.CloudEvent;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.eventmesh.api.AbstractContext;
import org.apache.eventmesh.api.EventListener;
import org.apache.eventmesh.api.consumer.Consumer;
import org.apache.eventmesh.common.ThreadPoolFactory;
import org.apache.eventmesh.common.config.Config;
import org.apache.eventmesh.storage.rabbitmq.client.RabbitmqClient;
import org.apache.eventmesh.storage.rabbitmq.client.RabbitmqConnectionFactory;
import org.apache.eventmesh.storage.rabbitmq.config.ConfigurationHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Config(field = "configurationHolder")
/* loaded from: input_file:org/apache/eventmesh/storage/rabbitmq/consumer/RabbitmqConsumer.class */
public class RabbitmqConsumer implements Consumer {
    private static final Logger log = LoggerFactory.getLogger(RabbitmqConsumer.class);
    private RabbitmqClient rabbitmqClient;
    private Connection connection;
    private Channel channel;
    private ConfigurationHolder configurationHolder;
    private RabbitmqConsumerHandler rabbitmqConsumerHandler;
    private RabbitmqConnectionFactory rabbitmqConnectionFactory = new RabbitmqConnectionFactory();
    private volatile boolean started = false;
    private final ThreadPoolExecutor executor = ThreadPoolFactory.createThreadPoolExecutor(Runtime.getRuntime().availableProcessors() * 2, Runtime.getRuntime().availableProcessors() * 2, "EventMesh-Rabbitmq-Consumer");

    public boolean isStarted() {
        return this.started;
    }

    public boolean isClosed() {
        return !isStarted();
    }

    public void start() {
        if (this.started) {
            return;
        }
        this.started = true;
    }

    public void shutdown() {
        if (this.started) {
            try {
                this.rabbitmqClient.closeConnection(this.connection);
                this.rabbitmqClient.closeChannel(this.channel);
                this.rabbitmqConsumerHandler.stop();
            } finally {
                this.started = false;
            }
        }
    }

    public void init(Properties properties) throws Exception {
        this.rabbitmqClient = new RabbitmqClient(this.rabbitmqConnectionFactory);
        this.connection = this.rabbitmqClient.getConnection(this.configurationHolder.getHost(), this.configurationHolder.getUsername(), this.configurationHolder.getPasswd(), this.configurationHolder.getPort(), this.configurationHolder.getVirtualHost());
        this.channel = this.rabbitmqConnectionFactory.createChannel(this.connection);
        this.rabbitmqConsumerHandler = new RabbitmqConsumerHandler(this.channel, this.configurationHolder);
    }

    public void updateOffset(List<CloudEvent> list, AbstractContext abstractContext) {
    }

    public void subscribe(String str) {
        this.rabbitmqClient.binding(this.channel, this.configurationHolder.getExchangeType(), this.configurationHolder.getExchangeName(), this.configurationHolder.getRoutingKey(), this.configurationHolder.getQueueName());
        this.executor.execute(this.rabbitmqConsumerHandler);
    }

    public void unsubscribe(String str) {
        try {
            this.rabbitmqClient.unbinding(this.channel, this.configurationHolder.getExchangeName(), this.configurationHolder.getRoutingKey(), this.configurationHolder.getQueueName());
            this.rabbitmqConsumerHandler.stop();
        } catch (Exception e) {
            log.error("[RabbitmqConsumer] unsubscribe happen exception.", e);
        }
    }

    public void registerEventListener(EventListener eventListener) {
        this.rabbitmqConsumerHandler.setEventListener(eventListener);
    }

    public void setRabbitmqConnectionFactory(RabbitmqConnectionFactory rabbitmqConnectionFactory) {
        this.rabbitmqConnectionFactory = rabbitmqConnectionFactory;
    }

    public ConfigurationHolder getClientConfiguration() {
        return this.configurationHolder;
    }
}
