package io.fluo.core.oracle;

import io.fluo.accumulo.util.ZookeeperConstants;
import io.fluo.core.impl.CuratorCnxnListener;
import io.fluo.core.impl.Environment;
import io.fluo.core.thrift.OracleService;
import io.fluo.core.util.UtilWaitThread;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
import org.apache.curator.framework.recipes.leader.Participant;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.transport.TFastFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/fluo/core/oracle/OracleClient.class */
/**
 * Hands out timestamps obtained from the oracle that currently holds leadership.
 *
 * <p>Callers invoke {@link #getTimestamp()}, which enqueues a request and blocks until a
 * background daemon thread has answered it. That thread drains the queue and asks the oracle for
 * one contiguous range of timestamps per batch, so a single remote call can serve many callers.
 *
 * <p>One instance is kept per Fluo instance ID; obtain it through
 * {@link #getInstance(Environment)}.
 */
public class OracleClient {
    public static final Logger log = LoggerFactory.getLogger(OracleClient.class);

    /** Cache of clients keyed by Fluo instance ID; only touched inside {@link #getInstance}. */
    private static final Map<String, OracleClient> clients = new HashMap<>();

    private final Environment env;

    /** Requests waiting to be served by the retriever thread. */
    private final ArrayBlockingQueue<TimeRequest> queue = new ArrayBlockingQueue<>(1000);

    /**
     * Last participant observed to be leader, or {@code null} when none is known. It is written
     * while holding the retriever's monitor but read by {@link #getOracle()} from arbitrary
     * threads, hence volatile.
     */
    private volatile Participant currentLeader;

    /** A single pending timestamp request: the latch is released once the timestamp is set. */
    public static final class TimeRequest {
        final CountDownLatch cdl = new CountDownLatch(1);
        final AtomicLong timestamp = new AtomicLong();

        private TimeRequest() {
        }
    }

    /**
     * Background worker that tracks the oracle leader through ZooKeeper, keeps a Thrift connection
     * to it and answers queued {@link TimeRequest}s in batches.
     */
    private class TimestampRetriever extends LeaderSelectorListenerAdapter implements Runnable, PathChildrenCacheListener {
        private LeaderSelector leaderSelector;
        private CuratorFramework curatorFramework;
        private OracleService.Client client;
        private PathChildrenCache pathChildrenCache;
        private TTransport transport;

        /**
         * Connects to ZooKeeper, starts watching the oracle path, connects to the leader and then
         * serves requests until the thread fails or is interrupted.
         */
        @Override
        public void run() {
            String oraclePath = ZookeeperConstants.oraclePath(env.getZookeeperRoot());
            try {
                curatorFramework = CuratorFrameworkFactory.newClient(env.getConnector().getInstance().getZooKeepers(), new ExponentialBackoffRetry(1000, 10));
                CuratorCnxnListener cnxnListener = new CuratorCnxnListener();
                curatorFramework.getConnectionStateListenable().addListener(cnxnListener);
                curatorFramework.start();
                while (!cnxnListener.isConnected()) {
                    Thread.sleep(200L);
                }
                // The selector must exist before the cache starts delivering events, because
                // childEvent() dereferences it.
                leaderSelector = new LeaderSelector(curatorFramework, oraclePath, this);
                pathChildrenCache = new PathChildrenCache(curatorFramework, oraclePath, true);
                pathChildrenCache.getListenable().addListener(this);
                pathChildrenCache.start();
                connect();
                doWork();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                log.warn("Oracle timestamp retriever interrupted during startup", e);
            } catch (Exception e) {
                log.error("Oracle timestamp retriever terminated unexpectedly", e);
            } finally {
                releaseResources();
            }
        }

        /**
         * Refreshes the cached leader whenever a child of the oracle path is added, removed or
         * updated. The cached leader is cleared when the selector reports no participant that is
         * leader.
         */
        @Override
        public void childEvent(CuratorFramework framework, PathChildrenCacheEvent event) throws Exception {
            PathChildrenCacheEvent.Type type = event.getType();
            boolean membershipChanged = type == PathChildrenCacheEvent.Type.CHILD_REMOVED
                    || type == PathChildrenCacheEvent.Type.CHILD_ADDED
                    || type == PathChildrenCacheEvent.Type.CHILD_UPDATED;
            if (!membershipChanged) {
                return;
            }
            Participant leader = leaderSelector.getLeader();
            synchronized (this) {
                // Store the participant that was actually checked rather than querying again.
                currentLeader = isLeader(leader) ? leader : null;
            }
        }

        /**
         * Serves queued requests forever: takes at least one request, drains whatever else is
         * queued, fetches a range of timestamps and hands one to each request.
         *
         * <p>A batch is only discarded after every request in it has been answered, so a failure
         * part-way through is retried instead of leaving callers blocked. Returns only when the
         * thread is interrupted; requests still pending at that point are not answered.
         */
        private void doWork() {
            ArrayList<TimeRequest> batch = new ArrayList<>();
            while (true) {
                try {
                    if (batch.isEmpty()) {
                        batch.add(queue.take());
                    }
                    queue.drainTo(batch);
                    long first = fetchTimestamps(batch.size());
                    for (int i = 0; i < batch.size(); i++) {
                        TimeRequest request = batch.get(i);
                        request.timestamp.set(first + i);
                        request.cdl.countDown();
                    }
                    batch.clear();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    log.warn("Oracle timestamp retriever interrupted; stopping");
                    return;
                } catch (Exception e) {
                    log.error("Failed to serve " + batch.size() + " timestamp request(s); retrying", e);
                    // Back off so a persistent failure does not spin.
                    sleepRandom();
                }
            }
        }

        /**
         * Requests {@code count} timestamps from the oracle, retrying until a usable answer is
         * obtained.
         *
         * <p>The answer is accepted when, after the call, the known leader is either unknown or the
         * same one that was known before the call. Otherwise the connection is re-established and
         * the request repeated.
         *
         * @param count number of timestamps to allocate
         * @return the first timestamp of the allocated range
         */
        private long fetchTimestamps(int count) throws InterruptedException, IOException, KeeperException, TTransportException {
            while (true) {
                String leaderBefore;
                long first;
                try {
                    OracleService.Client localClient;
                    // connect() updates the leader and the client under this monitor, so read both
                    // together to get a matching pair.
                    synchronized (this) {
                        leaderBefore = getOracle();
                        localClient = client;
                    }
                    first = localClient.getTimestamps(env.getFluoInstanceID(), count);
                } catch (TTransportException e) {
                    log.info("Oracle connection lost. Retrying...");
                    reconnect();
                    continue;
                } catch (TException e) {
                    log.error("Oracle request failed; retrying", e);
                    sleepRandom();
                    continue;
                }
                String leaderAfter = getOracle();
                if (leaderAfter == null || leaderAfter.equals(leaderBefore)) {
                    return first;
                }
                reconnect();
            }
        }

        /**
         * Opens a Thrift connection to the current leader, whose ID is expected to be of the form
         * {@code host:port}. On a transport failure it waits a random interval, looks up the
         * leader again and retries; any other failure is rethrown wrapped in a
         * {@link RuntimeException}.
         */
        private synchronized void connect() throws IOException, KeeperException, InterruptedException, TTransportException {
            getLeader();
            while (true) {
                String leaderId = currentLeader.getId();
                log.debug("Connecting to oracle at " + leaderId);
                String[] hostAndPort = leaderId.split(":");
                try {
                    transport = new TFastFramedTransport(new TSocket(hostAndPort[0], Integer.parseInt(hostAndPort[1])));
                    transport.open();
                    client = new OracleService.Client(new TCompactProtocol(transport));
                    log.info("Connected to oracle at " + getOracle());
                    return;
                } catch (TTransportException e) {
                    // Must precede the general handler below, otherwise it would be unreachable.
                    log.debug("Unable to connect to oracle at " + leaderId + "; retrying", e);
                    sleepRandom();
                    getLeader();
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        }

        /** Closes the current transport, if any, and connects again. */
        private synchronized void reconnect() throws InterruptedException, TTransportException, KeeperException, IOException {
            close();
            connect();
        }

        /** Closes the Thrift transport if one exists and is open. */
        private void close() {
            if (transport != null && transport.isOpen()) {
                transport.close();
            }
        }

        /** Closes the transport, the path cache and the Curator client; failures are logged. */
        private void releaseResources() {
            try {
                close();
            } catch (Exception e) {
                log.warn("Failed to close oracle transport", e);
            }
            if (pathChildrenCache != null) {
                try {
                    pathChildrenCache.close();
                } catch (Exception e) {
                    log.warn("Failed to close oracle path cache", e);
                }
            }
            if (curatorFramework != null) {
                try {
                    curatorFramework.close();
                } catch (Exception e) {
                    log.warn("Failed to close curator client", e);
                }
            }
        }

        /**
         * Asks the selector for the leader once and caches it when it really is the leader.
         *
         * @return {@code true} if a leader was found and cached
         */
        private boolean getLeaderAttempt() {
            Participant candidate = null;
            try {
                candidate = leaderSelector.getLeader();
            } catch (KeeperException e) {
                // Treated as "no leader yet"; the caller retries after a pause.
                log.debug("Could not read oracle leader", e);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
            if (!isLeader(candidate)) {
                return false;
            }
            currentLeader = candidate;
            return true;
        }

        /** Blocks until a leader has been found, pausing a random interval between attempts. */
        private void getLeader() {
            while (!getLeaderAttempt()) {
                sleepRandom();
            }
        }

        /** Sleeps between 100 ms and roughly 1100 ms. */
        private void sleepRandom() {
            UtilWaitThread.sleep(100 + ((long) (1000.0d * Math.random())));
        }

        private boolean isLeader(Participant participant) {
            return participant != null && participant.isLeader();
        }

        /** Intentionally empty: the selector is only used to look up the current leader. */
        @Override
        public void takeLeadership(CuratorFramework framework) throws Exception {
        }
    }

    /** Stores the environment and starts the daemon thread that serves timestamp requests. */
    private OracleClient(Environment environment) throws Exception {
        this.env = environment;
        Thread thread = new Thread(new TimestampRetriever());
        thread.setDaemon(true);
        thread.start();
    }

    /**
     * Obtains a timestamp from the oracle, blocking until the retriever thread has supplied one.
     * If the request queue is full this waits for space instead of failing.
     *
     * @return the allocated timestamp
     * @throws Exception if the calling thread is interrupted while waiting
     */
    public long getTimestamp() throws Exception {
        TimeRequest request = new TimeRequest();
        queue.put(request);
        request.cdl.await();
        return request.timestamp.get();
    }

    /**
     * Returns the ID of the oracle currently believed to be leader.
     *
     * @return the leader's ID, or {@code null} when no leader is known
     */
    public String getOracle() {
        // Read the volatile field once so a concurrent reset to null cannot cause an NPE.
        Participant leader = currentLeader;
        return leader == null ? null : leader.getId();
    }

    /**
     * Returns the shared client for the environment's Fluo instance ID, creating it on first use.
     *
     * @param environment environment identifying the Fluo instance
     * @return the client for that instance
     * @throws RuntimeException if the client cannot be created
     */
    public static synchronized OracleClient getInstance(Environment environment) {
        String fluoInstanceID = environment.getFluoInstanceID();
        OracleClient oracleClient = clients.get(fluoInstanceID);
        if (oracleClient == null) {
            try {
                oracleClient = new OracleClient(environment);
                clients.put(fluoInstanceID, oracleClient);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
        return oracleClient;
    }
}
