package io.teknek.datalayer;

import io.teknek.daemon.TeknekDaemon;
import io.teknek.daemon.WorkerStatus;
import io.teknek.plan.Bundle;
import io.teknek.plan.FeedDesc;
import io.teknek.plan.OperatorDesc;
import io.teknek.plan.Plan;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.net.URL;
import java.net.URLConnection;
import java.util.ArrayList;
import java.util.List;
import org.apache.log4j.Logger;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;

/* loaded from: input_file:io/teknek/datalayer/WorkerDao.class */
public class WorkerDao {
    static final Logger logger = Logger.getLogger(WorkerDao.class.getName());
    static final String ENCODING = "UTF-8";
    public static final String BASE_ZK = "/teknek";
    public static final String WORKERS_ZK = "/teknek/workers";
    public static final String PLANS_ZK = "/teknek/plans";
    public static final String SAVED_ZK = "/teknek/saved";
    public static final String LOCKS_ZK = "/teknek/locks";

    public static void createZookeeperBase(ZooKeeper zooKeeper) throws WorkerDaoException {
        try {
            if (zooKeeper.exists("/teknek", true) == null) {
                logger.info("Creating /teknek heirarchy");
                zooKeeper.create("/teknek", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
            if (zooKeeper.exists(WORKERS_ZK, false) == null) {
                zooKeeper.create(WORKERS_ZK, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
            if (zooKeeper.exists(PLANS_ZK, true) == null) {
                zooKeeper.create(PLANS_ZK, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
            if (zooKeeper.exists(SAVED_ZK, false) == null) {
                zooKeeper.create(SAVED_ZK, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
            if (zooKeeper.exists(LOCKS_ZK, false) == null) {
                zooKeeper.create(LOCKS_ZK, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (KeeperException | InterruptedException e) {
            throw new WorkerDaoException((Throwable) e);
        }
    }

    public static List<String> findWorkersWorkingOnPlan(ZooKeeper zooKeeper, Plan plan) throws WorkerDaoException {
        try {
            return zooKeeper.getChildren("/teknek/plans/" + plan.getName(), false);
        } catch (KeeperException | InterruptedException e) {
            throw new WorkerDaoException((Throwable) e);
        }
    }

    public static List<String> finalAllPlanNames(ZooKeeper zooKeeper) throws WorkerDaoException {
        try {
            return zooKeeper.getChildren(PLANS_ZK, false);
        } catch (KeeperException | InterruptedException e) {
            throw new WorkerDaoException((Throwable) e);
        }
    }

    public static Plan findPlanByName(ZooKeeper zooKeeper, String str) throws WorkerDaoException {
        try {
            return deserializePlan(zooKeeper.getData("/teknek/plans/" + str, false, zooKeeper.exists("/teknek/plans/" + str, false)));
        } catch (IOException | KeeperException | InterruptedException e) {
            throw new WorkerDaoException(e);
        }
    }

    public static Plan deserializePlan(byte[] bArr) throws JsonParseException, JsonMappingException, IOException {
        return (Plan) new ObjectMapper().readValue(bArr, Plan.class);
    }

    public static byte[] serializePlan(Plan plan) throws WorkerDaoException {
        ObjectMapper objectMapper = new ObjectMapper();
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        try {
            objectMapper.writeValue(byteArrayOutputStream, plan);
            return byteArrayOutputStream.toByteArray();
        } catch (IOException e) {
            throw new WorkerDaoException(e);
        }
    }

    public static void createOrUpdatePlan(Plan plan, ZooKeeper zooKeeper) throws WorkerDaoException {
        try {
            createZookeeperBase(zooKeeper);
            Stat exists = zooKeeper.exists("/teknek/plans/" + plan.getName(), false);
            if (exists != null) {
                zooKeeper.setData("/teknek/plans/" + plan.getName(), serializePlan(plan), exists.getVersion());
            } else {
                zooKeeper.create("/teknek/plans/" + plan.getName(), serializePlan(plan), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (KeeperException | InterruptedException e) {
            throw new WorkerDaoException((Throwable) e);
        }
    }

    public static void createEphemeralNodeForDaemon(ZooKeeper zooKeeper, TeknekDaemon teknekDaemon) throws WorkerDaoException {
        try {
            zooKeeper.create("/teknek/workers/" + teknekDaemon.getMyId().toString(), new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        } catch (KeeperException | InterruptedException e) {
            throw new WorkerDaoException((Throwable) e);
        }
    }

    public static List<WorkerStatus> findAllWorkerStatusForPlan(ZooKeeper zooKeeper, Plan plan, List<String> list) throws WorkerDaoException {
        ArrayList arrayList = new ArrayList();
        for (String str : list) {
            String str2 = "/teknek/plans/" + plan.getName() + "/" + str;
            try {
                arrayList.add(new WorkerStatus(str, new String(zooKeeper.getData(str2, false, zooKeeper.exists(str2, false)), "UTF-8")));
            } catch (KeeperException | UnsupportedEncodingException | InterruptedException e) {
                throw new WorkerDaoException((Throwable) e);
            }
        }
        return arrayList;
    }

    public static void registerWorkerStatus(ZooKeeper zooKeeper, Plan plan, WorkerStatus workerStatus) throws WorkerDaoException {
        String str = "/teknek/plans/" + plan.getName() + "/" + workerStatus.getWorkerUuid();
        try {
            zooKeeper.create(str, workerStatus.getFeedPartitionId().getBytes("UTF-8"), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            logger.debug("Registered as ephemeral " + str);
            zooKeeper.exists("/teknek/plans/" + plan.getName(), true);
        } catch (KeeperException | UnsupportedEncodingException | InterruptedException e) {
            throw new WorkerDaoException((Throwable) e);
        }
    }

    public static FeedDesc loadSavedFeedDesc(ZooKeeper zooKeeper, String str, String str2) throws WorkerDaoException {
        String str3 = "/teknek/saved/" + str + "-" + str2 + "-feedDesc";
        try {
            Stat exists = zooKeeper.exists(str3, false);
            if (exists != null) {
                return deserializeFeedDesc(zooKeeper.getData(str3, false, exists));
            }
            throw new WorkerDaoException("not found in zk");
        } catch (KeeperException | IOException | InterruptedException e) {
            throw new WorkerDaoException((Throwable) e);
        }
    }

    public static OperatorDesc loadSavedOperatorDesc(ZooKeeper zooKeeper, String str, String str2) throws WorkerDaoException {
        String str3 = "/teknek/saved/" + str + "-" + str2 + "-operatorDesc";
        try {
            Stat exists = zooKeeper.exists(str3, false);
            if (exists != null) {
                return deserializeOperatorDesc(zooKeeper.getData(str3, false, exists));
            }
            throw new WorkerDaoException("not found in zk");
        } catch (KeeperException | IOException | InterruptedException e) {
            throw new WorkerDaoException((Throwable) e);
        }
    }

    public static void saveOperatorDesc(ZooKeeper zooKeeper, OperatorDesc operatorDesc, String str, String str2) throws WorkerDaoException {
        String str3 = "/teknek/saved/" + str + "-" + str2 + "-operatorDesc";
        createZookeeperBase(zooKeeper);
        try {
            logger.debug("Created " + zooKeeper.create(str3, serializeOperatorDesc(operatorDesc), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
        } catch (KeeperException | IOException | InterruptedException e) {
            throw new WorkerDaoException((Throwable) e);
        }
    }

    public static void saveFeedDesc(ZooKeeper zooKeeper, FeedDesc feedDesc, String str, String str2) throws WorkerDaoException {
        String str3 = "/teknek/saved/" + str + "-" + str2 + "-feedDesc";
        createZookeeperBase(zooKeeper);
        try {
            logger.debug("Created " + zooKeeper.create(str3, serializeFeedDesc(feedDesc), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
        } catch (KeeperException | IOException | InterruptedException e) {
            throw new WorkerDaoException((Throwable) e);
        }
    }

    public static OperatorDesc deserializeOperatorDesc(byte[] bArr) throws JsonParseException, JsonMappingException, IOException {
        return (OperatorDesc) new ObjectMapper().readValue(bArr, OperatorDesc.class);
    }

    public static FeedDesc deserializeFeedDesc(byte[] bArr) throws JsonParseException, JsonMappingException, IOException {
        return (FeedDesc) new ObjectMapper().readValue(bArr, FeedDesc.class);
    }

    public static byte[] serializeFeedDesc(FeedDesc feedDesc) throws JsonParseException, JsonMappingException, IOException {
        return new ObjectMapper().writeValueAsBytes(feedDesc);
    }

    public static byte[] serializeOperatorDesc(OperatorDesc operatorDesc) throws JsonParseException, JsonMappingException, IOException {
        return new ObjectMapper().writeValueAsBytes(operatorDesc);
    }

    public static Bundle getBundleFromUrl(URL url) throws WorkerDaoException {
        InputStream inputStream = null;
        try {
            try {
                URLConnection openConnection = url.openConnection();
                ObjectMapper objectMapper = new ObjectMapper();
                inputStream = openConnection.getInputStream();
                Bundle bundle = (Bundle) objectMapper.readValue(inputStream, Bundle.class);
                if (inputStream != null) {
                    try {
                        inputStream.close();
                    } catch (IOException e) {
                        logger.debug(e);
                    }
                }
                return bundle;
            } catch (IOException e2) {
                logger.warn(e2.getMessage());
                throw new WorkerDaoException(e2);
            }
        } catch (Throwable th) {
            if (inputStream != null) {
                try {
                    inputStream.close();
                } catch (IOException e3) {
                    logger.debug(e3);
                }
            }
            throw th;
        }
    }

    public static void saveBundle(ZooKeeper zooKeeper, Bundle bundle) throws WorkerDaoException {
        for (OperatorDesc operatorDesc : bundle.getOperatorList()) {
            saveOperatorDesc(zooKeeper, operatorDesc, bundle.getPackageName(), operatorDesc.getName());
        }
        for (FeedDesc feedDesc : bundle.getFeedDescList()) {
            saveFeedDesc(zooKeeper, feedDesc, bundle.getPackageName(), feedDesc.getName());
        }
    }

    public static void deletePlan(ZooKeeper zooKeeper, Plan plan) throws WorkerDaoException {
        String str = "/teknek/plans/" + plan.getName();
        try {
            Stat exists = zooKeeper.exists(str, false);
            if (exists != null) {
                zooKeeper.delete(str, exists.getVersion());
            }
        } catch (KeeperException | InterruptedException e) {
            throw new WorkerDaoException((Throwable) e);
        }
    }

    public static void maybeCreatePlanLockDir(ZooKeeper zooKeeper, Plan plan) throws WorkerDaoException {
        try {
            String str = "/teknek/locks/" + plan.getName();
            if (zooKeeper.exists(str, false) == null) {
                logger.debug("Creating " + str);
                zooKeeper.create(str, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (KeeperException | InterruptedException e) {
            throw new WorkerDaoException((Throwable) e);
        }
    }
}
