/*
 * Decompiled with CFR 0.152.
 */
package org.noear.water.protocol.solution;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.GetResponse;
import java.util.HashMap;
import org.noear.water.protocol.MessageQueue;
import org.noear.water.utils.RabbitMQX;
import org.noear.water.utils.ext.Act1;

public class MessageQueueRabbitMQ
implements MessageQueue {
    final RabbitMQX _rabbitX;
    final String rabbit_exchangeName = "water.message";
    final String rabbit_routingKey;
    final String rabbit_queueName;
    final BuiltinExchangeType rabbit_type = BuiltinExchangeType.DIRECT;
    final boolean rabbit_durable = true;
    final boolean rabbit_autoDelete = false;
    final boolean rabbit_internal = false;
    final AMQP.BasicProperties rabbit_msgProps;

    public MessageQueueRabbitMQ(String name, RabbitMQX rabbitX) {
        this._rabbitX = rabbitX;
        this.rabbit_routingKey = name;
        this.rabbit_queueName = "water.message." + name;
        this.rabbit_msgProps = new AMQP.BasicProperties().builder().deliveryMode(Integer.valueOf(2)).contentType("UTF-8").build();
        try {
            this.initDeclareAndBind();
        }
        catch (Exception ex) {
            throw new RuntimeException(ex);
        }
    }

    private void initDeclareAndBind() throws Exception {
        this._rabbitX.open0(channel -> {
            channel.exchangeDeclare("water.message", this.rabbit_type, true, false, false, new HashMap());
            channel.queueDeclare(this.rabbit_queueName, true, false, false, new HashMap());
            channel.queueBind(this.rabbit_queueName, "water.message", this.rabbit_routingKey, new HashMap());
        });
    }

    @Override
    public boolean push(String msg_id) {
        this._rabbitX.open0(channel -> channel.basicPublish("water.message", this.rabbit_routingKey, false, this.rabbit_msgProps, msg_id.getBytes()));
        return true;
    }

    @Override
    public String poll() {
        return (String)this._rabbitX.open1(channel -> {
            GetResponse rep = channel.basicGet(this.rabbit_queueName, true);
            if (rep == null) {
                return null;
            }
            return new String(rep.getBody());
        });
    }

    @Override
    public void pollGet(Act1<String> callback) {
        this._rabbitX.open0(channel -> {
            GetResponse rep;
            while ((rep = channel.basicGet(this.rabbit_queueName, true)) != null) {
                callback.run((Object)new String(rep.getBody()));
            }
        });
    }

    @Override
    public long count() {
        return (Long)this._rabbitX.open1(channel -> channel.messageCount(this.rabbit_queueName));
    }

    @Override
    public void close() throws Exception {
    }
}

