package ms.dew.core.cluster.spi.rabbit;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import ms.dew.core.cluster.AbsClusterMQ;
import ms.dew.core.cluster.dto.MessageWrap;
import org.springframework.amqp.rabbit.connection.Connection;

/* loaded from: input_file:ms/dew/core/cluster/spi/rabbit/RabbitClusterMQ.class */
public class RabbitClusterMQ extends AbsClusterMQ {
    private static SendBeforeFun sendBeforeFun = (str, str2, basicProperties) -> {
        return null;
    };
    private static SendErrorFun sendErrorFun = (exc, obj) -> {
    };
    private static SendFinishFun sendFinishFun = obj -> {
    };
    private static ReceiveBeforeFun receiveBeforeFun = (str, str2, str3, basicProperties) -> {
        return null;
    };
    private static ReceiveErrorFun receiveErrorFun = (exc, obj) -> {
    };
    private static ReceiveFinishFun receiveFinishFun = obj -> {
    };
    private RabbitAdapter rabbitAdapter;

    public RabbitClusterMQ(RabbitAdapter rabbitAdapter) {
        this.rabbitAdapter = rabbitAdapter;
    }

    public static void setSendBeforeFun(SendBeforeFun sendBeforeFun2) {
        sendBeforeFun = sendBeforeFun2;
    }

    public static void setSendErrorFun(SendErrorFun sendErrorFun2) {
        sendErrorFun = sendErrorFun2;
    }

    public static void setSendFinishFun(SendFinishFun sendFinishFun2) {
        sendFinishFun = sendFinishFun2;
    }

    public static void setReceiveBeforeFun(ReceiveBeforeFun receiveBeforeFun2) {
        receiveBeforeFun = receiveBeforeFun2;
    }

    public static void setReceiveErrorFun(ReceiveErrorFun receiveErrorFun2) {
        receiveErrorFun = receiveErrorFun2;
    }

    public static void setReceiveFinishFun(ReceiveFinishFun receiveFinishFun2) {
        receiveFinishFun = receiveFinishFun2;
    }

    public boolean doPublish(String str, String str2, Optional<Map<String, Object>> optional, boolean z) {
        Connection connection = this.rabbitAdapter.getConnection();
        Channel createChannel = connection.createChannel(false);
        Object obj = null;
        try {
            if (z) {
                try {
                    createChannel.confirmSelect();
                } catch (IOException e) {
                    logger.error("[MQ] Rabbit publish error.", e);
                    sendErrorFun.invoke(e, obj);
                    try {
                        createChannel.close();
                        sendFinishFun.invoke(obj);
                    } catch (IOException | TimeoutException e2) {
                        logger.error("[MQ] Rabbit publish error.", e2);
                    }
                    connection.close();
                    return false;
                }
            }
            createChannel.exchangeDeclare(str, BuiltinExchangeType.FANOUT, true);
            Map mQHeader = getMQHeader(str);
            mQHeader.getClass();
            optional.ifPresent(mQHeader::putAll);
            AMQP.BasicProperties basicProperties = new AMQP.BasicProperties("text/plain", (String) null, mQHeader, 2, 0, (String) null, (String) null, (String) null, (String) null, (Date) null, (String) null, (String) null, (String) null, (String) null);
            obj = sendBeforeFun.invoke(str, "", basicProperties);
            createChannel.basicPublish(str, "", basicProperties, str2.getBytes());
            if (!z) {
                try {
                    createChannel.close();
                    sendFinishFun.invoke(obj);
                } catch (IOException | TimeoutException e3) {
                    logger.error("[MQ] Rabbit publish error.", e3);
                }
                connection.close();
                return true;
            }
            try {
                boolean waitForConfirms = createChannel.waitForConfirms();
                try {
                    createChannel.close();
                    sendFinishFun.invoke(obj);
                } catch (IOException | TimeoutException e4) {
                    logger.error("[MQ] Rabbit publish error.", e4);
                }
                connection.close();
                return waitForConfirms;
            } catch (InterruptedException e5) {
                Thread.currentThread().interrupt();
                logger.error("[MQ] Rabbit publish error.", e5);
                sendErrorFun.invoke(e5, obj);
                try {
                    createChannel.close();
                    sendFinishFun.invoke(obj);
                } catch (IOException | TimeoutException e6) {
                    logger.error("[MQ] Rabbit publish error.", e6);
                }
                connection.close();
                return false;
            }
        } catch (Throwable th) {
            try {
                createChannel.close();
                sendFinishFun.invoke(obj);
            } catch (IOException | TimeoutException e7) {
                logger.error("[MQ] Rabbit publish error.", e7);
            }
            connection.close();
            throw th;
        }
    }

    protected void doSubscribe(String str, Consumer<MessageWrap> consumer) {
        Channel createChannel = this.rabbitAdapter.getConnection().createChannel(false);
        try {
            createChannel.exchangeDeclare(str, BuiltinExchangeType.FANOUT, true);
            String queue = createChannel.queueDeclare().getQueue();
            createChannel.queueBind(queue, str, "");
            createChannel.basicQos(1);
            createChannel.basicConsume(queue, false, getDefaultConsumer(createChannel, str, str, "", queue, consumer));
        } catch (IOException e) {
            logger.error("[MQ] Rabbit response error.", e);
        }
    }

    public boolean doRequest(String str, String str2, Optional<Map<String, Object>> optional, boolean z) {
        Connection connection = this.rabbitAdapter.getConnection();
        Channel createChannel = connection.createChannel(false);
        Object obj = null;
        try {
            if (z) {
                try {
                    createChannel.confirmSelect();
                } catch (IOException e) {
                    logger.error("[MQ] Rabbit request error.", e);
                    sendErrorFun.invoke(e, obj);
                    try {
                        createChannel.close();
                        sendFinishFun.invoke(obj);
                    } catch (IOException | TimeoutException e2) {
                        logger.error("[MQ] Rabbit request error.", e2);
                    }
                    connection.close();
                    return false;
                }
            }
            createChannel.queueDeclare(str, true, false, false, (Map) null);
            Map mQHeader = getMQHeader(str);
            mQHeader.getClass();
            optional.ifPresent(mQHeader::putAll);
            AMQP.BasicProperties basicProperties = new AMQP.BasicProperties("text/plain", (String) null, mQHeader, 2, 0, (String) null, (String) null, (String) null, (String) null, (Date) null, (String) null, (String) null, (String) null, (String) null);
            obj = sendBeforeFun.invoke("", str, basicProperties);
            createChannel.basicPublish("", str, basicProperties, str2.getBytes());
            if (!z) {
                try {
                    createChannel.close();
                    sendFinishFun.invoke(obj);
                } catch (IOException | TimeoutException e3) {
                    logger.error("[MQ] Rabbit request error.", e3);
                }
                connection.close();
                return true;
            }
            try {
                boolean waitForConfirms = createChannel.waitForConfirms();
                try {
                    createChannel.close();
                    sendFinishFun.invoke(obj);
                } catch (IOException | TimeoutException e4) {
                    logger.error("[MQ] Rabbit request error.", e4);
                }
                connection.close();
                return waitForConfirms;
            } catch (InterruptedException e5) {
                Thread.currentThread().interrupt();
                logger.error("[MQ] Rabbit request error.", e5);
                sendErrorFun.invoke(e5, obj);
                try {
                    createChannel.close();
                    sendFinishFun.invoke(obj);
                } catch (IOException | TimeoutException e6) {
                    logger.error("[MQ] Rabbit request error.", e6);
                }
                connection.close();
                return false;
            }
        } catch (Throwable th) {
            try {
                createChannel.close();
                sendFinishFun.invoke(obj);
            } catch (IOException | TimeoutException e7) {
                logger.error("[MQ] Rabbit request error.", e7);
            }
            connection.close();
            throw th;
        }
    }

    public boolean publishWithTopic(String str, String str2, String str3, String str4, Optional<Map<String, Object>> optional, boolean z) {
        logger.trace("[MQ] publishWithTopic {}:{}", str, str4);
        Connection connection = this.rabbitAdapter.getConnection();
        Channel createChannel = connection.createChannel(false);
        try {
            if (z) {
                try {
                    createChannel.confirmSelect();
                } catch (IOException e) {
                    logger.error("[MQ] Rabbit publishWithTopic error.", e);
                    sendErrorFun.invoke(e, null);
                    try {
                        createChannel.close();
                        sendFinishFun.invoke(null);
                    } catch (IOException | TimeoutException e2) {
                        logger.error("[MQ] Rabbit publishWithTopic error.", e2);
                    }
                    connection.close();
                    return false;
                }
            }
            createChannel.queueDeclare(str3, true, false, false, (Map) null);
            createChannel.exchangeDeclare(str, BuiltinExchangeType.TOPIC, true);
            Map mQHeader = getMQHeader(str);
            mQHeader.getClass();
            optional.ifPresent(mQHeader::putAll);
            AMQP.BasicProperties basicProperties = new AMQP.BasicProperties("text/plain", (String) null, mQHeader, 2, 0, (String) null, (String) null, (String) null, (String) null, (Date) null, (String) null, (String) null, (String) null, (String) null);
            Object invoke = sendBeforeFun.invoke(str, str2, basicProperties);
            createChannel.basicPublish(str, str2, basicProperties, str4.getBytes());
            if (!z) {
                try {
                    createChannel.close();
                    sendFinishFun.invoke(invoke);
                } catch (IOException | TimeoutException e3) {
                    logger.error("[MQ] Rabbit publishWithTopic error.", e3);
                }
                connection.close();
                return true;
            }
            try {
                boolean waitForConfirms = createChannel.waitForConfirms();
                try {
                    createChannel.close();
                    sendFinishFun.invoke(invoke);
                } catch (IOException | TimeoutException e4) {
                    logger.error("[MQ] Rabbit publishWithTopic error.", e4);
                }
                connection.close();
                return waitForConfirms;
            } catch (InterruptedException e5) {
                Thread.currentThread().interrupt();
                logger.error("[MQ] Rabbit publishWithTopic error.", e5);
                sendErrorFun.invoke(e5, invoke);
                try {
                    createChannel.close();
                    sendFinishFun.invoke(invoke);
                } catch (IOException | TimeoutException e6) {
                    logger.error("[MQ] Rabbit publishWithTopic error.", e6);
                }
                connection.close();
                return false;
            }
        } catch (Throwable th) {
            try {
                createChannel.close();
                sendFinishFun.invoke(null);
            } catch (IOException | TimeoutException e7) {
                logger.error("[MQ] Rabbit publishWithTopic error.", e7);
            }
            connection.close();
            throw th;
        }
    }

    public void subscribeWithTopic(String str, String str2, String str3, Consumer<MessageWrap> consumer) {
        Channel createChannel = this.rabbitAdapter.getConnection().createChannel(false);
        try {
            createChannel.queueDeclare(str3, true, false, false, (Map) null);
            createChannel.exchangeDeclare(str, BuiltinExchangeType.TOPIC, true);
            createChannel.queueBind(str3, str, str2);
            createChannel.basicQos(1);
            createChannel.basicConsume(str3, false, getDefaultConsumer(createChannel, str, str, str2, str3, consumer));
        } catch (IOException e) {
            logger.error("[MQ] Rabbit subscribeWithTopic error.", e);
        }
    }

    protected void doResponse(String str, Consumer<MessageWrap> consumer) {
        Channel createChannel = this.rabbitAdapter.getConnection().createChannel(false);
        try {
            createChannel.queueDeclare(str, true, false, false, (Map) null);
            createChannel.basicQos(1);
            createChannel.basicConsume(str, false, getDefaultConsumer(createChannel, str, "", str, str, consumer));
        } catch (IOException e) {
            logger.error("[MQ] Rabbit response error.", e);
        }
    }

    private DefaultConsumer getDefaultConsumer(final Channel channel, final String str, final String str2, final String str3, final String str4, final Consumer<MessageWrap> consumer) {
        return new DefaultConsumer(channel) { // from class: ms.dew.core.cluster.spi.rabbit.RabbitClusterMQ.1
            public void handleDelivery(String str5, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) throws IOException {
                Object invoke = RabbitClusterMQ.receiveBeforeFun.invoke(str2, str3, str4, basicProperties);
                try {
                    try {
                        Map mQHeader = RabbitClusterMQ.this.setMQHeader(str, basicProperties.getHeaders());
                        consumer.accept(new MessageWrap(str, Optional.of(mQHeader), new String(bArr, StandardCharsets.UTF_8)));
                        channel.basicAck(envelope.getDeliveryTag(), false);
                        RabbitClusterMQ.receiveFinishFun.invoke(invoke);
                    } catch (RuntimeException e) {
                        RabbitClusterMQ.receiveErrorFun.invoke(e, invoke);
                        throw e;
                    }
                } catch (Throwable th) {
                    channel.basicAck(envelope.getDeliveryTag(), false);
                    RabbitClusterMQ.receiveFinishFun.invoke(invoke);
                    throw th;
                }
            }
        };
    }

    public boolean supportHeader() {
        return true;
    }
}
