/*
 * Decompiled with CFR 0.152.
 */
package gu.simplemq.jms;

import com.google.common.base.Preconditions;
import gu.simplemq.BaseMQDispatcher;
import gu.simplemq.Constant;
import gu.simplemq.MessageAck;
import gu.simplemq.SimplemqContext;
import gu.simplemq.exceptions.SmqRuntimeException;
import gu.simplemq.jms.AutoReconnectAdapter;
import gu.simplemq.jms.ExceptionListenerContainer;
import gu.simplemq.jms.JMSReconnectCallback;
import gu.simplemq.jms.JmsPoolLazy;
import java.util.Hashtable;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

/**
 * Base class for JMS backed message dispatchers.
 *
 * <p>For every subscribed channel this class keeps one JMS {@link Session} and one
 * {@link MessageConsumer} (see {@link ConsumerContext}). Incoming messages are converted to
 * text and handed to {@code dispatch(channel, text)}. Subclasses decide what kind of
 * {@link Destination} a channel name maps to via {@link #makeDestination(Session, String)}.
 *
 * <p>The class also implements {@link JMSReconnectCallback}; the callback is registered on the
 * connection through an {@link AutoReconnectAdapter} in {@link #doInit()}.
 */
abstract class BaseJmsDispatcher
extends BaseMQDispatcher<Connection>
implements JMSReconnectCallback {
    /** Active subscriptions keyed by channel name; also used as the lock guarding them. */
    private final Hashtable<String, ConsumerContext> consumers = new Hashtable<String, ConsumerContext>();
    private final AutoReconnectAdapter autoReconnectAdapter = new AutoReconnectAdapter(this);
    /** Connection obtained in {@link #doInit()}; {@code null} until then. */
    private Connection connection;

    /**
     * Creates the JMS destination that messages of {@code channel} are consumed from.
     *
     * @param session session the destination is created with
     * @param channel channel name
     * @return the destination to consume from
     * @throws JMSException if the provider fails to create the destination
     */
    protected abstract Destination makeDestination(Session session, String channel) throws JMSException;

    BaseJmsDispatcher(JmsPoolLazy pool) {
        super(pool);
    }

    /**
     * Obtains the connection, binds the reconnect adapter to it as exception listener
     * and starts message delivery.
     *
     * @throws JMSException if the connection cannot be started
     */
    @Override
    protected void doInit() throws JMSException {
        this.connection = (Connection)this.getConnection();
        new ExceptionListenerContainer(this.autoReconnectAdapter).bind(this.connection);
        this.connection.start();
    }

    /**
     * Nothing to release here.
     * NOTE(review): the connection is presumably owned/released by the base class or the
     * pool - confirm against {@code BaseMQDispatcher}.
     */
    @Override
    protected void doUninit() {
    }

    /**
     * Subscribes to {@code channel} by creating a dedicated session and consumer.
     * Does nothing when the channel already has a consumer.
     *
     * @param channel channel name
     * @throws JMSException if the session, destination or consumer cannot be created
     * @throws NullPointerException if {@link #doInit()} has not set the connection yet
     */
    @Override
    protected void doSub(String channel) throws JMSException {
        synchronized (this.consumers) {
            if (null != this.consumers.get(channel)) {
                return;
            }
            boolean autoack = this.getChannel(channel).isAutoack();
            Connection conn = Preconditions.checkNotNull(this.connection, "connection is uninitialized");
            // Non-transacted session; manual-ack channels acknowledge through JMSMessageAck.
            Session session = conn.createSession(false, autoack ? Session.AUTO_ACKNOWLEDGE : Session.CLIENT_ACKNOWLEDGE);
            boolean registered = false;
            try {
                Destination destination = this.makeDestination(session, channel);
                MessageConsumer consumer = session.createConsumer(destination);
                consumer.setMessageListener(new ActivemqListener(channel, autoack));
                ConsumerContext ctx = new ConsumerContext();
                ctx.session = session;
                ctx.consumer = consumer;
                this.consumers.put(channel, ctx);
                registered = true;
            } finally {
                if (!registered) {
                    // Setup failed half way: do not leak the session that was just created.
                    this.closeSessionQuietly(session);
                }
            }
        }
    }

    /**
     * Cancels the subscription of {@code channel}, closing its consumer and its session.
     * Does nothing when the channel is not subscribed.
     *
     * @param channel channel name
     * @throws JMSException if closing the consumer fails (the session is still closed)
     */
    @Override
    protected void doUnsub(String channel) throws JMSException {
        synchronized (this.consumers) {
            ConsumerContext ctx = this.consumers.remove(channel);
            if (null != ctx) {
                try {
                    ctx.consumer.close();
                } finally {
                    // The session was created only for this consumer, so release it as well.
                    this.closeSessionQuietly(ctx.session);
                }
            }
        }
    }

    /**
     * Reconnect callback: drops every consumer/session pair (errors are only logged, since
     * the connection is already considered broken) and then un-initializes the dispatcher.
     *
     * @throws Exception if {@code uninit()} fails
     */
    @Override
    public void onConnectionLost() throws Exception {
        synchronized (this.consumers) {
            for (ConsumerContext ctx : this.consumers.values()) {
                try {
                    ctx.consumer.close();
                }
                catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
                this.closeSessionQuietly(ctx.session);
            }
            this.consumers.clear();
        }
        this.uninit();
    }

    /**
     * Reconnect callback: re-initializes the dispatcher and calls {@code subscribe} with no
     * explicit channel names.
     * NOTE(review): presumably an empty argument list re-subscribes all registered channels -
     * confirm against {@code BaseMQDispatcher.subscribe}.
     *
     * @throws Exception if initialization or subscription fails
     */
    @Override
    public void tryReconnecting() throws Exception {
        this.init();
        this.subscribe(new String[0]);
    }

    /** @return the simple class name of the concrete dispatcher */
    @Override
    public String ownerName() {
        return this.getClass().getSimpleName();
    }

    /**
     * Closes {@code session}, logging instead of propagating any failure.
     *
     * @param session session to close, may be {@code null}
     */
    private void closeSessionQuietly(Session session) {
        if (null == session) {
            return;
        }
        try {
            session.close();
        }
        catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }

    /** Holder for the JMS resources that belong to one subscribed channel. */
    class ConsumerContext {
        MessageConsumer consumer;
        Session session;

        ConsumerContext() {
        }
    }

    /** {@link MessageAck} implementation that acknowledges the wrapped JMS message. */
    private static class JMSMessageAck
    implements MessageAck {
        private final Message message;

        private JMSMessageAck(Message message) {
            this.message = Preconditions.checkNotNull(message, "message is null");
        }

        /**
         * Acknowledges the wrapped message.
         *
         * @throws SmqRuntimeException wrapping the {@link JMSException} if acknowledging fails
         */
        @Override
        public void acknowledge() throws SmqRuntimeException {
            try {
                this.message.acknowledge();
            }
            catch (JMSException e) {
                throw new SmqRuntimeException(e);
            }
        }
    }

    /** Listener attached to a channel's consumer; forwards message text to {@code dispatch}. */
    private class ActivemqListener
    implements MessageListener {
        private final String channel;
        private final boolean autoack;

        ActivemqListener(String channel, boolean autoack) {
            this.channel = channel;
            this.autoack = autoack;
        }

        /**
         * Extracts the textual payload of a message.
         *
         * @param message a {@link TextMessage} or a {@link BytesMessage} holding UTF-8 bytes
         * @return the message text
         * @throws JMSException if the provider fails to read the message
         * @throws IllegalArgumentException if the message is of any other type
         */
        private String textOf(Message message) throws JMSException {
            if (message instanceof TextMessage) {
                return ((TextMessage)message).getText();
            }
            if (message instanceof BytesMessage) {
                BytesMessage bytesMessage = (BytesMessage)message;
                byte[] buf = new byte[(int)bytesMessage.getBodyLength()];
                bytesMessage.readBytes(buf);
                return new String(buf, Constant.UTF_8);
            }
            throw new IllegalArgumentException(String.format("INVALID message type,%s,%s required", TextMessage.class.getName(), BytesMessage.class.getName()));
        }

        /**
         * Dispatches the message text to the channel. For manual-ack channels an acknowledge
         * handle is published in the current {@link SimplemqContext} before dispatching.
         * JMS failures are logged and not rethrown; runtime exceptions are not caught here.
         */
        @Override
        public void onMessage(Message message) {
            try {
                if (!this.autoack) {
                    SimplemqContext.getCurrentContext().setMessageAck(new JMSMessageAck(message));
                }
                BaseJmsDispatcher.this.dispatch(this.channel, this.textOf(message));
            }
            catch (JMSException e) {
                // Use the dispatcher's logger rather than stderr so the failure is not lost.
                logger.error(e.getMessage(), e);
            }
        }
    }
}

