package io.teknek.daemon;

import com.google.common.annotations.VisibleForTesting;
import io.teknek.datalayer.WorkerDao;
import io.teknek.datalayer.WorkerDaoException;
import io.teknek.plan.Plan;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.recipes.lock.LockListener;
import org.apache.zookeeper.recipes.lock.WriteLock;

/* loaded from: input_file:io/teknek/daemon/TeknekDaemon.class */
public class TeknekDaemon implements Watcher {
    private static final Logger logger = Logger.getLogger(TeknekDaemon.class.getName());
    public static final String ZK_SERVER_LIST = "teknek.zk.servers";
    public static final String MAX_WORKERS = "teknek.max.workers";
    public static final String DAEMON_ID = "teknek.daemon.id";
    private int maxWorkers;
    private String myId;
    private Properties properties;
    private ZooKeeper zk;
    ConcurrentHashMap<Plan, List<Worker>> workerThreads;
    private String hostname;
    private CountDownLatch awaitConnection;
    private long rescanMillis = 5000;
    private boolean goOn = true;

    public TeknekDaemon(Properties properties) {
        this.maxWorkers = 4;
        this.properties = properties;
        if (properties.containsKey(DAEMON_ID)) {
            this.myId = properties.getProperty(DAEMON_ID);
        } else {
            this.myId = UUID.randomUUID().toString();
        }
        this.workerThreads = new ConcurrentHashMap<>();
        if (properties.containsKey(MAX_WORKERS)) {
            this.maxWorkers = Integer.parseInt(properties.getProperty(MAX_WORKERS));
        }
        try {
            setHostname(InetAddress.getLocalHost().getHostName());
        } catch (UnknownHostException e) {
            setHostname("unknown");
        }
    }

    /* JADX WARN: Type inference failed for: r0v14, types: [io.teknek.daemon.TeknekDaemon$1] */
    public void init() {
        logger.debug("Daemon id:" + this.myId);
        logger.info("connecting to " + this.properties.getProperty(ZK_SERVER_LIST));
        this.awaitConnection = new CountDownLatch(1);
        try {
            this.zk = new ZooKeeper(this.properties.getProperty(ZK_SERVER_LIST), 1000, this);
            if (!this.awaitConnection.await(10L, TimeUnit.SECONDS)) {
                throw new RuntimeException("Did not connect before timeout");
            }
            try {
                WorkerDao.createZookeeperBase(this.zk);
                WorkerDao.createEphemeralNodeForDaemon(this.zk, this);
                new Thread() { // from class: io.teknek.daemon.TeknekDaemon.1
                    @Override // java.lang.Thread, java.lang.Runnable
                    public void run() {
                        while (TeknekDaemon.this.goOn) {
                            try {
                                if (TeknekDaemon.this.workerThreads.size() < TeknekDaemon.this.maxWorkers) {
                                    List<String> finalAllPlanNames = WorkerDao.finalAllPlanNames(TeknekDaemon.this.zk);
                                    TeknekDaemon.logger.debug("List of plans: " + finalAllPlanNames);
                                    Iterator<String> it = finalAllPlanNames.iterator();
                                    while (it.hasNext()) {
                                        TeknekDaemon.this.considerStarting(it.next());
                                    }
                                } else {
                                    TeknekDaemon.logger.debug("Will not attemt to start worker. Already at max workers " + TeknekDaemon.this.workerThreads.size());
                                }
                            } catch (Exception e) {
                                TeknekDaemon.logger.error("Exception during scan " + e);
                                e.printStackTrace();
                            }
                            try {
                                Thread.sleep(TeknekDaemon.this.rescanMillis);
                            } catch (InterruptedException e2) {
                                e2.printStackTrace();
                            }
                        }
                    }
                }.start();
            } catch (WorkerDaoException e) {
                throw new RuntimeException(e);
            }
        } catch (IOException | InterruptedException e2) {
            throw new RuntimeException(e2);
        }
    }

    @VisibleForTesting
    public void applyPlan(Plan plan) {
        try {
            WorkerDao.createOrUpdatePlan(plan, this.zk);
        } catch (WorkerDaoException e) {
            e.printStackTrace();
        }
    }

    @VisibleForTesting
    public void deletePlan(Plan plan) {
        try {
            WorkerDao.deletePlan(this.zk, plan);
        } catch (WorkerDaoException e) {
            e.printStackTrace();
        }
    }

    public boolean isPlanSane(Plan plan) {
        if (plan == null) {
            logger.error("did not find plan");
            return false;
        }
        if (plan.isDisabled()) {
            logger.debug("disabled " + plan.getName());
            return false;
        }
        if (plan.getFeedDesc() != null) {
            return true;
        }
        logger.debug("feed was null " + plan.getName());
        return false;
    }

    @VisibleForTesting
    boolean alreadyAtMaxWorkersPerNode(Plan plan, List<String> list, List<Worker> list2) {
        if (plan.getMaxWorkersPerNode() == 0) {
            return false;
        }
        int i = 0;
        if (list2 == null) {
            return false;
        }
        for (Worker worker : list2) {
            Iterator<String> it = list.iterator();
            while (it.hasNext()) {
                if (worker.getMyId().toString().equals(it.next())) {
                    i++;
                }
            }
        }
        return i >= plan.getMaxWorkersPerNode();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void considerStarting(String str) {
        try {
            Plan findPlanByName = WorkerDao.findPlanByName(this.zk, str);
            if (!alreadyAtMaxWorkersPerNode(findPlanByName, WorkerDao.findWorkersWorkingOnPlan(this.zk, findPlanByName), this.workerThreads.get(findPlanByName)) && isPlanSane(findPlanByName)) {
                logger.debug("trying to acqure lock on /teknek/locks/" + findPlanByName.getName());
                try {
                    WorkerDao.maybeCreatePlanLockDir(this.zk, findPlanByName);
                    final CountDownLatch countDownLatch = new CountDownLatch(1);
                    WriteLock writeLock = new WriteLock(this.zk, "/teknek/locks/" + findPlanByName.getName(), null);
                    writeLock.setLockListener(new LockListener() { // from class: io.teknek.daemon.TeknekDaemon.2
                        @Override // org.apache.zookeeper.recipes.lock.LockListener
                        public void lockAcquired() {
                            TeknekDaemon.logger.debug(TeknekDaemon.this.myId + " counting down");
                            countDownLatch.countDown();
                        }

                        @Override // org.apache.zookeeper.recipes.lock.LockListener
                        public void lockReleased() {
                            TeknekDaemon.logger.debug(TeknekDaemon.this.myId + " released");
                        }
                    });
                    try {
                        try {
                            writeLock.lock();
                            if (countDownLatch.await(3000L, TimeUnit.MILLISECONDS)) {
                                try {
                                    Plan findPlanByName2 = WorkerDao.findPlanByName(this.zk, str);
                                    if (findPlanByName2.isDisabled()) {
                                        logger.debug("disabled " + findPlanByName2.getName());
                                        try {
                                            writeLock.unlock();
                                            return;
                                        } catch (RuntimeException e) {
                                            logger.debug(e);
                                            e.printStackTrace();
                                            return;
                                        }
                                    }
                                    List<String> findWorkersWorkingOnPlan = WorkerDao.findWorkersWorkingOnPlan(this.zk, findPlanByName2);
                                    if (findWorkersWorkingOnPlan.size() >= findPlanByName2.getMaxWorkers()) {
                                        logger.debug("already running max children:" + findWorkersWorkingOnPlan.size() + " planmax:" + findPlanByName2.getMaxWorkers() + " running:" + findWorkersWorkingOnPlan);
                                        try {
                                            writeLock.unlock();
                                            return;
                                        } catch (RuntimeException e2) {
                                            logger.debug(e2);
                                            e2.printStackTrace();
                                            return;
                                        }
                                    }
                                    logger.debug("starting worker");
                                    try {
                                        Worker worker = new Worker(findPlanByName2, findWorkersWorkingOnPlan, this);
                                        worker.init();
                                        worker.start();
                                        addWorkerToList(findPlanByName2, worker);
                                    } catch (RuntimeException e3) {
                                        throw new WorkerStartException(e3);
                                    }
                                } catch (WorkerDaoException e4) {
                                    logger.error(e4);
                                    try {
                                        writeLock.unlock();
                                    } catch (RuntimeException e5) {
                                        logger.debug(e5);
                                        e5.printStackTrace();
                                    }
                                }
                            }
                        } catch (KeeperException | WorkerStartException | WorkerDaoException | InterruptedException e6) {
                            logger.warn("getting lock", e6);
                            try {
                                writeLock.unlock();
                            } catch (RuntimeException e7) {
                                logger.debug(e7);
                                e7.printStackTrace();
                            }
                        }
                    } finally {
                        try {
                            writeLock.unlock();
                        } catch (RuntimeException e8) {
                            logger.debug(e8);
                            e8.printStackTrace();
                        }
                    }
                } catch (WorkerDaoException e9) {
                    logger.error(e9);
                }
            }
        } catch (WorkerDaoException e10) {
            logger.error(e10);
        }
    }

    private void addWorkerToList(Plan plan, Worker worker) {
        logger.debug("adding worker " + worker.getMyId() + " to plan " + plan.getName());
        List<Worker> list = this.workerThreads.get(plan);
        if (list == null) {
            list = Collections.synchronizedList(new ArrayList());
        }
        list.add(worker);
        this.workerThreads.put(plan, list);
    }

    public void process(WatchedEvent watchedEvent) {
        if (watchedEvent.getState() == Watcher.Event.KeeperState.SyncConnected) {
            this.awaitConnection.countDown();
        }
    }

    public String getMyId() {
        return this.myId;
    }

    public void setMyId(String str) {
        this.myId = str;
    }

    public void stop() {
        this.goOn = false;
    }

    public Properties getProperties() {
        return this.properties;
    }

    public long getRescanMillis() {
        return this.rescanMillis;
    }

    public void setRescanMillis(long j) {
        this.rescanMillis = j;
    }

    public String getHostname() {
        return this.hostname;
    }

    @VisibleForTesting
    void setHostname(String str) {
        this.hostname = str;
    }

    public static void main(String[] strArr) {
        new TeknekDaemon(System.getProperties()).init();
    }
}
