package com.gitee.sidihuo.sse;

import com.gitee.sidihuo.sse.base.SseEmitterUtf8;
import com.gitee.sidihuo.sse.base.SseThreadPool;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

/* loaded from: input_file:com/gitee/sidihuo/sse/SseContextHolder.class */
public class SseContextHolder {
    private static final Logger log = LoggerFactory.getLogger(SseContextHolder.class);
    private static SseContextHolder sseContextHolder = new SseContextHolder();
    private static Map<String, SseEmitter> sseMap = new ConcurrentHashMap();
    private ExecutorService executorService = new SseThreadPool(5, 100);
    private boolean autoClearSseMap;

    private SseContextHolder() {
        log.info("SseContextHolder construct ...");
    }

    public static SseContextHolder getInstance() {
        return sseContextHolder;
    }

    public void subscribe(SsePara ssePara) {
        if (ssePara == null || ssePara.getSseEmitter() == null || StringUtils.isBlank(ssePara.getId())) {
            throw new RuntimeException("subscribe SsePara is illegal");
        }
        SseEmitter sseEmitter = ssePara.getSseEmitter();
        String id = ssePara.getId();
        if (sseMap.containsKey(id)) {
            log.info("sse already subscribed {} need resubscribe", id);
            complete(id);
        }
        sseMap.put(id, sseEmitter);
        sseEmitter.onTimeout(() -> {
            complete(id);
            log.warn("超时回调通知 sse is timeout and removed {}", id);
        });
        sseEmitter.onError(th -> {
            complete(id);
            if ((th instanceof IOException) && "Broken pipe".equalsIgnoreCase(th.getMessage())) {
                log.warn("异常回调通知 sse is onError and removed , IOException maybe client disconnect {}", id);
            } else {
                log.warn("异常回调通知 sse is onError and removed {}", id, th);
            }
        });
        sseEmitter.onCompletion(() -> {
            if (sseMap.containsKey(id)) {
                sseMap.remove(id);
                log.warn("sse is already completed but in context map and removed {}", id);
            }
            log.info("结束回调通知 sse is already completed and confirm removed {}", id);
        });
        int size = sseMap.size();
        log.info("sse subscribe success {} current sse clients num = {}   jarCompileTime {}", new Object[]{id, Integer.valueOf(size), SseJarCompiler.compileTime});
        try {
            clearExpired(ssePara, size);
        } catch (Throwable th2) {
            log.warn("clearExpired Throwable {}", id, th2);
        }
    }

    public void pushMessage(String str, String str2) throws IOException {
        SseEmitter sseEmitter = sseMap.get(str);
        if (sseEmitter == null) {
            log.warn("sse not exist cannot push message {} {}", str, str2);
            return;
        }
        log.info("sse push message {} {}", str, str2);
        try {
            sseEmitter.send(str2);
        } catch (IOException e) {
            String message = e.getMessage();
            Throwable cause = e.getCause();
            if (!"java.io.IOException: Broken pipe".equalsIgnoreCase(message) || cause == null || !StringUtils.equalsIgnoreCase(cause.getMessage(), "Broken pipe")) {
                throw e;
            }
            throw new RuntimeException("IOException maybe client disconnect " + str);
        }
    }

    public void complete(String str) {
        SseEmitter sseEmitter = sseMap.get(str);
        if (sseEmitter == null) {
            log.warn("sse not exist no need complete {}", str);
            return;
        }
        log.info("sse complete {}", str);
        sseMap.remove(str);
        sseEmitter.complete();
    }

    public ExecutorService getExecutorService() {
        return this.executorService;
    }

    public SseEmitter getSseEmitter(String str) {
        return sseMap.get(str);
    }

    public void setExecutorService(ExecutorService executorService) {
        this.executorService = executorService;
    }

    public void setAutoClearSseMap(boolean z) {
        this.autoClearSseMap = z;
    }

    private void clearExpired(SsePara ssePara, int i) {
        if (this.autoClearSseMap && i >= 1000 && ssePara.hashCode() % 1000 <= 5) {
            long currentTimeMillis = System.currentTimeMillis();
            Iterator<Map.Entry<String, SseEmitter>> it = sseMap.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<String, SseEmitter> next = it.next();
                SseEmitter value = next.getValue();
                if (value instanceof SseEmitterUtf8) {
                    long createTime = currentTimeMillis - ((SseEmitterUtf8) value).getCreateTime();
                    Long timeout = value.getTimeout();
                    if (timeout.longValue() == 0) {
                        if (createTime > 36000000) {
                            log.warn("SseEmitterUtf8 [forever] already alive {} {}", Long.valueOf(createTime), next.getKey());
                        }
                    } else if (createTime > timeout.longValue() * 2) {
                        log.warn("SseEmitterUtf8 set live time {} but already alive {} and auto removed {}", new Object[]{Long.valueOf(createTime), Long.valueOf(createTime), next.getKey()});
                        it.remove();
                    }
                }
            }
        }
    }
}
