package com.tvd12.ezymq.activemq.endpoint;

import com.tvd12.ezyfox.concurrent.EzyThreadList;
import com.tvd12.ezyfox.util.EzyProcessor;
import com.tvd12.ezyfox.util.EzyStartable;
import com.tvd12.ezymq.activemq.concurrent.EzyActiveThreadFactory;
import com.tvd12.ezymq.activemq.endpoint.EzyActiveTopicEndpoint;
import com.tvd12.ezymq.activemq.handler.EzyActiveMessageHandler;
import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;

/* loaded from: input_file:com/tvd12/ezymq/activemq/endpoint/EzyActiveTopicServer.class */
public class EzyActiveTopicServer extends EzyActiveTopicEndpoint implements EzyStartable {
    protected volatile boolean active;
    protected final int threadPoolSize;
    protected final MessageConsumer consumer;
    protected final EzyThreadList executorService;
    protected EzyActiveMessageHandler messageHandler;

    /* loaded from: input_file:com/tvd12/ezymq/activemq/endpoint/EzyActiveTopicServer$Builder.class */
    public static class Builder extends EzyActiveTopicEndpoint.Builder<Builder> {
        protected int threadPoolSize = 1;

        public Builder threadPoolSize(int i) {
            this.threadPoolSize = i;
            return this;
        }

        @Override // com.tvd12.ezymq.activemq.endpoint.EzyActiveTopicEndpoint.Builder
        /* renamed from: build */
        public EzyActiveTopicServer mo12build() {
            return (EzyActiveTopicServer) super.mo12build();
        }

        @Override // com.tvd12.ezymq.activemq.endpoint.EzyActiveTopicEndpoint.Builder
        protected EzyActiveTopicEndpoint newEndpoint() throws Exception {
            return new EzyActiveTopicServer(this.session, this.topic, this.threadPoolSize);
        }
    }

    public EzyActiveTopicServer(Session session, Destination destination, int i) throws Exception {
        super(session, destination);
        this.threadPoolSize = i;
        this.executorService = newExecutorService();
        this.consumer = session.createConsumer(destination);
    }

    public static Builder builder() {
        return new Builder();
    }

    public void start() throws Exception {
        this.active = true;
        this.executorService.execute();
    }

    protected void loop() {
        while (this.active) {
            Message message = null;
            try {
                message = (BytesMessage) this.consumer.receive();
            } catch (JMSException e) {
                this.logger.warn("receive topic message error", e);
            } catch (Throwable th) {
                this.logger.warn("process message: {} error", message, th);
            }
            if (message == null) {
                return;
            }
            this.messageHandler.handle(getMessageProperties(message), getMessageBody(message));
        }
    }

    protected EzyThreadList newExecutorService() {
        return new EzyThreadList(this.threadPoolSize, this::loop, EzyActiveThreadFactory.create("topic-server"));
    }

    public void close() {
        this.active = false;
        EzyThreadList ezyThreadList = this.executorService;
        ezyThreadList.getClass();
        EzyProcessor.processWithLogException(ezyThreadList::interrupt);
        MessageConsumer messageConsumer = this.consumer;
        messageConsumer.getClass();
        EzyProcessor.processWithLogException(messageConsumer::close);
    }

    public void setMessageHandler(EzyActiveMessageHandler ezyActiveMessageHandler) {
        this.messageHandler = ezyActiveMessageHandler;
    }
}
