package org.apache.eventmesh.client.http.consumer;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.handler.codec.http.HttpMethod;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.eventmesh.client.http.AbstractHttpClient;
import org.apache.eventmesh.client.http.EventMeshRetObj;
import org.apache.eventmesh.client.http.conf.EventMeshHttpClientConfig;
import org.apache.eventmesh.client.http.model.RequestParam;
import org.apache.eventmesh.client.http.util.HttpUtils;
import org.apache.eventmesh.common.ThreadPoolFactory;
import org.apache.eventmesh.common.exception.EventMeshException;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.http.body.client.HeartbeatRequestBody;
import org.apache.eventmesh.common.protocol.http.common.ClientType;
import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode;
import org.apache.eventmesh.common.protocol.http.common.ProtocolVersion;
import org.apache.eventmesh.common.protocol.http.common.RequestCode;
import org.apache.eventmesh.common.utils.JsonUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/eventmesh/client/http/consumer/EventMeshHttpConsumer.class */
public class EventMeshHttpConsumer extends AbstractHttpClient implements AutoCloseable {
    private final ThreadPoolExecutor consumeExecutor;
    private final ScheduledThreadPoolExecutor scheduler;
    private static final Logger log = LoggerFactory.getLogger(EventMeshHttpConsumer.class);
    private static final List<SubscriptionItem> SUBSCRIPTIONS = Collections.synchronizedList(new ArrayList());

    public EventMeshHttpConsumer(EventMeshHttpClientConfig eventMeshHttpClientConfig) throws EventMeshException {
        this(eventMeshHttpClientConfig, null);
    }

    public EventMeshHttpConsumer(EventMeshHttpClientConfig eventMeshHttpClientConfig, ThreadPoolExecutor threadPoolExecutor) throws EventMeshException {
        super(eventMeshHttpClientConfig);
        this.consumeExecutor = (ThreadPoolExecutor) Optional.ofNullable(threadPoolExecutor).orElseGet(() -> {
            return ThreadPoolFactory.createThreadPoolExecutor(eventMeshHttpClientConfig.getConsumeThreadCore(), eventMeshHttpClientConfig.getConsumeThreadMax(), "EventMesh-client-consume-");
        });
        this.scheduler = new ScheduledThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), new ThreadFactoryBuilder().setNameFormat("HTTPClientScheduler").setDaemon(true).build());
    }

    public void subscribe(List<SubscriptionItem> list, String str) throws EventMeshException {
        Preconditions.checkNotNull(list, "Subscribe item cannot be null");
        Preconditions.checkNotNull(str, "SubscribeUrl cannot be null");
        RequestParam addBody = buildCommonRequestParam().addHeader("code", RequestCode.SUBSCRIBE.getRequestCode()).addBody("topic", JsonUtils.serialize(list)).addBody("consumerGroup", this.eventMeshHttpClientConfig.getConsumerGroup()).addBody("url", str);
        String selectEventMesh = selectEventMesh();
        try {
            EventMeshRetObj eventMeshRetObj = (EventMeshRetObj) JsonUtils.deserialize(HttpUtils.post(this.httpClient, selectEventMesh, addBody), EventMeshRetObj.class);
            if (eventMeshRetObj.getRetCode() != EventMeshRetCode.SUCCESS.getRetCode().intValue()) {
                throw new EventMeshException(Integer.valueOf(eventMeshRetObj.getRetCode()), eventMeshRetObj.getRetMsg());
            }
            SUBSCRIPTIONS.addAll(list);
        } catch (Exception e) {
            throw new EventMeshException(String.format("Subscribe topic error, target:%s", selectEventMesh), e);
        }
    }

    public void heartBeat(List<SubscriptionItem> list, String str) {
        Preconditions.checkNotNull(list, "Subscribe item cannot be null");
        Preconditions.checkNotNull(str, "SubscribeUrl cannot be null");
        this.scheduler.scheduleAtFixedRate(() -> {
            try {
                RequestParam addBody = buildCommonRequestParam().addHeader("code", RequestCode.HEARTBEAT.getRequestCode()).addBody("clientType", ClientType.SUB.name()).addBody("heartbeatEntities", JsonUtils.serialize((List) list.stream().map(subscriptionItem -> {
                    HeartbeatRequestBody.HeartbeatEntity heartbeatEntity = new HeartbeatRequestBody.HeartbeatEntity();
                    heartbeatEntity.topic = subscriptionItem.getTopic();
                    heartbeatEntity.url = str;
                    return heartbeatEntity;
                }).collect(Collectors.toList())));
                EventMeshRetObj eventMeshRetObj = (EventMeshRetObj) JsonUtils.deserialize(HttpUtils.post(this.httpClient, selectEventMesh(), addBody), EventMeshRetObj.class);
                if (eventMeshRetObj.getRetCode() != EventMeshRetCode.SUCCESS.getRetCode().intValue()) {
                    throw new EventMeshException(Integer.valueOf(eventMeshRetObj.getRetCode()), eventMeshRetObj.getRetMsg());
                }
            } catch (Exception e) {
                log.error("send heartBeat error", e);
            }
        }, 30000L, 30000L, TimeUnit.MILLISECONDS);
    }

    public void unsubscribe(List<String> list, String str) throws EventMeshException {
        Preconditions.checkNotNull(list, "Topics cannot be null");
        Preconditions.checkNotNull(str, "unSubscribeUrl cannot be null");
        RequestParam addBody = buildCommonRequestParam().addHeader("code", RequestCode.UNSUBSCRIBE.getRequestCode()).addBody("topic", JsonUtils.serialize(list)).addBody("url", str);
        String selectEventMesh = selectEventMesh();
        try {
            EventMeshRetObj eventMeshRetObj = (EventMeshRetObj) JsonUtils.deserialize(HttpUtils.post(this.httpClient, selectEventMesh, addBody), EventMeshRetObj.class);
            if (eventMeshRetObj.getRetCode() != EventMeshRetCode.SUCCESS.getRetCode().intValue()) {
                throw new EventMeshException(Integer.valueOf(eventMeshRetObj.getRetCode()), eventMeshRetObj.getRetMsg());
            }
            SUBSCRIPTIONS.removeIf(subscriptionItem -> {
                return list.contains(subscriptionItem.getTopic());
            });
        } catch (Exception e) {
            throw new EventMeshException(String.format("Unsubscribe topic error, target:%s", selectEventMesh), e);
        }
    }

    @Override // org.apache.eventmesh.client.http.AbstractHttpClient, java.lang.AutoCloseable
    public void close() throws EventMeshException {
        log.info("LiteConsumer shutting down");
        super.close();
        if (this.consumeExecutor != null) {
            this.consumeExecutor.shutdown();
        }
        this.scheduler.shutdown();
        log.info("LiteConsumer shutdown");
    }

    private RequestParam buildCommonRequestParam() {
        return new RequestParam(HttpMethod.POST).addHeader("env", this.eventMeshHttpClientConfig.getEnv()).addHeader("idc", this.eventMeshHttpClientConfig.getIdc()).addHeader("ip", this.eventMeshHttpClientConfig.getIp()).addHeader("pid", this.eventMeshHttpClientConfig.getPid()).addHeader("sys", this.eventMeshHttpClientConfig.getSys()).addHeader("username", this.eventMeshHttpClientConfig.getUserName()).addHeader("passwd", this.eventMeshHttpClientConfig.getPassword()).addHeader("version", ProtocolVersion.V1.getVersion()).addHeader("language", "JAVA").setTimeout(15000L).addBody("consumerGroup", this.eventMeshHttpClientConfig.getConsumerGroup());
    }
}
