package org.minbox.framework.message.pipe.server.distribution;

import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import org.minbox.framework.message.pipe.core.Message;
import org.minbox.framework.message.pipe.core.exception.MessagePipeException;
import org.minbox.framework.message.pipe.core.grpc.MessageServiceGrpc;
import org.minbox.framework.message.pipe.core.grpc.proto.MessageRequest;
import org.minbox.framework.message.pipe.core.information.ClientInformation;
import org.minbox.framework.message.pipe.core.transport.MessageRequestBody;
import org.minbox.framework.message.pipe.core.transport.MessageResponseBody;
import org.minbox.framework.message.pipe.core.transport.MessageResponseStatus;
import org.minbox.framework.message.pipe.core.untis.JsonUtils;
import org.minbox.framework.message.pipe.server.MessagePipe;
import org.minbox.framework.message.pipe.server.config.MessagePipeConfiguration;
import org.minbox.framework.message.pipe.server.service.discovery.ServiceDiscovery;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;

/* loaded from: input_file:org/minbox/framework/message/pipe/server/distribution/MessageDistributionExecutor.class */
public class MessageDistributionExecutor {
    private static final Logger log = LoggerFactory.getLogger(MessageDistributionExecutor.class);
    private String pipeName;
    private MessagePipe messagePipe;
    private MessagePipeConfiguration configuration;
    private ServiceDiscovery serviceDiscovery;

    public MessageDistributionExecutor(MessagePipe messagePipe, ServiceDiscovery serviceDiscovery) {
        Assert.notNull(messagePipe, "The MessagePipe cannot be null.");
        Assert.notNull(serviceDiscovery, "The ServiceDiscovery cannot be null.");
        this.messagePipe = messagePipe;
        this.pipeName = messagePipe.getName();
        this.configuration = messagePipe.getConfiguration();
        this.serviceDiscovery = serviceDiscovery;
    }

    public void waitProcessing() {
        while (true) {
            try {
                synchronized (this) {
                    while (this.messagePipe.getLastMessageCount() > 0) {
                        try {
                            takeAndSend();
                        } catch (MessagePipeException e) {
                            log.error(e.getMessage(), e);
                        }
                    }
                    wait();
                }
            } catch (Exception e2) {
                log.error(e2.getMessage(), e2);
            }
        }
    }

    private void takeAndSend() {
        ClientInformation lookup = this.serviceDiscovery.lookup(this.pipeName);
        if (ObjectUtils.isEmpty(lookup)) {
            throw new MessagePipeException("Message Pipe: " + this.pipeName + ", no healthy clients were found.");
        }
        this.messagePipe.lockHandleTheFirst(message -> {
            return Boolean.valueOf(sendMessageToClient(message, lookup));
        });
    }

    private boolean sendMessageToClient(Message message, ClientInformation clientInformation) {
        boolean z = true;
        String clientId = clientInformation.getClientId();
        try {
            if (!MessageResponseStatus.SUCCESS.equals(((MessageResponseBody) JsonUtils.jsonToObject(MessageServiceGrpc.newBlockingStub(ClientChannelManager.establishChannel(clientInformation)).messageProcessing(MessageRequest.newBuilder().setBody(JsonUtils.objectToJson(new MessageRequestBody().setRequestId(this.configuration.getRequestIdGenerator().generate()).setClientId(clientId).setMessage(message).setPipeName(this.pipeName))).build()).getBody(), MessageResponseBody.class)).getStatus())) {
                z = false;
                log.error("To the client: {}, the message is sent abnormally, and the message is recovered.", clientId);
            }
        } catch (Exception e) {
            z = false;
            log.error(e.getMessage(), e);
        } catch (StatusRuntimeException e2) {
            z = false;
            Status.Code code = e2.getStatus().getCode();
            log.error("To the client: {}, exception when sending a message, Status Code: {}", clientId, code);
            if (Status.Code.UNAVAILABLE == code) {
                ClientChannelManager.removeChannel(clientId);
                log.error("The client is unavailable, and the cached channel is deleted.");
            }
        }
        if (z) {
            log.debug("To the client: {}, sending the message is complete.", clientId);
        }
        return z;
    }

    public String getPipeName() {
        return this.pipeName;
    }

    public MessagePipe getMessagePipe() {
        return this.messagePipe;
    }
}
