package org.apache.eventmesh.connector.lark.sink;

import com.github.rholder.retry.Attempt;
import com.github.rholder.retry.RetryException;
import com.github.rholder.retry.RetryListener;
import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;
import com.github.rholder.retry.WaitStrategies;
import com.lark.oapi.Client;
import com.lark.oapi.card.enums.MessageCardHeaderTemplateEnum;
import com.lark.oapi.card.model.MessageCard;
import com.lark.oapi.card.model.MessageCardConfig;
import com.lark.oapi.card.model.MessageCardElement;
import com.lark.oapi.card.model.MessageCardHeader;
import com.lark.oapi.card.model.MessageCardMarkdown;
import com.lark.oapi.card.model.MessageCardPlainText;
import com.lark.oapi.core.httpclient.OkHttpTransport;
import com.lark.oapi.core.request.RequestOptions;
import com.lark.oapi.core.utils.Lists;
import com.lark.oapi.okhttp.OkHttpClient;
import com.lark.oapi.service.im.v1.ImService;
import com.lark.oapi.service.im.v1.enums.MsgTypeEnum;
import com.lark.oapi.service.im.v1.model.CreateMessageReq;
import com.lark.oapi.service.im.v1.model.CreateMessageReqBody;
import com.lark.oapi.service.im.v1.model.CreateMessageResp;
import com.lark.oapi.service.im.v1.model.ext.MessageText;
import java.util.HashMap;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.LongAdder;
import org.apache.commons.text.StringEscapeUtils;
import org.apache.eventmesh.connector.lark.ConnectRecordExtensionKeys;
import org.apache.eventmesh.connector.lark.config.LarkMessageTemplateType;
import org.apache.eventmesh.connector.lark.sink.config.SinkConnectorConfig;
import org.apache.eventmesh.connector.lark.sink.connector.LarkSinkConnector;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/eventmesh/connector/lark/sink/ImServiceHandler.class */
public class ImServiceHandler {
    private SinkConnectorConfig sinkConnectorConfig;
    private ImService imService;
    private Retryer<ConnectRecord> retryer;
    private ExecutorService sinkAsyncWorker;
    private ExecutorService cleanerWorker;
    private ScheduledExecutorService retryWorker;
    private static final Logger log = LoggerFactory.getLogger(ImServiceHandler.class);
    private static final LongAdder redoSinkNum = new LongAdder();

    public static ImServiceHandler create(SinkConnectorConfig sinkConnectorConfig) {
        ImServiceHandler imServiceHandler = new ImServiceHandler();
        imServiceHandler.sinkConnectorConfig = sinkConnectorConfig;
        imServiceHandler.imService = Client.newBuilder(sinkConnectorConfig.getAppId(), sinkConnectorConfig.getAppSecret()).httpTransport(new OkHttpTransport(new OkHttpClient().newBuilder().callTimeout(3L, TimeUnit.SECONDS).build())).disableTokenCache().requestTimeout(3L, TimeUnit.SECONDS).build().im();
        long parseLong = Long.parseLong(sinkConnectorConfig.getRetryDelayInMills());
        int parseInt = Integer.parseInt(sinkConnectorConfig.getMaxRetryTimes()) + 1;
        if (Boolean.parseBoolean(sinkConnectorConfig.getSinkAsync())) {
            int availableProcessors = Runtime.getRuntime().availableProcessors();
            imServiceHandler.sinkAsyncWorker = Executors.newFixedThreadPool(availableProcessors, runnable -> {
                Thread thread = new Thread(runnable);
                thread.setDaemon(true);
                thread.setName("eventmesh-connector-lark-sinkAsyncWorker");
                return thread;
            });
            imServiceHandler.cleanerWorker = Executors.newFixedThreadPool(availableProcessors, runnable2 -> {
                Thread thread = new Thread(runnable2);
                thread.setDaemon(true);
                thread.setName("eventmesh-connector-lark-cleanerWorker");
                return thread;
            });
            imServiceHandler.retryWorker = Executors.newScheduledThreadPool(availableProcessors, runnable3 -> {
                Thread thread = new Thread(runnable3);
                thread.setDaemon(true);
                thread.setName("eventmesh-connector-lark-retryWorker");
                return thread;
            });
        } else {
            imServiceHandler.retryer = RetryerBuilder.newBuilder().retryIfException().retryIfResult((v0) -> {
                return Objects.nonNull(v0);
            }).withWaitStrategy(WaitStrategies.fixedWait(parseLong, TimeUnit.MILLISECONDS)).withStopStrategy(StopStrategies.stopAfterAttempt(parseInt)).withRetryListener(new RetryListener() { // from class: org.apache.eventmesh.connector.lark.sink.ImServiceHandler.1
                public <V> void onRetry(Attempt<V> attempt) {
                    if (attempt.getAttemptNumber() > 1) {
                        ImServiceHandler.redoSinkNum.increment();
                        ImServiceHandler.log.info("Total redo sink task num : [{}]", Long.valueOf(ImServiceHandler.redoSinkNum.sum()));
                        ImServiceHandler.log.warn("Retry sink event to lark | times=[{}]", Long.valueOf(attempt.getAttemptNumber() - 1));
                    }
                }
            }).build();
        }
        return imServiceHandler;
    }

    public void sink(ConnectRecord connectRecord) throws ExecutionException, RetryException {
        HashMap hashMap = new HashMap();
        hashMap.put("Content-Type", Lists.newArrayList(new String[]{"application/json; charset=utf-8"}));
        RequestOptions build = RequestOptions.newBuilder().tenantAccessToken(LarkSinkConnector.getTenantAccessToken(this.sinkConnectorConfig.getAppId(), this.sinkConnectorConfig.getAppSecret())).headers(hashMap).build();
        this.retryer.call(() -> {
            CreateMessageResp create = this.imService.message().create(convertCreateMessageReq(connectRecord), build);
            if (create.getCode() == 0) {
                return null;
            }
            log.warn("Sinking event to lark failure | code:[{}] | msg:[{}] | err:[{}]", new Object[]{Integer.valueOf(create.getCode()), create.getMsg(), create.getError()});
            return connectRecord;
        });
    }

    public void sinkAsync(ConnectRecord connectRecord) {
        HashMap hashMap = new HashMap();
        hashMap.put("Content-Type", Lists.newArrayList(new String[]{"application/json; charset=utf-8"}));
        RequestOptions build = RequestOptions.newBuilder().tenantAccessToken(LarkSinkConnector.getTenantAccessToken(this.sinkConnectorConfig.getAppId(), this.sinkConnectorConfig.getAppSecret())).headers(hashMap).build();
        CreateMessageReq convertCreateMessageReq = convertCreateMessageReq(connectRecord);
        long parseLong = Long.parseLong(this.sinkConnectorConfig.getRetryDelayInMills());
        int parseInt = Integer.parseInt(this.sinkConnectorConfig.getMaxRetryTimes()) + 1;
        LongAdder longAdder = new LongAdder();
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        ScheduledFuture<?> scheduleAtFixedRate = this.retryWorker.scheduleAtFixedRate(() -> {
            CompletableFuture.supplyAsync(() -> {
                try {
                    longAdder.increment();
                    return this.imService.message().create(convertCreateMessageReq, build);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }, this.sinkAsyncWorker).whenCompleteAsync((createMessageResp, th) -> {
                if (longAdder.sum() > 1) {
                    redoSinkNum.increment();
                    log.info("Total redo sink task num : [{}]", Long.valueOf(redoSinkNum.sum()));
                    log.warn("Retry sink event to lark | times=[{}]", Long.valueOf(longAdder.sum() - 1));
                }
                if (Objects.nonNull(th)) {
                    log.error("eventmesh-connector-lark internal exception.", th);
                } else if (createMessageResp.getCode() != 0) {
                    log.warn("Sinking event to lark failure | code:[{}] | msg:[{}] | err:[{}]", new Object[]{Integer.valueOf(createMessageResp.getCode()), createMessageResp.getMsg(), createMessageResp.getError()});
                } else {
                    atomicBoolean.set(true);
                }
            });
        }, 0L, parseLong, TimeUnit.MILLISECONDS);
        this.cleanerWorker.submit(() -> {
            while (!atomicBoolean.get() && longAdder.sum() < parseInt) {
            }
            scheduleAtFixedRate.cancel(true);
        });
    }

    private CreateMessageReq convertCreateMessageReq(ConnectRecord connectRecord) {
        CreateMessageReqBody.Builder uuid = CreateMessageReqBody.newBuilder().receiveId(this.sinkConnectorConfig.getReceiveId()).uuid(UUID.randomUUID().toString());
        String extension = connectRecord.getExtension(ConnectRecordExtensionKeys.TEMPLATE_TYPE_4_LARK);
        if (null == extension || "null".equals(extension)) {
            extension = LarkMessageTemplateType.PLAIN_TEXT.getTemplateKey();
        }
        LarkMessageTemplateType of = LarkMessageTemplateType.of(extension);
        if (LarkMessageTemplateType.PLAIN_TEXT == of) {
            uuid.content(createTextContent(connectRecord)).msgType(MsgTypeEnum.MSG_TYPE_TEXT.getValue());
        } else if (LarkMessageTemplateType.MARKDOWN == of) {
            uuid.content(createInteractiveContent(connectRecord, (String) Optional.ofNullable(connectRecord.getExtension(ConnectRecordExtensionKeys.MARKDOWN_MESSAGE_TITLE_4_LARK)).orElse("EventMesh-Message"))).msgType(MsgTypeEnum.MSG_TYPE_INTERACTIVE.getValue());
        }
        return CreateMessageReq.newBuilder().receiveIdType(this.sinkConnectorConfig.getReceiveIdType()).createMessageReqBody(uuid.build()).build();
    }

    private String createTextContent(ConnectRecord connectRecord) {
        MessageText.Builder newBuilder = MessageText.newBuilder();
        if (needAtAll(connectRecord)) {
            newBuilder.atAll();
        }
        String needAtUser = needAtUser(connectRecord);
        if (!needAtUser.isEmpty()) {
            for (String str : needAtUser.split(";")) {
                String[] split = str.split(",");
                newBuilder.atUser(split[0], split[1]);
            }
        }
        return newBuilder.text(StringEscapeUtils.escapeJava(new String((byte[]) connectRecord.getData()))).build();
    }

    private String createInteractiveContent(ConnectRecord connectRecord, String str) {
        StringBuilder sb = new StringBuilder();
        if (needAtAll(connectRecord)) {
            atAll(sb);
        }
        String needAtUser = needAtUser(connectRecord);
        if (!needAtUser.isEmpty()) {
            for (String str2 : needAtUser.split(";")) {
                atUser(sb, str2.split(",")[0]);
            }
        }
        sb.append(new String((byte[]) connectRecord.getData()));
        return MessageCard.newBuilder().config(MessageCardConfig.newBuilder().enableForward(true).wideScreenMode(true).updateMulti(true).build()).header(MessageCardHeader.newBuilder().template(MessageCardHeaderTemplateEnum.BLUE).title(MessageCardPlainText.newBuilder().content(str).build()).build()).elements(new MessageCardElement[]{MessageCardMarkdown.newBuilder().content(sb.toString()).build()}).build().String();
    }

    private boolean needAtAll(ConnectRecord connectRecord) {
        String extension = connectRecord.getExtension(ConnectRecordExtensionKeys.AT_ALL_4_LARK);
        return (null == extension || "null".equals(extension) || !Boolean.parseBoolean(extension)) ? false : true;
    }

    private String needAtUser(ConnectRecord connectRecord) {
        String extension = connectRecord.getExtension(ConnectRecordExtensionKeys.AT_USERS_4_LARK);
        return (null == extension || "null".equals(extension)) ? "" : extension;
    }

    private void atAll(StringBuilder sb) {
        sb.append("<at id=all>").append("</at>");
    }

    private void atUser(StringBuilder sb, String str) {
        sb.append("<at id=").append(str).append(">").append("</at>");
    }
}
