package gu.simplemq.jms;

import com.google.common.base.Preconditions;
import gu.simplemq.BaseMQDispatcher;
import gu.simplemq.Constant;
import gu.simplemq.MessageAck;
import gu.simplemq.SimplemqContext;
import gu.simplemq.exceptions.SmqRuntimeException;
import java.util.Hashtable;
import java.util.Iterator;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

/* loaded from: input_file:gu/simplemq/jms/BaseJmsDispatcher.class */
abstract class BaseJmsDispatcher extends BaseMQDispatcher<Connection> implements JMSReconnectCallback {
    private final Hashtable<String, ConsumerContext> consumers;
    private final AutoReconnectAdapter autoReconnectAdapter;
    private Connection connection;

    /* loaded from: input_file:gu/simplemq/jms/BaseJmsDispatcher$ActivemqListener.class */
    private class ActivemqListener implements MessageListener {
        private final String channel;
        private final boolean autoack;

        ActivemqListener(String str, boolean z) {
            this.channel = str;
            this.autoack = z;
        }

        private String textOf(Message message) throws JMSException {
            if (message instanceof TextMessage) {
                return ((TextMessage) message).getText();
            }
            if (!(message instanceof BytesMessage)) {
                throw new IllegalArgumentException(String.format("INVALID message type,%s,%s required", TextMessage.class.getName(), BytesMessage.class.getName()));
            }
            BytesMessage bytesMessage = (BytesMessage) message;
            byte[] bArr = new byte[(int) bytesMessage.getBodyLength()];
            bytesMessage.readBytes(bArr);
            return new String(bArr, Constant.UTF_8);
        }

        @Override // javax.jms.MessageListener
        public void onMessage(Message message) {
            try {
                if (!this.autoack) {
                    SimplemqContext.getCurrentContext().setMessageAck(new JMSMessageAck(message));
                }
                BaseJmsDispatcher.this.dispatch(this.channel, textOf(message));
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

    /* loaded from: input_file:gu/simplemq/jms/BaseJmsDispatcher$ConsumerContext.class */
    class ConsumerContext {
        MessageConsumer consumer;
        Session session;

        ConsumerContext() {
        }
    }

    /* loaded from: input_file:gu/simplemq/jms/BaseJmsDispatcher$JMSMessageAck.class */
    private static class JMSMessageAck implements MessageAck {
        private final Message message;

        private JMSMessageAck(Message message) {
            this.message = (Message) Preconditions.checkNotNull(message, "message is null");
        }

        @Override // gu.simplemq.MessageAck
        public void acknowledge() throws SmqRuntimeException {
            try {
                this.message.acknowledge();
            } catch (JMSException e) {
                throw new SmqRuntimeException(e);
            }
        }
    }

    protected abstract Destination makeDestination(Session session, String str) throws JMSException;

    /* JADX INFO: Access modifiers changed from: package-private */
    public BaseJmsDispatcher(JmsPoolLazy jmsPoolLazy) {
        super(jmsPoolLazy);
        this.consumers = new Hashtable<>();
        this.autoReconnectAdapter = new AutoReconnectAdapter(this);
    }

    @Override // gu.simplemq.BaseMQDispatcher
    protected void doInit() throws JMSException {
        this.connection = getConnection();
        new ExceptionListenerContainer(this.autoReconnectAdapter).bind(this.connection);
        this.connection.start();
    }

    @Override // gu.simplemq.BaseMQDispatcher
    protected void doUninit() {
    }

    @Override // gu.simplemq.BaseMQDispatcher
    protected void doSub(String str) throws JMSException {
        synchronized (this.consumers) {
            if (null == this.consumers.get(str)) {
                ConsumerContext consumerContext = new ConsumerContext();
                boolean isAutoack = getChannel(str).isAutoack();
                consumerContext.session = ((Connection) Preconditions.checkNotNull(this.connection, "connection is uninitialized")).createSession(Boolean.FALSE.booleanValue(), isAutoack ? 1 : 2);
                consumerContext.consumer = consumerContext.session.createConsumer(makeDestination(consumerContext.session, str));
                consumerContext.consumer.setMessageListener(new ActivemqListener(str, isAutoack));
                this.consumers.put(str, consumerContext);
            }
        }
    }

    @Override // gu.simplemq.BaseMQDispatcher
    protected void doUnsub(String str) throws JMSException {
        synchronized (this.consumers) {
            ConsumerContext remove = this.consumers.remove(str);
            if (null != remove) {
                remove.consumer.close();
            }
        }
    }

    @Override // gu.simplemq.jms.JMSReconnectCallback
    public void onConnectionLost() throws Exception {
        synchronized (this.consumers) {
            Iterator<ConsumerContext> it = this.consumers.values().iterator();
            while (it.hasNext()) {
                try {
                    it.next().consumer.close();
                } catch (Exception e) {
                    logger.error(e.getMessage());
                }
            }
            this.consumers.clear();
        }
        uninit();
    }

    @Override // gu.simplemq.jms.JMSReconnectCallback
    public void tryReconnecting() throws Exception {
        init();
        subscribe(new String[0]);
    }

    @Override // gu.simplemq.jms.JMSReconnectCallback
    public String ownerName() {
        return getClass().getSimpleName();
    }
}
