package org.flinkextended.flink.ml.cluster.storage;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import org.flinkextended.flink.ml.util.MLConstants;
import org.flinkextended.flink.ml.util.MLException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/flinkextended/flink/ml/cluster/storage/ZookeeperStorageImpl.class */
public class ZookeeperStorageImpl implements Storage {
    private CuratorFramework client;
    private static final int DEFAULT_ZK_TIMEOUT = 6000;
    private static final Logger LOG = LoggerFactory.getLogger(ZookeeperStorageImpl.class);
    static Map<Pair<String, String>, Pair<CuratorFramework, Integer>> map = new ConcurrentHashMap();
    private static final Duration SESSION_TIMEOUT = Duration.ofMinutes(1);

    private static synchronized CuratorFramework getCuratorFramework(String str, String str2, Map<String, String> map2) {
        Pair<String, String> of = Pair.of(str, str2);
        Pair<CuratorFramework, Integer> pair = map.get(of);
        if (pair != null) {
            map.put(of, Pair.of(pair.getLeft(), Integer.valueOf(((Integer) pair.getRight()).intValue() + 1)));
            return (CuratorFramework) pair.getKey();
        }
        CuratorFramework build = CuratorFrameworkFactory.builder().connectString(str).sessionTimeoutMs((int) SESSION_TIMEOUT.toMillis()).connectionTimeoutMs(Integer.valueOf(map2.getOrDefault(MLConstants.CONFIG_ZOOKEEPER_TIMEOUT, String.valueOf(DEFAULT_ZK_TIMEOUT))).intValue()).retryPolicy(new ExponentialBackoffRetry(1000, 5)).namespace(str2.startsWith("/") ? str2.substring(1) : str2).build();
        build.start();
        LOG.info("Create a new ZK connection. ConnectionStr=" + str + ", basePath=" + str2);
        map.put(of, Pair.of(build, 1));
        return build;
    }

    private static synchronized void returnCuratorFramework(CuratorFramework curatorFramework) {
        Pair<String, String> pair = null;
        Pair<CuratorFramework, Integer> pair2 = null;
        Iterator<Map.Entry<Pair<String, String>, Pair<CuratorFramework, Integer>>> it = map.entrySet().iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            Map.Entry<Pair<String, String>, Pair<CuratorFramework, Integer>> next = it.next();
            if (((CuratorFramework) next.getValue().getKey()).equals(curatorFramework)) {
                pair = next.getKey();
                pair2 = Pair.of(next.getValue().getLeft(), Integer.valueOf(((Integer) next.getValue().getValue()).intValue() - 1));
                break;
            }
        }
        if (pair != null) {
            if (((Integer) pair2.getRight()).intValue() > 0) {
                map.put(pair, pair2);
                return;
            }
            map.remove(pair);
            ((CuratorFramework) pair2.getKey()).close();
            LOG.info("Close ZK connection.");
        }
    }

    public ZookeeperStorageImpl(String str, String str2, Map<String, String> map2) throws IOException {
        this.client = getCuratorFramework(str, str2, map2);
    }

    public void start() {
    }

    @Override // org.flinkextended.flink.ml.cluster.storage.Storage
    public byte[] getValue(String str) throws IOException {
        String str2 = "/" + str;
        try {
            this.client.sync().forPath(str2);
            return (byte[]) this.client.getData().forPath(str2);
        } catch (KeeperException.NoNodeException e) {
            return null;
        } catch (Exception e2) {
            throw new MLException("Failed to get value for path " + str, e2);
        }
    }

    @Override // org.flinkextended.flink.ml.cluster.storage.Storage
    public void setValue(String str, byte[] bArr) throws IOException {
        String str2 = "/" + str;
        try {
            if (((Stat) this.client.checkExists().forPath(str2)) != null) {
                LOG.info(str + " is replaced with new value.");
                this.client.setData().forPath(str2, bArr);
            } else {
                this.client.create().creatingParentsIfNeeded().forPath(str2, bArr);
            }
        } catch (Exception e) {
            throw new MLException("Fail to create zookeeper node " + str, e);
        }
    }

    @Override // org.flinkextended.flink.ml.cluster.storage.Storage
    public void removeValue(String str) throws IOException {
        try {
            this.client.delete().deletingChildrenIfNeeded().forPath("/" + str);
        } catch (Exception e) {
            throw new MLException("Fail to delete node " + str, e);
        } catch (KeeperException.NoNodeException e2) {
        }
    }

    @Override // org.flinkextended.flink.ml.cluster.storage.Storage
    public List<String> listChildren(String str) throws IOException {
        try {
            return (List) this.client.getChildren().forPath("/" + str);
        } catch (Exception e) {
            throw new MLException("Failed to list children for path " + str, e);
        } catch (KeeperException.NoNodeException e2) {
            return new ArrayList();
        }
    }

    @Override // org.flinkextended.flink.ml.cluster.storage.Storage
    public boolean exists(String str) throws IOException {
        try {
            return null != ((Stat) this.client.checkExists().forPath(new StringBuilder().append("/").append(str).toString()));
        } catch (Exception e) {
            throw new MLException("Fail to check path existence " + str, e);
        }
    }

    @Override // org.flinkextended.flink.ml.cluster.storage.Storage
    public void close() {
        returnCuratorFramework(this.client);
    }

    @Override // org.flinkextended.flink.ml.cluster.storage.Storage
    public void clear() {
        try {
            this.client.delete().guaranteed().deletingChildrenIfNeeded().forPath("/");
        } catch (Exception e) {
            LOG.warn("Failed to delete ZK node {}", this.client.getNamespace(), e);
        }
    }
}
