package org.apache.eventmesh.client.grpc.consumer;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.commons.collections4.MapUtils;
import org.apache.eventmesh.client.grpc.config.EventMeshGrpcClientConfig;
import org.apache.eventmesh.client.grpc.util.EventMeshCloudEventBuilder;
import org.apache.eventmesh.common.EventMeshThreadFactory;
import org.apache.eventmesh.common.enums.EventMeshDataContentType;
import org.apache.eventmesh.common.enums.EventMeshProtocolType;
import org.apache.eventmesh.common.protocol.HeartbeatItem;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.grpc.cloudevents.CloudEvent;
import org.apache.eventmesh.common.protocol.grpc.cloudevents.ConsumerServiceGrpc;
import org.apache.eventmesh.common.protocol.grpc.cloudevents.HeartbeatServiceGrpc;
import org.apache.eventmesh.common.protocol.grpc.common.ClientType;
import org.apache.eventmesh.common.protocol.grpc.common.EventMeshCloudEventUtils;
import org.apache.eventmesh.common.protocol.grpc.common.GrpcType;
import org.apache.eventmesh.common.protocol.grpc.common.Response;
import org.apache.eventmesh.common.protocol.grpc.common.StatusCode;
import org.apache.eventmesh.common.utils.JsonUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/eventmesh/client/grpc/consumer/EventMeshGrpcConsumer.class */
public class EventMeshGrpcConsumer implements AutoCloseable {

    // Class logger; assigned in the static initializer at the bottom of the class.
    @Generated
    private static final Logger log;
    // URL value recorded in subscriptionMap for subscriptions created through subscribe(List) (the streaming variant).
    private static final String SDK_STREAM_URL = "grpc_stream";
    // gRPC channel; created in init() and shut down in close().
    private ManagedChannel channel;
    // Client configuration supplied to the constructor.
    private final EventMeshGrpcClientConfig clientConfig;
    // Active subscriptions keyed by topic; read by the heartbeat task and by resubscribe().
    private final Map<String, SubscriptionInfo> subscriptionMap = new ConcurrentHashMap();
    // Executor that runs the periodic heartbeat task scheduled by heartBeat(); sized to the number of available processors.
    // NOTE(review): the `true` passed to EventMeshThreadFactory presumably marks the threads as daemon - confirm.
    private final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), (ThreadFactory) new EventMeshThreadFactory("GRPCClientScheduler", true));
    // Blocking stub used for webhook subscribe and for both unsubscribe variants; created in init().
    private ConsumerServiceGrpc.ConsumerServiceBlockingStub consumerClient;
    // Async stub handed to the SubStreamHandler for streaming subscriptions; created in init().
    private ConsumerServiceGrpc.ConsumerServiceStub consumerAsyncClient;
    // Blocking stub used by the heartbeat task; created in init().
    private HeartbeatServiceGrpc.HeartbeatServiceBlockingStub heartbeatClient;
    // Listener for streamed messages; registerListener() sets it only while it is still null.
    private ReceiveMsgHook<?> listener;
    // Stream handler, created lazily by subscribe(List) under synchronized (this).
    private SubStreamHandler<?> subStreamHandler;
    // Decompiler rendering of the compiler's assertion flag; set in the static initializer and read by the heartbeat task.
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/eventmesh/client/grpc/consumer/EventMeshGrpcConsumer$SubscriptionInfo.class */
    /**
     * Book-keeping record for one subscribed topic: the subscription item, the
     * URL it was registered with, and the gRPC delivery type.
     *
     * <p>Only {@code grpcType} takes part in {@link #equals(Object)} and
     * {@link #hashCode()}; {@code subscriptionItem} and {@code url} are
     * {@code transient} and are ignored by both.
     */
    public static class SubscriptionInfo {
        private transient SubscriptionItem subscriptionItem;
        private transient String url;
        private GrpcType grpcType;

        /**
         * @param subscriptionItem the subscribed item
         * @param url              the URL recorded for the subscription
         * @param grpcType         the delivery type of the subscription
         */
        SubscriptionInfo(SubscriptionItem subscriptionItem, String url, GrpcType grpcType) {
            this.subscriptionItem = subscriptionItem;
            this.url = url;
            this.grpcType = grpcType;
        }

        /** @return the subscribed item */
        @Generated
        public SubscriptionItem getSubscriptionItem() {
            return this.subscriptionItem;
        }

        /** @return the URL recorded for the subscription */
        @Generated
        public String getUrl() {
            return this.url;
        }

        /** @return the delivery type of the subscription */
        @Generated
        public GrpcType getGrpcType() {
            return this.grpcType;
        }

        /** @param subscriptionItem the new subscribed item */
        @Generated
        public void setSubscriptionItem(SubscriptionItem subscriptionItem) {
            this.subscriptionItem = subscriptionItem;
        }

        /** @param url the new URL */
        @Generated
        public void setUrl(String url) {
            this.url = url;
        }

        /** @param grpcType the new delivery type */
        @Generated
        public void setGrpcType(GrpcType grpcType) {
            this.grpcType = grpcType;
        }

        /**
         * Two instances are equal when both accept each other via
         * {@link #canEqual(Object)} and their {@code grpcType} values are equal.
         */
        @Generated
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof SubscriptionInfo)) {
                return false;
            }
            final SubscriptionInfo other = (SubscriptionInfo) obj;
            return other.canEqual(this) && Objects.equals(getGrpcType(), other.getGrpcType());
        }

        /** Hook that lets subclasses opt out of equality with this type. */
        @Generated
        protected boolean canEqual(Object obj) {
            return obj instanceof SubscriptionInfo;
        }

        /** Hash derived from {@code grpcType} only, matching {@link #equals(Object)}. */
        @Generated
        public int hashCode() {
            final GrpcType type = getGrpcType();
            final int typeHash = (type == null) ? 43 : type.hashCode();
            return 59 + typeHash;
        }

        /** @return a textual dump of all three fields */
        @Generated
        public String toString() {
            return new StringBuilder("EventMeshGrpcConsumer.SubscriptionInfo(subscriptionItem=")
                .append(getSubscriptionItem())
                .append(", url=")
                .append(getUrl())
                .append(", grpcType=")
                .append(getGrpcType())
                .append(")")
                .toString();
        }
    }

    /**
     * Creates a consumer that uses the given client configuration.
     *
     * <p>The constructor only stores the configuration; the channel and stubs
     * are created by {@link #init()}.
     *
     * @param eventMeshGrpcClientConfig configuration read by {@code init()} and
     *                                  by the subscription/heartbeat requests
     */
    public EventMeshGrpcConsumer(EventMeshGrpcClientConfig eventMeshGrpcClientConfig) {
        this.clientConfig = eventMeshGrpcClientConfig;
    }

    /**
     * Opens a plaintext gRPC channel to the configured server address and port,
     * creates the consumer (blocking and async) and heartbeat stubs on it, and
     * schedules the periodic heartbeat.
     */
    public void init() {
        final String host = this.clientConfig.getServerAddr();
        final int port = this.clientConfig.getServerPort();
        final ManagedChannel grpcChannel = ManagedChannelBuilder.forAddress(host, port)
            .usePlaintext()
            .build();

        this.channel = grpcChannel;
        this.consumerClient = ConsumerServiceGrpc.newBlockingStub(grpcChannel);
        this.consumerAsyncClient = ConsumerServiceGrpc.newStub(grpcChannel);
        this.heartbeatClient = HeartbeatServiceGrpc.newBlockingStub(grpcChannel);

        heartBeat();
    }

    /**
     * Registers a webhook subscription: records the items locally under
     * {@code GrpcType.WEBHOOK} and sends the subscribe request through the
     * blocking stub.
     *
     * @param list items to subscribe
     * @param str  webhook URL associated with the items
     * @return the server response, or {@code null} when the request failed
     */
    public Response subscribe(List<SubscriptionItem> list, String str) {
        log.info("Create subscription: {} , url: {}", list, str);
        addSubscription(list, str, GrpcType.WEBHOOK);
        final Response webhookResponse = subscribeWebhook(list, str);
        return webhookResponse;
    }

    /**
     * Registers a streaming subscription.
     *
     * <p>Requires a listener to have been registered; otherwise an error is
     * logged and nothing is subscribed. The items are recorded locally under
     * {@code GrpcType.STREAM}, the stream handler is created and started on
     * first use, and the subscription event is sent through it.
     *
     * @param list items to subscribe
     */
    public void subscribe(List<SubscriptionItem> list) {
        log.info("Create streaming subscription: {}", list);
        if (this.listener == null) {
            log.error("Error in subscriber, no Event Listener is registered.");
            return;
        }
        addSubscription(list, SDK_STREAM_URL, GrpcType.STREAM);
        CloudEvent buildEventSubscription = EventMeshCloudEventBuilder.buildEventSubscription(this.clientConfig, EventMeshProtocolType.EVENT_MESH_MESSAGE, null, list);

        // Capture the handler while holding the monitor: re-reading the field
        // after the lock is released could observe a value swapped (or nulled)
        // by a concurrent unsubscribe/setter and fail with an NPE.
        final SubStreamHandler<?> handler;
        synchronized (this) {
            if (this.subStreamHandler == null) {
                this.subStreamHandler = new SubStreamHandler<>(this.consumerAsyncClient, this.clientConfig, this.listener);
                this.subStreamHandler.start();
            }
            handler = this.subStreamHandler;
        }
        handler.sendSubscription(buildEventSubscription);
    }

    /**
     * Sends a webhook subscribe request through the blocking stub and converts
     * the reply into a {@link Response}.
     *
     * @param list items to subscribe
     * @param str  webhook URL
     * @return the converted response, or {@code null} if building or sending
     *         the request threw (the exception is logged)
     */
    private Response subscribeWebhook(List<SubscriptionItem> list, String str) {
        try {
            final CloudEvent request = EventMeshCloudEventBuilder.buildEventSubscription(this.clientConfig, EventMeshProtocolType.EVENT_MESH_MESSAGE, str, list);
            final CloudEvent reply = this.consumerClient.subscribe(request);
            log.info("Received response:{}", reply);
            return Response.builder()
                .respCode(EventMeshCloudEventUtils.getResponseCode(reply))
                .respMsg(EventMeshCloudEventUtils.getResponseMessage(reply))
                .respTime(EventMeshCloudEventUtils.getResponseTime(reply))
                .build();
        } catch (Exception e) {
            log.error("Error in subscribe.", e);
        }
        return null;
    }

    /**
     * Records each item in the subscription map under its topic. A topic that
     * is already present keeps its existing entry.
     *
     * @param list     items to record
     * @param str      URL stored with each item
     * @param grpcType delivery type stored with each item
     */
    private void addSubscription(List<SubscriptionItem> list, String str, GrpcType grpcType) {
        list.forEach(item ->
            this.subscriptionMap.putIfAbsent(item.getTopic(), new SubscriptionInfo(item, str, grpcType)));
    }

    /**
     * Drops the map entry for the topic of every given item.
     *
     * @param list items whose topics are removed
     * @throws NullPointerException if {@code list} is null
     */
    private void removeSubscription(List<SubscriptionItem> list) {
        Objects.requireNonNull(list, "subscriptionItems can not be null");
        for (SubscriptionItem item : list) {
            this.subscriptionMap.remove(item.getTopic());
        }
    }

    /**
     * Removes webhook subscriptions: forgets the items locally and sends the
     * unsubscribe request through the blocking stub.
     *
     * @param list items to unsubscribe
     * @param str  webhook URL the items were registered with
     * @return the server response, or {@code null} if building or sending the
     *         request threw (the exception is logged)
     */
    public Response unsubscribe(List<SubscriptionItem> list, String str) {
        log.info("Removing subscription: {}, url:{}", list, str);
        removeSubscription(list);
        try {
            final CloudEvent request = EventMeshCloudEventBuilder.buildEventSubscription(this.clientConfig, EventMeshProtocolType.EVENT_MESH_MESSAGE, str, list);
            final CloudEvent reply = this.consumerClient.unsubscribe(request);
            log.info("Received response:{}", reply);
            return Response.builder()
                .respCode(EventMeshCloudEventUtils.getResponseCode(reply))
                .respMsg(EventMeshCloudEventUtils.getResponseMessage(reply))
                .respTime(EventMeshCloudEventUtils.getResponseTime(reply))
                .build();
        } catch (Exception e) {
            log.error("Error in unsubscribe.", e);
        }
        return null;
    }

    /**
     * Removes streaming subscriptions: forgets the items locally, sends the
     * unsubscribe request through the blocking stub and, once no subscription
     * of any kind remains, closes the stream handler.
     *
     * <p>After closing, the handler reference is cleared. {@code subscribe(List)}
     * only creates a handler when the field is {@code null}, so keeping the
     * closed instance would make a later streaming subscribe reuse it.
     * NOTE(review): assumes a closed SubStreamHandler must not be reused - confirm
     * against SubStreamHandler.
     *
     * @param list items to unsubscribe
     * @return the server response, or {@code null} if building or sending the
     *         request threw (the exception is logged)
     * @throws NullPointerException if {@code list} is null
     */
    public Response unsubscribe(List<SubscriptionItem> list) {
        Objects.requireNonNull(list, "subscriptionItems can not be null");
        log.info("Removing subscription stream: {}", list);
        removeSubscription(list);
        try {
            CloudEvent unsubscribe = this.consumerClient.unsubscribe(EventMeshCloudEventBuilder.buildEventSubscription(this.clientConfig, EventMeshProtocolType.EVENT_MESH_MESSAGE, null, list));
            Response build = Response.builder().respCode(EventMeshCloudEventUtils.getResponseCode(unsubscribe)).respMsg(EventMeshCloudEventUtils.getResponseMessage(unsubscribe)).respTime(EventMeshCloudEventUtils.getResponseTime(unsubscribe)).build();
            log.info("Received response:{}", build);
            synchronized (this) {
                if (MapUtils.isEmpty(this.subscriptionMap) && this.subStreamHandler != null) {
                    this.subStreamHandler.close();
                    // Drop the closed handler so the next streaming subscribe starts a fresh one.
                    this.subStreamHandler = null;
                }
            }
            return build;
        } catch (Exception e) {
            log.error("Error in unsubscribe.", e);
            return null;
        }
    }

    /**
     * Registers the listener used for streaming subscriptions. Only the first
     * registration takes effect; calls made while a listener is already set
     * are ignored.
     *
     * @param receiveMsgHook listener to register
     */
    public synchronized void registerListener(ReceiveMsgHook<?> receiveMsgHook) {
        if (this.listener != null) {
            return;
        }
        this.listener = receiveMsgHook;
    }

    /**
     * Schedules the heartbeat task on the scheduler: first run after 10 000 ms,
     * then every 30 000 ms.
     *
     * <p>Each run is a no-op while there are no subscriptions. Otherwise it
     * sends one heartbeat carrying a JSON list of topic/URL pairs and calls
     * {@link #resubscribe()} when the server answers with
     * {@code StatusCode.CLIENT_RESUBSCRIBE}. Exceptions raised while sending
     * are logged and do not cancel the schedule.
     */
    private void heartBeat() {
        final Map<String, CloudEvent.CloudEventAttributeValue> commonAttributes =
            EventMeshCloudEventBuilder.buildCommonCloudEventAttributes(this.clientConfig, EventMeshProtocolType.EVENT_MESH_MESSAGE);

        final Runnable heartbeatTask = () -> {
            if (MapUtils.isEmpty(this.subscriptionMap)) {
                return;
            }

            // Start from the shared attributes and add the consumer-specific ones.
            final Map<String, CloudEvent.CloudEventAttributeValue> attributes = new HashMap<>(commonAttributes);
            attributes.put("consumergroup",
                CloudEvent.CloudEventAttributeValue.newBuilder().setCeString(this.clientConfig.getConsumerGroup()).build());
            attributes.put("clienttype",
                CloudEvent.CloudEventAttributeValue.newBuilder().setCeInteger(ClientType.SUB.getType()).build());
            attributes.put("datacontenttype",
                CloudEvent.CloudEventAttributeValue.newBuilder().setCeString(EventMeshDataContentType.JSON.getCode()).build());

            try {
                final List<HeartbeatItem> items = this.subscriptionMap.entrySet().stream()
                    .map(entry -> HeartbeatItem.builder().topic(entry.getKey()).url(entry.getValue().getUrl()).build())
                    .collect(Collectors.toList());
                final CloudEvent request = CloudEvent.newBuilder()
                    .putAllAttributes(attributes)
                    .setTextData(JsonUtils.toJSONString(items))
                    .build();

                final CloudEvent heartbeat = this.heartbeatClient.heartbeat(request);
                // Decompiled form of an assert: only evaluated when assertions are enabled.
                if (!$assertionsDisabled && heartbeat == null) {
                    throw new AssertionError();
                }

                final Response response = Response.builder()
                    .respCode(EventMeshCloudEventUtils.getResponseCode(heartbeat))
                    .respMsg(EventMeshCloudEventUtils.getResponseMessage(heartbeat))
                    .respTime(EventMeshCloudEventUtils.getResponseTime(heartbeat))
                    .build();
                log.debug("Grpc Consumer Heartbeat cloudEvent: {}", response);

                final boolean resubscribeRequested = StatusCode.CLIENT_RESUBSCRIBE.getRetCode().equals(response.getRespCode());
                if (resubscribeRequested) {
                    resubscribe();
                }
            } catch (Exception e) {
                log.error("Error in sending out heartbeat.", e);
            }
        };

        this.scheduler.scheduleAtFixedRate(heartbeatTask, 10000L, 30000L, TimeUnit.MILLISECONDS);
        log.info("Grpc Consumer Heartbeat started.");
    }

    /**
     * Re-sends every known subscription to the server, grouped by URL.
     *
     * <p>Each subscription is routed by its own {@code grpcType}:
     * {@code GrpcType.STREAM} entries go through the stream handler, everything
     * else through the webhook subscribe call. Previously a single stream
     * subscription caused all groups, webhook ones included, to be sent through
     * the stream handler.
     *
     * <p>If stream subscriptions exist but no stream handler is currently set,
     * they are skipped with a warning instead of failing with a
     * {@code NullPointerException} part-way through.
     */
    private void resubscribe() {
        if (this.subscriptionMap.isEmpty()) {
            return;
        }
        final Collection<SubscriptionInfo> infos = this.subscriptionMap.values();

        // Webhook (non-stream) subscriptions: one subscribe call per URL.
        final Map<String, List<SubscriptionItem>> webhookGroups = infos.stream()
            .filter(info -> info.getGrpcType() != GrpcType.STREAM)
            .collect(Collectors.groupingBy(SubscriptionInfo::getUrl,
                Collectors.mapping(SubscriptionInfo::getSubscriptionItem, Collectors.toList())));
        webhookGroups.forEach((url, items) -> subscribeWebhook(items, url));

        // Stream subscriptions: re-sent over the stream handler.
        final Map<String, List<SubscriptionItem>> streamGroups = infos.stream()
            .filter(info -> info.getGrpcType() == GrpcType.STREAM)
            .collect(Collectors.groupingBy(SubscriptionInfo::getUrl,
                Collectors.mapping(SubscriptionInfo::getSubscriptionItem, Collectors.toList())));
        if (streamGroups.isEmpty()) {
            return;
        }

        // Read the handler under the same monitor that guards its creation and closing.
        final SubStreamHandler<?> handler;
        synchronized (this) {
            handler = this.subStreamHandler;
        }
        if (handler == null) {
            log.warn("Skip stream resubscribe, no stream handler is available: {}", streamGroups);
            return;
        }
        streamGroups.forEach((url, items) ->
            handler.sendSubscription(EventMeshCloudEventBuilder.buildEventSubscription(this.clientConfig, EventMeshProtocolType.EVENT_MESH_MESSAGE, url, items)));
    }

    /**
     * Releases the consumer's resources.
     *
     * <p>The scheduler is stopped first so that no new heartbeat run is started
     * against a handler or channel that is being torn down. The stream handler
     * is then closed, and the channel is shut down in a {@code finally} block
     * so that a failure while closing the handler cannot leak the channel.
     */
    @Override // java.lang.AutoCloseable
    public void close() {
        // The scheduler is a final, eagerly initialised field, so no null check is needed.
        this.scheduler.shutdown();
        try {
            // Same monitor as subscribe(List)/unsubscribe(List), which create and close the handler.
            synchronized (this) {
                if (this.subStreamHandler != null) {
                    this.subStreamHandler.close();
                }
            }
        } finally {
            if (this.channel != null) {
                this.channel.shutdown();
            }
        }
    }

    /** @return the gRPC channel, or {@code null} if it has not been created or set yet */
    @Generated
    public ManagedChannel getChannel() {
        return this.channel;
    }

    /** @return the client configuration passed to the constructor */
    @Generated
    public EventMeshGrpcClientConfig getClientConfig() {
        return this.clientConfig;
    }

    /** @return the live subscription map keyed by topic (the internal instance, not a copy) */
    @Generated
    public Map<String, SubscriptionInfo> getSubscriptionMap() {
        return this.subscriptionMap;
    }

    /** @return the executor that runs the heartbeat task */
    @Generated
    public ScheduledThreadPoolExecutor getScheduler() {
        return this.scheduler;
    }

    /** @return the blocking consumer stub, or {@code null} if it has not been created or set yet */
    @Generated
    public ConsumerServiceGrpc.ConsumerServiceBlockingStub getConsumerClient() {
        return this.consumerClient;
    }

    /** @return the async consumer stub, or {@code null} if it has not been created or set yet */
    @Generated
    public ConsumerServiceGrpc.ConsumerServiceStub getConsumerAsyncClient() {
        return this.consumerAsyncClient;
    }

    /** @return the blocking heartbeat stub, or {@code null} if it has not been created or set yet */
    @Generated
    public HeartbeatServiceGrpc.HeartbeatServiceBlockingStub getHeartbeatClient() {
        return this.heartbeatClient;
    }

    /** @return the registered listener, or {@code null} if none is set */
    @Generated
    public ReceiveMsgHook<?> getListener() {
        return this.listener;
    }

    /** @return the current stream handler, or {@code null} if none is set */
    @Generated
    public SubStreamHandler<?> getSubStreamHandler() {
        return this.subStreamHandler;
    }

    /** @param managedChannel channel to store; stubs already created are not rebuilt by this setter */
    @Generated
    public void setChannel(ManagedChannel managedChannel) {
        this.channel = managedChannel;
    }

    /** @param consumerServiceBlockingStub blocking consumer stub to use for subscribe/unsubscribe calls */
    @Generated
    public void setConsumerClient(ConsumerServiceGrpc.ConsumerServiceBlockingStub consumerServiceBlockingStub) {
        this.consumerClient = consumerServiceBlockingStub;
    }

    /** @param consumerServiceStub async consumer stub handed to newly created stream handlers */
    @Generated
    public void setConsumerAsyncClient(ConsumerServiceGrpc.ConsumerServiceStub consumerServiceStub) {
        this.consumerAsyncClient = consumerServiceStub;
    }

    /** @param heartbeatServiceBlockingStub blocking stub to use for heartbeat calls */
    @Generated
    public void setHeartbeatClient(HeartbeatServiceGrpc.HeartbeatServiceBlockingStub heartbeatServiceBlockingStub) {
        this.heartbeatClient = heartbeatServiceBlockingStub;
    }

    /**
     * Replaces the listener unconditionally, unlike {@code registerListener},
     * which only sets it while it is still {@code null}.
     *
     * @param receiveMsgHook listener to store
     */
    @Generated
    public void setListener(ReceiveMsgHook<?> receiveMsgHook) {
        this.listener = receiveMsgHook;
    }

    /** @param subStreamHandler stream handler to store; this setter does not take the instance monitor */
    @Generated
    public void setSubStreamHandler(SubStreamHandler<?> subStreamHandler) {
        this.subStreamHandler = subStreamHandler;
    }

    /**
     * Field-by-field equality over channel, client config, subscription map,
     * scheduler, the three stubs, listener and stream handler, compared in
     * that order. Both sides must accept each other via
     * {@link #canEqual(Object)}.
     */
    @Generated
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof EventMeshGrpcConsumer)) {
            return false;
        }
        final EventMeshGrpcConsumer that = (EventMeshGrpcConsumer) obj;
        return that.canEqual(this)
            && Objects.equals(getChannel(), that.getChannel())
            && Objects.equals(getClientConfig(), that.getClientConfig())
            && Objects.equals(getSubscriptionMap(), that.getSubscriptionMap())
            && Objects.equals(getScheduler(), that.getScheduler())
            && Objects.equals(getConsumerClient(), that.getConsumerClient())
            && Objects.equals(getConsumerAsyncClient(), that.getConsumerAsyncClient())
            && Objects.equals(getHeartbeatClient(), that.getHeartbeatClient())
            && Objects.equals(getListener(), that.getListener())
            && Objects.equals(getSubStreamHandler(), that.getSubStreamHandler());
    }

    /**
     * Equality hook called by {@code equals} on the other object; lets a
     * subclass refuse equality with instances of this type.
     *
     * @param obj candidate object
     * @return {@code true} if {@code obj} is an {@code EventMeshGrpcConsumer}
     */
    @Generated
    protected boolean canEqual(Object obj) {
        return obj instanceof EventMeshGrpcConsumer;
    }

    /**
     * Hash over the same fields, in the same order, as {@code equals}: starts
     * at 1, multiplies by 59 per field and adds the field's hash (43 for
     * {@code null}).
     */
    @Generated
    public int hashCode() {
        final Object[] components = {
            getChannel(),
            getClientConfig(),
            getSubscriptionMap(),
            getScheduler(),
            getConsumerClient(),
            getConsumerAsyncClient(),
            getHeartbeatClient(),
            getListener(),
            getSubStreamHandler(),
        };
        int result = 1;
        for (Object component : components) {
            result = result * 59 + (component == null ? 43 : component.hashCode());
        }
        return result;
    }

    /** @return a textual dump of every field, in declaration order */
    @Generated
    public String toString() {
        final StringBuilder text = new StringBuilder("EventMeshGrpcConsumer(channel=");
        text.append(getChannel());
        text.append(", clientConfig=").append(getClientConfig());
        text.append(", subscriptionMap=").append(getSubscriptionMap());
        text.append(", scheduler=").append(getScheduler());
        text.append(", consumerClient=").append(getConsumerClient());
        text.append(", consumerAsyncClient=").append(getConsumerAsyncClient());
        text.append(", heartbeatClient=").append(getHeartbeatClient());
        text.append(", listener=").append(getListener());
        text.append(", subStreamHandler=").append(getSubStreamHandler());
        text.append(")");
        return text.toString();
    }

    // Static initializer: sets the assertion flag read by the heartbeat task and creates the class logger.
    static {
        // True when assertions are NOT enabled for this class.
        $assertionsDisabled = !EventMeshGrpcConsumer.class.desiredAssertionStatus();
        log = LoggerFactory.getLogger(EventMeshGrpcConsumer.class);
    }
}
