package io.gitee.jaychang.rocketmq.strategy;

import io.gitee.jaychang.rocketmq.core.ConsumeStatusEnum;
import io.gitee.jaychang.rocketmq.core.DedupConfig;
import io.gitee.jaychang.rocketmq.persist.DedupElement;
import io.gitee.jaychang.rocketmq.persist.IPersist;
import java.util.Map;
import java.util.function.BiFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/gitee/jaychang/rocketmq/strategy/DedupConsumeStrategy.class */
/**
 * {@link ConsumeStrategy} that wraps a message handler with a de-duplication check backed by
 * the {@link IPersist} obtained from the supplied {@link DedupConfig}.
 *
 * <p>For every message a {@link DedupElement} is built from the application name, the
 * {@code TOPIC}, {@code TAGS} and {@code CONSUMER_GROUP} entries of the context map and the key
 * produced by the configured key function. The element is used to claim the message before the
 * handler runs, and to record or clear the outcome afterwards.
 */
public class DedupConsumeStrategy implements ConsumeStrategy {
    private static final Logger log = LoggerFactory.getLogger(DedupConsumeStrategy.class);

    private final DedupConfig dedupConfig;
    private final BiFunction<Object, Map<String, Object>, String> dedupMessageKeyFunction;

    /**
     * Creates the strategy.
     *
     * @param dedupConfig supplies the persistence implementation, the application name and the
     *                    two time settings passed to the persistence calls
     * @param biFunction  derives the de-duplication key from the message and its context map;
     *                    when it yields {@code null} the message is consumed without any
     *                    de-duplication bookkeeping
     */
    public DedupConsumeStrategy(DedupConfig dedupConfig, BiFunction<Object, Map<String, Object>, String> biFunction) {
        this.dedupConfig = dedupConfig;
        this.dedupMessageKeyFunction = biFunction;
    }

    /**
     * Runs {@code biFunction} for the message unless the persistence layer reports that the same
     * message is currently being consumed or has already been consumed.
     *
     * @param biFunction the actual message handler; a {@code null} result is treated as a failed
     *                   consumption
     * @param t          the message payload
     * @param map        message context; the keys {@code TOPIC}, {@code TAGS},
     *                   {@code CONSUMER_GROUP} and {@code MSG_ID} are read as strings
     * @return {@code true} when the handler succeeded or the message was already recorded as
     *         consumed; {@code false} when the handler failed or the message is recorded as
     *         being consumed right now
     */
    @Override // io.gitee.jaychang.rocketmq.strategy.ConsumeStrategy
    public <T> boolean invoke(BiFunction<T, Map<String, Object>, Boolean> biFunction, T t, Map<String, Object> map) {
        IPersist persist = this.dedupConfig.getPersist();
        DedupElement dedupElement = new DedupElement(
                this.dedupConfig.getApplicationName(),
                (String) map.get("TOPIC"),
                (String) map.getOrDefault("TAGS", ""),
                (String) map.get("CONSUMER_GROUP"),
                this.dedupMessageKeyFunction.apply(t, map));

        // Without a key there is nothing to claim; the helper consumes the message directly.
        if (dedupElement.getMsgUniqKey() == null) {
            return doHandleMsgAndUpdateStatus(biFunction, t, map, dedupElement);
        }

        // Claim succeeded: this invocation is responsible for consuming the message.
        if (persist.setConsumingIfNX(dedupElement, this.dedupConfig.getDedupProcessingExpireMilliSeconds())) {
            return doHandleMsgAndUpdateStatus(biFunction, t, map, dedupElement);
        }

        // Claim failed: a record already exists, so decide based on its stored status.
        Integer consumeStatus = persist.getConsumeStatus(dedupElement);
        ConsumeStatusEnum status = ConsumeStatusEnum.codeOf(consumeStatus);
        String msgId = (String) map.get("MSG_ID");
        String persistName = persist.getClass().getSimpleName();

        if (ConsumeStatusEnum.CONSUMING.equals(status)) {
            log.warn("the same message is considered consuming, try consume later dedupKey : {}, {}, {}",
                    persist.toPrintInfo(dedupElement), msgId, persistName);
            return false;
        }
        if (ConsumeStatusEnum.CONSUMED.equals(status)) {
            log.warn("message has been consumed before! dedupKey : {}, msgId : {} , so just ack. {}",
                    persist.toPrintInfo(dedupElement), msgId, persistName);
            return true;
        }

        // Any other status is not trusted: fall back to consuming the message anyway.
        log.warn("[NOTIFYME]unknown consume result {}, ignore dedup, continue consuming,  dedupKey : {}, {}, {} ",
                consumeStatus, persist.toPrintInfo(dedupElement), msgId, persistName);
        return doHandleMsgAndUpdateStatus(biFunction, t, map, dedupElement);
    }

    /**
     * Invokes the handler and then updates the de-duplication record to reflect the outcome:
     * marked consumed on success, deleted on failure or when the handler throws.
     *
     * <p>Failures of the bookkeeping itself are logged and do not change the returned result.
     * Anything thrown by the handler is rethrown after the record has been deleted.
     */
    private <T> boolean doHandleMsgAndUpdateStatus(BiFunction<T, Map<String, Object>, Boolean> biFunction, T t, Map<String, Object> map, DedupElement dedupElement) {
        String msgId = (String) map.get("MSG_ID");
        if (dedupElement.getMsgUniqKey() == null) {
            log.warn("dedup key is null , consume msg but not update status{}", msgId);
            return isConsumeSuccess(biFunction.apply(t, map), msgId);
        }

        IPersist persist = this.dedupConfig.getPersist();
        try {
            boolean consumed = isConsumeSuccess(biFunction.apply(t, map), msgId);
            try {
                if (consumed) {
                    log.debug("set consume res as CONSUME_STATUS_CONSUMED , {}", dedupElement);
                    persist.markConsumed(dedupElement, this.dedupConfig.getDedupRecordReserveMinutes());
                } else {
                    log.info("consume Res is false, try deleting dedup record {} , {}", dedupElement, persist);
                    persist.delete(dedupElement);
                }
            } catch (Exception e) {
                // Bookkeeping is best-effort: the handler's result is still returned.
                log.error("消费去重收尾工作异常 {}，忽略异常", msgId, e);
            }
            return consumed;
        } catch (Throwable th) {
            // Remove the record so the message is not seen as "consuming" by a later attempt.
            try {
                persist.delete(dedupElement);
            } catch (Exception e) {
                log.error("error when delete dedup record {}", dedupElement, e);
            }
            log.error("consume {} failed", msgId, th);
            throw th;
        }
    }

    /**
     * Converts the handler's boxed result to a primitive, treating {@code null} as failure so a
     * handler that returns {@code null} does not cause a {@link NullPointerException} on unboxing.
     */
    private static boolean isConsumeSuccess(Boolean result, String msgId) {
        if (result == null) {
            log.warn("consume function returned null for msgId {}, treat it as consume failure", msgId);
            return false;
        }
        return result.booleanValue();
    }
}
