package org.aoju.bus.cache.provider;

import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.PreDestroy;
import org.aoju.bus.cache.Hitting;
import org.aoju.bus.cache.magic.CachePair;
import org.aoju.bus.core.lang.Symbol;
import org.aoju.bus.logger.Logger;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.atomic.AtomicValue;
import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.KeeperException;

/* loaded from: input_file:org/aoju/bus/cache/provider/ZookeeperHitting.class */
/**
 * {@link Hitting} implementation that stores cache hit / require counters in ZooKeeper.
 *
 * <p>Increments passed to {@link #hitIncr(String, int)} and {@link #reqIncr(String, int)} are only
 * queued in memory. A daemon thread named {@code cache:zk-uploader} drains the queues in small
 * batches, sums the deltas per pattern and adds them to {@link DistributedAtomicLong} counters
 * located at {@code /<product>/hit/<pattern>} and {@code /<product>/require/<pattern>} inside the
 * {@code cache} namespace.
 *
 * <p>The counter maps are shared between the uploader thread and the threads calling
 * {@link #getHitting()}, {@link #reset(String)} and {@link #resetAll()}, so they are concurrent
 * maps. Because those maps reject {@code null} keys, patterns must be non-null: a {@code null}
 * pattern is reported through the logger when its batch is uploaded and is not counted.
 *
 * <p>NOTE(review): the uploader executor is a static single-thread executor and the upload loop of
 * an instance only ends after {@link #tearDown()}, so a second instance created in the same JVM
 * has its loop queued behind the first one. The Curator client is also not closed by
 * {@link #tearDown()} because the uploader may still be finishing a batch - confirm the intended
 * lifecycle before changing either.
 */
public class ZookeeperHitting implements Hitting {

    /** Single shared daemon thread that pushes queued increments to ZooKeeper. */
    private static final ExecutorService executor = Executors.newSingleThreadExecutor(runnable -> {
        Thread thread = new Thread(runnable);
        thread.setName("cache:zk-uploader");
        thread.setDaemon(true);
        return thread;
    });

    /** Curator namespace under which every counter path lives. */
    private static final String NAME_SPACE = "cache";

    /** Upper bound on the number of queue entries folded into one upload batch. */
    private static final int MAX_BATCH_SIZE = 100;

    /** Pause of the uploader loop when both queues were empty, so it does not spin on a core. */
    private static final long IDLE_SLEEP_MILLIS = 50L;

    /** Set by {@link #tearDown()}; read by the uploader thread to leave its loop. */
    private volatile boolean isShutdown;

    private final BlockingQueue<CachePair<String, Integer>> hitQueue = new LinkedTransferQueue<>();
    private final BlockingQueue<CachePair<String, Integer>> requireQueue = new LinkedTransferQueue<>();

    /** pattern -> distributed counter; written by the uploader thread, read by caller threads. */
    private final Map<String, DistributedAtomicLong> hitCounterMap = new ConcurrentHashMap<>();
    private final Map<String, DistributedAtomicLong> requireCounterMap = new ConcurrentHashMap<>();

    private final CuratorFramework client;
    private final String hitPathPrefix;
    private final String requirePathPrefix;

    /**
     * Creates an instance whose product name is taken from the {@code product.name} system
     * property, falling back to {@code unnamed}.
     *
     * @param str ZooKeeper connect string
     */
    public ZookeeperHitting(String str) {
        this(str, System.getProperty("product.name", "unnamed"));
    }

    /**
     * Connects to ZooKeeper, makes sure the hit / require parent nodes exist and schedules the
     * upload loop on the shared uploader thread.
     *
     * @param str  ZooKeeper connect string
     * @param str2 product name used as the root node of this application's counters
     * @throws RuntimeException if one of the parent nodes cannot be created
     */
    public ZookeeperHitting(String str, String str2) {
        // Normalise the name before opening the connection so a bad argument cannot leak a client.
        String productRoot = processProductName(str2);
        this.hitPathPrefix = productRoot + "hit";
        this.requirePathPrefix = productRoot + "require";
        this.client = CuratorFrameworkFactory.builder()
                .connectString(str)
                .retryPolicy(new RetryNTimes(3, 0))
                .namespace(NAME_SPACE)
                .build();
        this.client.start();
        try {
            // Each path is handled on its own: an already existing hit node must not prevent
            // the require node from being created.
            ensurePath(this.hitPathPrefix);
            ensurePath(this.requirePathPrefix);
        } catch (Exception e) {
            // The instance is unusable; release the connection that was just opened.
            this.client.close();
            throw new RuntimeException("create path: " + this.hitPathPrefix + ", " + this.requirePathPrefix
                    + " on namespace: " + NAME_SPACE + " error", e);
        }
        executor.submit(this::uploadLoop);
    }

    /**
     * Queues a hit-count increment for the given pattern; zero increments are ignored.
     *
     * @param str cache pattern, must be non-null
     * @param i   number of hits to add
     */
    @Override // org.aoju.bus.cache.Hitting
    public void hitIncr(String str, int i) {
        if (i != 0) {
            this.hitQueue.add(CachePair.of(str, i));
        }
    }

    /**
     * Queues a require-count increment for the given pattern; zero increments are ignored.
     *
     * @param str cache pattern, must be non-null
     * @param i   number of requests to add
     */
    @Override // org.aoju.bus.cache.Hitting
    public void reqIncr(String str, int i) {
        if (i != 0) {
            this.requireQueue.add(CachePair.of(str, i));
        }
    }

    /**
     * Reads the current counter values from ZooKeeper for every pattern this instance has
     * uploaded a require count for, plus one summary entry holding the totals.
     *
     * <p>A pattern whose counters cannot be read is logged and left out of the result and of the
     * totals.
     *
     * @return insertion-ordered map of pattern to hit / require statistics, the summary entry last
     */
    @Override // org.aoju.bus.cache.Hitting
    public Map<String, Hitting.HittingDO> getHitting() {
        Map<String, Hitting.HittingDO> result = new LinkedHashMap<>();
        long totalHit = 0L;
        long totalRequire = 0L;
        for (Map.Entry<String, DistributedAtomicLong> entry : this.requireCounterMap.entrySet()) {
            String pattern = entry.getKey();
            try {
                long require = readCount(entry.getValue());
                // A pattern may have been requested without ever being hit; that reads as 0.
                long hit = readCount(this.hitCounterMap.get(pattern));
                totalRequire += require;
                totalHit += hit;
                result.put(pattern, Hitting.HittingDO.newInstance(hit, require));
            } catch (Exception e) {
                Logger.error(e, "acquire hit count error: ", e.getMessage());
            }
        }
        result.put(summaryName(), Hitting.HittingDO.newInstance(totalHit, totalRequire));
        return result;
    }

    /**
     * Sets the ZooKeeper counters of one pattern back to zero and drops them from this instance's
     * local counter maps; they are registered again by the next uploaded increment.
     *
     * @param str cache pattern; {@code null} or an unknown pattern is a no-op
     */
    @Override // org.aoju.bus.cache.Hitting
    public void reset(String str) {
        if (str == null) {
            // The concurrent counter maps cannot contain a null key, so there is nothing to reset.
            return;
        }
        resetAndForget(this.hitCounterMap, str);
        resetAndForget(this.requireCounterMap, str);
    }

    /** Sets every ZooKeeper counter known to this instance back to zero. */
    @Override // org.aoju.bus.cache.Hitting
    public void resetAll() {
        this.hitCounterMap.forEach(this::doReset);
        this.requireCounterMap.forEach(this::doReset);
    }

    /**
     * Waits until both increment queues have been drained, checking once per second, and then
     * signals the upload loop to stop. If the waiting thread is interrupted the wait is abandoned,
     * the interrupt flag is restored and the loop is still told to stop.
     */
    @PreDestroy
    public void tearDown() {
        try {
            while (!this.hitQueue.isEmpty() || !this.requireQueue.isEmpty()) {
                TimeUnit.SECONDS.sleep(1L);
            }
        } catch (InterruptedException e) {
            // Re-sleeping would throw again immediately; give up waiting and keep the flag set.
            Thread.currentThread().interrupt();
        } finally {
            this.isShutdown = true;
        }
    }

    /**
     * Wraps the product name in slashes so it can be used as a path prefix.
     *
     * @param productName raw product name
     * @return the name with exactly the leading and trailing slash added that were missing
     */
    private static String processProductName(String productName) {
        String normalized = productName;
        if (!normalized.startsWith(Symbol.SLASH)) {
            normalized = Symbol.SLASH + normalized;
        }
        if (!normalized.endsWith(Symbol.SLASH)) {
            normalized = normalized + Symbol.SLASH;
        }
        return normalized;
    }

    /** Creates the node (and missing parents); an already existing node is not an error. */
    private void ensurePath(String path) throws Exception {
        try {
            this.client.create().creatingParentsIfNeeded().forPath(path);
        } catch (KeeperException.NodeExistsException ignored) {
            // The node is already there, which is exactly the state we want.
        }
    }

    /** Body of the uploader task: drains both queues until {@link #tearDown()} flips the flag. */
    private void uploadLoop() {
        while (!this.isShutdown) {
            int drained = dumpToZK(this.hitQueue, this.hitCounterMap, this.hitPathPrefix)
                    + dumpToZK(this.requireQueue, this.requireCounterMap, this.requirePathPrefix);
            if (drained == 0) {
                // poll() does not block, so without a pause an idle loop would spin on a core.
                try {
                    TimeUnit.MILLISECONDS.sleep(IDLE_SLEEP_MILLIS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    /** Removes the pattern's counter from the local map and zeroes it in ZooKeeper if present. */
    private void resetAndForget(Map<String, DistributedAtomicLong> counters, String pattern) {
        DistributedAtomicLong counter = counters.remove(pattern);
        if (counter != null) {
            doReset(pattern, counter);
        }
    }

    /** Forces the distributed counter to zero; failures are logged, not propagated. */
    private void doReset(String pattern, DistributedAtomicLong counter) {
        try {
            counter.forceSet(0L);
        } catch (Exception e) {
            Logger.error(e, "reset distribute counter error: ", e.getMessage());
        }
    }

    /**
     * Takes up to {@link #MAX_BATCH_SIZE} entries from the queue, sums their deltas per pattern
     * and adds each sum to the pattern's distributed counter, creating the counter on first use.
     *
     * @param queue      queue to drain
     * @param counters   pattern -> counter registry matching the queue
     * @param pathPrefix parent node of the counters
     * @return number of queue entries consumed, {@code 0} when the queue was empty
     */
    private int dumpToZK(BlockingQueue<CachePair<String, Integer>> queue,
                         Map<String, DistributedAtomicLong> counters,
                         String pathPrefix) {
        Map<String, Long> batch = new HashMap<>();
        int drained = 0;
        // The bound is tested BEFORE polling: an entry taken off the queue is always accounted for.
        while (drained < MAX_BATCH_SIZE) {
            CachePair<String, Integer> head = queue.poll();
            if (head == null) {
                break;
            }
            batch.merge(head.getLeft(), head.getRight().longValue(), Long::sum);
            drained++;
        }
        for (Map.Entry<String, Long> entry : batch.entrySet()) {
            String pattern = entry.getKey();
            String counterPath = String.format("%s/%s", pathPrefix, pattern);
            try {
                counters.computeIfAbsent(pattern,
                        key -> new DistributedAtomicLong(this.client, counterPath, new RetryNTimes(10, 10)))
                        .add(entry.getValue());
            } catch (Exception e) {
                Logger.error(e, "dump data from queue to zookeeper error: ", e.getMessage());
            }
        }
        return drained;
    }

    /**
     * Reads the current value of a distributed counter.
     *
     * @param counter counter to read, may be {@code null}
     * @return the counter's value, or {@code 0} when there is no counter
     * @throws Exception if the read from ZooKeeper fails
     */
    private long readCount(DistributedAtomicLong counter) throws Exception {
        if (counter == null) {
            return 0L;
        }
        AtomicValue<Long> current = counter.get();
        return current.postValue();
    }
}
