/*
 * Decompiled with CFR 0.152.
 */
package org.noear.water.protocol.solution;

import java.util.List;
import java.util.Properties;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.noear.water.protocol.MessageQueue;
import org.noear.water.utils.ext.Act1;

public class MessageQueueRocketMQ
implements MessageQueue {
    String _queue_name;
    final String group_name = "water_message";
    final String server;
    DefaultMQProducer producer;
    DefaultLitePullConsumer consumer;

    public MessageQueueRocketMQ(String name, Properties prop) {
        this._queue_name = name;
        this.server = prop.getProperty("server");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void initProducer() {
        if (this.producer != null) {
            return;
        }
        String string = "water_message";
        synchronized ("water_message") {
            if (this.producer != null) {
                // ** MonitorExit[var1_1] (shouldn't be in output)
                return;
            }
            this.producer = new DefaultMQProducer("water_message");
            this.producer.setNamesrvAddr(this.server);
            this.producer.setSendMsgTimeout(3000);
            try {
                this.producer.start();
            }
            catch (Exception ex) {
                throw new RuntimeException(ex);
            }
            return;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void initConsumer() {
        if (this.consumer != null) {
            return;
        }
        String string = "water_message";
        synchronized ("water_message") {
            if (this.consumer != null) {
                // ** MonitorExit[var1_1] (shouldn't be in output)
                return;
            }
            this.consumer = new DefaultLitePullConsumer("water_message");
            this.consumer.setNamesrvAddr(this.server);
            this.consumer.setPullBatchSize(1);
            this.consumer.setPollTimeoutMillis(3000L);
            try {
                this.consumer.subscribe(this._queue_name, "*");
                this.consumer.start();
            }
            catch (Exception ex) {
                ex.printStackTrace();
                throw new RuntimeException(ex);
            }
            return;
        }
    }

    @Override
    public boolean push(String msg_id) {
        this.initProducer();
        try {
            Message msgX = new Message(this._queue_name, msg_id.getBytes());
            msgX.setKeys(msg_id);
            SendResult send = this.producer.send(msgX);
            return send.getSendStatus().equals((Object)SendStatus.SEND_OK);
        }
        catch (Exception ex) {
            throw new RuntimeException(ex);
        }
    }

    @Override
    public String poll() {
        this.initConsumer();
        try {
            List msgs = this.consumer.poll();
            if (msgs.size() == 0) {
                return null;
            }
            this.consumer.commitSync();
            return new String(((MessageExt)msgs.get(0)).getBody());
        }
        catch (Exception ex) {
            throw new RuntimeException(ex);
        }
    }

    @Override
    public void pollGet(Act1<String> callback) {
        this.initConsumer();
        try {
            List msgs;
            while ((msgs = this.consumer.poll()).size() != 0) {
                this.consumer.commitSync();
                callback.run((Object)new String(((MessageExt)msgs.get(0)).getBody()));
            }
        }
        catch (Exception ex) {
            throw new RuntimeException(ex);
        }
    }

    @Override
    public long count() {
        return 0L;
    }

    @Override
    public void close() throws Exception {
        if (this.producer != null) {
            this.producer.shutdown();
        }
        if (this.consumer != null) {
            this.consumer.shutdown();
        }
    }
}

