package com.github.chen0040.zkcoordinator.services;

import com.github.chen0040.data.utils.TupleTwo;
import com.github.chen0040.zkcoordinator.consts.TaskStates;
import com.github.chen0040.zkcoordinator.consts.UTF8;
import com.github.chen0040.zkcoordinator.models.ChildrenCache;
import com.github.chen0040.zkcoordinator.models.NodeUri;
import com.github.chen0040.zkcoordinator.models.ZkConfig;
import com.github.chen0040.zkcoordinator.utils.TupleThree;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZKUtil;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/github/chen0040/zkcoordinator/services/TaskAssignmentServiceImpl.class */
public class TaskAssignmentServiceImpl implements TaskAssignmentService {
    private static final Logger logger = LoggerFactory.getLogger(TaskAssignmentServiceImpl.class);
    /** Shared random source used to pick a worker when a task is assigned. */
    private static final Random rand = new Random();
    /** ZooKeeper client every asynchronous call in this class is issued on. */
    private final ZooKeeper zk;
    /** Root znode under which task nodes are created (see getPath4TaskId). */
    private final String zkTaskPath;
    /** Root znode under which per-worker assignment subtrees are created. */
    private final String zkTaskAssignmentPath;
    /** System name copied into every NodeUri built by getUri. */
    private final String workerSystemName;
    /** Cache of the previously seen worker set; created on the first reassignAndSet call. */
    private ChildrenCache workersCache;
    /** Current worker ids; read and written only while holding readWriteLock. */
    private final List<String> workerList = new ArrayList<>();
    /** Guards workerList. */
    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    /**
     * Completion callback for the node creation issued by
     * {@link #createTask(String, String, Consumer)}. The context object is the
     * {@code TupleTwo(taskId, consumer)} built by that method.
     *
     * <ul>
     *   <li>CONNECTIONLOSS: the same create is issued again for the same path.</li>
     *   <li>OK / NODEEXISTS: the consumer is invoked.</li>
     *   <li>anything else: the failure is logged and the consumer is not invoked.</li>
     * </ul>
     */
    AsyncCallback.StringCallback createTaskCallback = (rc, path, ctx, name) -> {
        TupleTwo request = (TupleTwo) ctx;
        String taskId = (String) request._1();
        Consumer<String> consumer = (Consumer) request._2();
        switch (KeeperException.Code.get(rc)) {
            case CONNECTIONLOSS:
                // Retry with the identical path and context.
                createTask(path, taskId, consumer);
                break;
            case OK:
            case NODEEXISTS:
                // NOTE(review): the decompiled source used one name for both the node path and
                // the task id; the task id is assumed to be what the consumer receives — confirm.
                consumer.accept(taskId);
                break;
            default:
                logger.error("Something went wrong: ", KeeperException.create(KeeperException.Code.get(rc), path));
                break;
        }
    };
    // Number of hashed partition levels placed between the task root and the task node;
    // also passed to getPartition as the number of slices the task id is cut into.
    private int zkDepth4Tasks = 4;
    /**
     * Completion callback for the read issued by {@link #isTaskAssigned(String, BiConsumer)}.
     * The context object is the caller's {@code BiConsumer<taskId, assigned>}.
     *
     * <ul>
     *   <li>CONNECTIONLOSS: the read is issued again.</li>
     *   <li>OK: the consumer receives {@code true} unless the node data equals
     *       {@code TaskStates.Idle}.</li>
     *   <li>anything else: the failure is logged and the consumer is not invoked.</li>
     * </ul>
     */
    AsyncCallback.DataCallback isTaskAssignedCallback = (rc, path, ctx, data, stat) -> {
        BiConsumer<String, Boolean> callback = (BiConsumer) ctx;
        // The task id is the last segment of the znode path.
        String taskId = path.substring(path.lastIndexOf("/") + 1);
        switch (KeeperException.Code.get(rc)) {
            case CONNECTIONLOSS:
                isTaskAssigned(taskId, callback);
                break;
            case OK:
                // Decode with an explicit charset instead of the platform default.
                // NOTE(review): assumes UTF8.getBytes (used when the node is written) encodes as UTF-8.
                String state = new String(data, java.nio.charset.StandardCharsets.UTF_8);
                callback.accept(taskId, Boolean.valueOf(!state.equals(TaskStates.Idle)));
                break;
            default:
                // Pass the exception as the only extra argument so the logger treats it as the
                // throwable; the path is carried inside the exception.
                logger.error("Something went wrong", KeeperException.create(KeeperException.Code.get(rc), path));
                break;
        }
    };
    /**
     * Completion callback for the read issued by {@link #taskExists(String, BiConsumer)}.
     * The context object is the caller's {@code BiConsumer<taskId, exists>}.
     *
     * <ul>
     *   <li>CONNECTIONLOSS: the read is issued again.</li>
     *   <li>OK: the consumer receives {@code true}.</li>
     *   <li>NONODE: the consumer receives {@code false}.</li>
     *   <li>anything else: the failure is logged and the consumer is not invoked.</li>
     * </ul>
     */
    AsyncCallback.DataCallback taskExistsCallback = (rc, path, ctx, data, stat) -> {
        BiConsumer<String, Boolean> callback = (BiConsumer) ctx;
        // The task id is the last segment of the znode path.
        String taskId = path.substring(path.lastIndexOf("/") + 1);
        switch (KeeperException.Code.get(rc)) {
            case CONNECTIONLOSS:
                taskExists(taskId, callback);
                break;
            case OK:
                callback.accept(taskId, true);
                break;
            case NONODE:
                callback.accept(taskId, false);
                break;
            default:
                // The exception is the sole extra argument so it is logged as the throwable;
                // the path travels inside it.
                logger.error("Something went wrong", KeeperException.create(KeeperException.Code.get(rc), path));
                break;
        }
    };
    /**
     * Completion callback for the read issued by
     * {@link #getWorkerAssigned2Task(String, Consumer)}. The context object is the caller's
     * {@code Consumer<NodeUri>}.
     *
     * <ul>
     *   <li>CONNECTIONLOSS: the read is issued again.</li>
     *   <li>OK: the node data is parsed by {@code getUri} and handed to the consumer.</li>
     *   <li>anything else: the failure is logged (it used to be dropped silently) and the
     *       consumer is not invoked.</li>
     * </ul>
     */
    AsyncCallback.DataCallback getWorkerAssigned2TaskCallback = (rc, path, ctx, data, stat) -> {
        Consumer<NodeUri> consumer = (Consumer) ctx;
        // The task id is the last segment of the znode path.
        String taskId = path.substring(path.lastIndexOf("/") + 1);
        switch (KeeperException.Code.get(rc)) {
            case CONNECTIONLOSS:
                getWorkerAssigned2Task(taskId, consumer);
                break;
            case OK:
                // Decode with an explicit charset instead of the platform default.
                // NOTE(review): assumes UTF8.getBytes (used when the node is written) encodes as UTF-8.
                // NOTE(review): getUri needs "host_port"; if the node still holds the idle marker
                // this call presumably fails — confirm callers only query assigned tasks.
                consumer.accept(getUri(new String(data, java.nio.charset.StandardCharsets.UTF_8)));
                break;
            default:
                logger.error("getWorkerAssigned2Task failed.", KeeperException.create(KeeperException.Code.get(rc), path));
                break;
        }
    };
    /**
     * Completion callback for the recursive delete issued by
     * {@code unregisterAbsentWorkerInAssignment}. The context object is the worker id.
     *
     * <ul>
     *   <li>CONNECTIONLOSS: the recursive delete is started again for the worker.</li>
     *   <li>OK: the removal is logged.</li>
     *   <li>anything else: the failure is logged.</li>
     * </ul>
     */
    private AsyncCallback.VoidCallback unregisterAbsentWorkerInAssignmentCallback = (rc, path, ctx) -> {
        String workerId = (String) ctx;
        switch (KeeperException.Code.get(rc)) {
            case CONNECTIONLOSS:
                unregisterAbsentWorkerInAssignment(workerId);
                break;
            case OK:
                // NOTE(review): the decompiled source used one name for the path and the worker id;
                // the worker id is assumed in both log messages — confirm.
                logger.info("Worker unregistered from task assignment: {}", workerId);
                break;
            default:
                logger.error("Error when trying to unregisterAbsentWorkerInAssignment: " + workerId,
                        KeeperException.create(KeeperException.Code.get(rc), path));
                break;
        }
    };
    /**
     * Completion callback for the read issued by {@code isWorkerRegisteredInAssignment}.
     * The context object is the caller's {@code BiConsumer<workerId, registered>}.
     *
     * <ul>
     *   <li>CONNECTIONLOSS: the read is issued again.</li>
     *   <li>OK: the consumer receives {@code true}.</li>
     *   <li>NONODE: the consumer receives {@code false}.</li>
     *   <li>anything else: the failure is logged and the consumer is not invoked.</li>
     * </ul>
     */
    private AsyncCallback.DataCallback isWorkerRegisteredInAssignmentCallback = (rc, path, ctx, data, stat) -> {
        // The worker id is the last segment of the znode path.
        String workerId = path.substring(path.lastIndexOf("/") + 1);
        BiConsumer<String, Boolean> callback = (BiConsumer) ctx;
        switch (KeeperException.Code.get(rc)) {
            case CONNECTIONLOSS:
                isWorkerRegisteredInAssignment(workerId, callback);
                break;
            case OK:
                callback.accept(workerId, true);
                break;
            case NONODE:
                callback.accept(workerId, false);
                break;
            default:
                // The exception is the sole extra argument so it is logged as the throwable;
                // the path travels inside it.
                logger.error("isWorkerRegisteredInAssignment failed.", KeeperException.create(KeeperException.Code.get(rc), path));
                break;
        }
    };
    /**
     * Completion callback for the node creation issued by
     * {@code registerNewWorkerInAssignment}. The context object is the caller's
     * {@code Consumer<workerId>}.
     *
     * <ul>
     *   <li>CONNECTIONLOSS: the create is issued again.</li>
     *   <li>OK / NODEEXISTS: the consumer receives the worker id.</li>
     *   <li>anything else: the failure is logged and the consumer is not invoked.</li>
     * </ul>
     */
    private AsyncCallback.StringCallback registerNewWorkerInAssignmentCallback = (rc, path, ctx, name) -> {
        // The worker id is the last segment of the znode path.
        String workerId = path.substring(path.lastIndexOf("/") + 1);
        Consumer<String> consumer = (Consumer) ctx;
        switch (KeeperException.Code.get(rc)) {
            case CONNECTIONLOSS:
                registerNewWorkerInAssignment(workerId, consumer);
                break;
            case OK:
            case NODEEXISTS:
                consumer.accept(workerId);
                break;
            default:
                // The exception is the sole extra argument so it is logged as the throwable;
                // the path travels inside it.
                logger.error("Something went wrong.", KeeperException.create(KeeperException.Code.get(rc), path));
                break;
        }
    };
    /**
     * Completion callback for the node creation issued by the four-argument
     * {@code createAssignment}. The context object is the
     * {@code TupleThree(taskId, workerId, callback)} built by that method.
     *
     * <ul>
     *   <li>CONNECTIONLOSS: the same create is issued again for the same path.</li>
     *   <li>OK: the success is logged and the callback is invoked.</li>
     *   <li>NODEEXISTS: the callback is invoked.</li>
     *   <li>anything else: the failure is logged and the callback is not invoked.</li>
     * </ul>
     */
    private AsyncCallback.StringCallback createAssignmentCallback = (rc, path, ctx, name) -> {
        TupleThree request = (TupleThree) ctx;
        String taskId = (String) request._1();
        String workerId = (String) request._2();
        BiConsumer<String, NodeUri> callback = (BiConsumer) request._3();
        switch (KeeperException.Code.get(rc)) {
            case CONNECTIONLOSS:
                // Retry with the identical path and context.
                createAssignment(path, taskId, workerId, callback);
                break;
            case OK:
                // NOTE(review): the decompiled source reused one name for the created node name
                // and the worker id; the created node name is assumed here — confirm.
                logger.info("Task assigned correctly: " + name);
                callback.accept(taskId, getUri(workerId));
                break;
            case NODEEXISTS:
                // NOTE(review): first argument assumed to be the task id rather than the path — confirm.
                callback.accept(taskId, getUri(workerId));
                break;
            default:
                logger.error("createAssignment failed.", KeeperException.create(KeeperException.Code.get(rc), path));
                break;
        }
    };
    /**
     * Completion callback for the delete issued by {@code deleteTask}. The context object is
     * the task id.
     *
     * <ul>
     *   <li>CONNECTIONLOSS: the delete is issued again.</li>
     *   <li>OK: the deletion is logged.</li>
     *   <li>anything else: the failure is logged.</li>
     * </ul>
     */
    private AsyncCallback.VoidCallback deleteTaskCallback = (rc, path, ctx) -> {
        String taskId = (String) ctx;
        switch (KeeperException.Code.get(rc)) {
            case CONNECTIONLOSS:
                deleteTask(taskId);
                break;
            case OK:
                // NOTE(review): the decompiled source used one name for the path and the task id;
                // the task id is assumed in this message — confirm.
                logger.info("Task deleted: " + taskId);
                break;
            default:
                logger.error("Error when trying to delete task.", KeeperException.create(KeeperException.Code.get(rc), path));
                break;
        }
    };
    /**
     * Completion callback for the write issued by {@code updateTaskHandler}. The context object
     * is the {@code TupleTwo(workerId, callback)} built by that method.
     *
     * <ul>
     *   <li>CONNECTIONLOSS: the same write is issued again.</li>
     *   <li>OK: the update is logged and the callback receives the task id and the worker's
     *       {@code NodeUri}.</li>
     *   <li>anything else: the failure is logged and the callback is not invoked.</li>
     * </ul>
     */
    private AsyncCallback.StatCallback updateTaskHandlerCallback = (rc, path, ctx, stat) -> {
        TupleTwo request = (TupleTwo) ctx;
        String workerId = (String) request._1();
        BiConsumer<String, NodeUri> callback = (BiConsumer) request._2();
        // The task id is the last segment of the task znode path.
        String taskId = path.substring(path.lastIndexOf("/") + 1);
        switch (KeeperException.Code.get(rc)) {
            case CONNECTIONLOSS:
                updateTaskHandler(path, workerId, callback);
                break;
            case OK:
                // NOTE(review): the decompiled source used one name for both placeholders;
                // "path = workerId" is assumed — confirm.
                logger.info("Task handler updated: {} = {}", path, workerId);
                callback.accept(taskId, getUri(workerId));
                break;
            default:
                logger.error("Something went wrong: ", KeeperException.create(KeeperException.Code.get(rc), path));
                break;
        }
    };
    /**
     * Completion callback for the read issued by {@link #assignTask(String, BiConsumer)}.
     * The context object is the caller's {@code BiConsumer<taskId, NodeUri>}.
     *
     * <ul>
     *   <li>CONNECTIONLOSS: the read is issued again.</li>
     *   <li>OK: a worker is picked uniformly at random from the current worker list and the
     *       assignment is created for it; with no workers an error is logged and the callback
     *       is not invoked.</li>
     *   <li>anything else: the failure is logged and the callback is not invoked.</li>
     * </ul>
     */
    private AsyncCallback.DataCallback assignTaskCallback = (rc, path, ctx, data, stat) -> {
        // The task id is the last segment of the znode path.
        String taskId = path.substring(path.lastIndexOf("/") + 1);
        BiConsumer<String, NodeUri> callback = (BiConsumer) ctx;
        switch (KeeperException.Code.get(rc)) {
            case CONNECTIONLOSS:
                assignTask(taskId, callback);
                break;
            case OK: {
                String workerId = null;
                // Hold the read lock only while choosing the worker; the ZooKeeper calls that
                // follow do not touch the worker list.
                this.readWriteLock.readLock().lock();
                try {
                    if (!this.workerList.isEmpty()) {
                        workerId = this.workerList.get(rand.nextInt(this.workerList.size()));
                    }
                } finally {
                    this.readWriteLock.readLock().unlock();
                }
                if (workerId != null) {
                    createAssignment(taskId, workerId, callback);
                } else {
                    logger.error("No worker to assign task: {}", taskId);
                }
                break;
            }
            default:
                logger.error("assignTask failed.", KeeperException.create(KeeperException.Code.get(rc), path));
                break;
        }
    };
    /**
     * Completion callback for the child listing issued by {@code getAbsentWorkerTasks}.
     * The context object is the worker id.
     *
     * <ul>
     *   <li>CONNECTIONLOSS: the listing is issued again.</li>
     *   <li>OK: when a child list was returned, the worker's assignment subtree is removed.</li>
     *   <li>anything else: the failure is logged.</li>
     * </ul>
     */
    private AsyncCallback.ChildrenCallback getAbsentWorkerTasksCallback = (rc, path, ctx, children) -> {
        String workerId = (String) ctx;
        switch (KeeperException.Code.get(rc)) {
            case CONNECTIONLOSS:
                getAbsentWorkerTasks(workerId);
                break;
            case OK:
                // NOTE(review): the children are only null-checked, never reassigned;
                // reassignTasks4LostWorker is not called from here — confirm whether that is intended.
                if (children != null) {
                    unregisterAbsentWorkerInAssignment(workerId);
                }
                break;
            default:
                logger.error("getAbsentWorkerTasks failed.", KeeperException.create(KeeperException.Code.get(rc), path));
                break;
        }
    };

    /* renamed from: com.github.chen0040.zkcoordinator.services.TaskAssignmentServiceImpl$1, reason: invalid class name */
    /* loaded from: input_file:com/github/chen0040/zkcoordinator/services/TaskAssignmentServiceImpl$1.class */
    /*
     * Decompiler rendering of a compiler-generated lookup table for switching over
     * KeeperException.Code. The table is indexed by the enum constant's ordinal and holds the
     * case label used by the switch:
     *   1 = CONNECTIONLOSS, 2 = OK, 3 = NODEEXISTS, 4 = NONODE.
     * Any other code keeps the array's default value 0.
     */
    static /* synthetic */ class AnonymousClass1 {
        // One slot per KeeperException.Code constant; slots not assigned below stay 0.
        static final /* synthetic */ int[] $SwitchMap$org$apache$zookeeper$KeeperException$Code = new int[KeeperException.Code.values().length];

        static {
            // Each assignment is wrapped separately so that a constant missing at run time
            // (NoSuchFieldError) leaves only its own slot at 0.
            try {
                $SwitchMap$org$apache$zookeeper$KeeperException$Code[KeeperException.Code.CONNECTIONLOSS.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$zookeeper$KeeperException$Code[KeeperException.Code.OK.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$zookeeper$KeeperException$Code[KeeperException.Code.NODEEXISTS.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$zookeeper$KeeperException$Code[KeeperException.Code.NONODE.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    /**
     * Builds the service on top of an existing ZooKeeper client.
     *
     * @param zooKeeper client used for every ZooKeeper call made by this service
     * @param zkConfig  supplies the task root path, the assignment root path and the worker
     *                  system name
     */
    public TaskAssignmentServiceImpl(ZooKeeper zooKeeper, ZkConfig zkConfig) {
        zk = zooKeeper;
        zkTaskPath = zkConfig.getTaskPath();
        zkTaskAssignmentPath = zkConfig.getTaskAssignmentPath();
        workerSystemName = zkConfig.getWorkerSystemName();
    }

    /**
     * Replaces the known worker set with {@code list} and starts cleanup for every worker
     * reported by {@code ChildrenCache.removeAndSet} (presumably the workers that are no
     * longer present).
     *
     * <p>The worker list is swapped under the write lock. The first call only creates the
     * cache; later calls diff against it.
     *
     * @param list the current worker ids
     */
    @Override // com.github.chen0040.zkcoordinator.services.TaskAssignmentService
    public void reassignAndSet(List<String> list) {
        readWriteLock.writeLock().lock();
        try {
            workerList.clear();
            workerList.addAll(list);

            readWriteLock.readLock().lock();
            try {
                logger.info("Task assignment service gets a new set of workers: {}", Integer.valueOf(list.size()));
                // Log at most the first 20 workers.
                workerList.stream()
                        .limit(20)
                        .forEach(worker -> logger.info("worker: {}", worker));

                List<String> removed = null;
                if (workersCache != null) {
                    logger.info("Removing and setting workers");
                    removed = workersCache.removeAndSet(list);
                } else {
                    workersCache = new ChildrenCache(list);
                }

                if (removed != null && !removed.isEmpty()) {
                    for (String workerId : removed) {
                        getAbsentWorkerTasks(workerId);
                    }
                }
            } finally {
                readWriteLock.readLock().unlock();
            }
        } finally {
            readWriteLock.writeLock().unlock();
        }
    }

    /**
     * Starts an asynchronous recursive delete of the worker's subtree under the assignment root.
     * Completion is reported to {@code unregisterAbsentWorkerInAssignmentCallback} with the
     * worker id as context.
     *
     * @param workerId id of the worker whose assignment subtree is removed
     */
    private void unregisterAbsentWorkerInAssignment(String workerId) {
        String workerPath = this.zkTaskAssignmentPath + "/" + workerId;
        try {
            ZKUtil.deleteRecursive(this.zk, workerPath, this.unregisterAbsentWorkerInAssignmentCallback, workerId);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            logger.error("unregisterAbsentWorkerInAssignment failed.", e);
        } catch (KeeperException e) {
            logger.error("unregisterAbsentWorkerInAssignment failed.", e);
        }
    }

    /**
     * Asynchronously lists the children of the worker's node under the assignment root; the
     * result goes to {@code getAbsentWorkerTasksCallback} with the worker id as context.
     *
     * @param workerId id of the worker whose assignment node is listed
     */
    private void getAbsentWorkerTasks(String workerId) {
        String workerPath = this.zkTaskAssignmentPath + "/" + workerId;
        this.zk.getChildren(workerPath, false, this.getAbsentWorkerTasksCallback, workerId);
    }

    /**
     * Calls {@code assignTask} for every task id in the list and logs each completed
     * reassignment.
     *
     * @param taskIds ids of the tasks to assign again
     */
    private void reassignTasks4LostWorker(List<String> taskIds) {
        for (String taskId : taskIds) {
            assignTask(taskId, (reassignedId, workerUri) -> logger.info("Task reassigned: {}", reassignedId));
        }
    }

    /**
     * Creates the assignment node chain for a task under a worker and then records the worker
     * on the task node.
     *
     * <p>Nodes are created one after another, each from the completion callback of the
     * previous one: the worker node, one node per partition level, and finally the task node.
     * Once the task node step completes, {@code updateTaskHandler} writes the worker id to the
     * task's own node and eventually invokes {@code biConsumer}.
     *
     * @param taskId     id of the task being assigned
     * @param workerId   id of the worker receiving the task
     * @param biConsumer invoked by the final {@code updateTaskHandler} step
     */
    private void createAssignment(String taskId, String workerId, BiConsumer<String, NodeUri> biConsumer) {
        createAssignmentLevel(this.zkTaskAssignmentPath + "/" + workerId, 0, taskId, workerId, biConsumer);
    }

    /**
     * Creates {@code path} and, on completion, continues with the next node of the chain.
     *
     * @param path  node to create in this step
     * @param level number of partition segments already contained in {@code path}; one past
     *              {@code zkDepth4Tasks} means {@code path} is the task node itself
     */
    private void createAssignmentLevel(String path, int level, String taskId, String workerId,
                                       BiConsumer<String, NodeUri> biConsumer) {
        createAssignment(path, taskId, workerId, (ignoredTaskId, ignoredUri) -> {
            if (level < this.zkDepth4Tasks) {
                // Descend into the next hashed partition level.
                String next = path + "/" + getPartition(taskId, level, this.zkDepth4Tasks);
                createAssignmentLevel(next, level + 1, taskId, workerId, biConsumer);
            } else if (level == this.zkDepth4Tasks) {
                // All partition levels exist: create the task node itself.
                createAssignmentLevel(path + "/" + taskId, level + 1, taskId, workerId, biConsumer);
            } else {
                // The task node exists: record the worker on the task's own node.
                updateTaskHandler(getPath4TaskId(taskId), workerId, biConsumer);
            }
        });
    }

    /**
     * Starts assigning a task: reads the task node asynchronously and hands the result to
     * {@code assignTaskCallback}.
     *
     * @param taskId     id of the task to assign
     * @param biConsumer passed along as the callback context
     */
    @Override // com.github.chen0040.zkcoordinator.services.TaskAssignmentService
    public void assignTask(String taskId, BiConsumer<String, NodeUri> biConsumer) {
        String taskPath = getPath4TaskId(taskId);
        this.zk.getData(taskPath, false, this.assignTaskCallback, biConsumer);
    }

    /**
     * Asynchronously reads the worker's node under the assignment root; the result goes to
     * {@code isWorkerRegisteredInAssignmentCallback}.
     *
     * @param workerId   id of the worker to look up
     * @param biConsumer passed along as the callback context
     */
    private void isWorkerRegisteredInAssignment(String workerId, BiConsumer<String, Boolean> biConsumer) {
        String workerPath = this.zkTaskAssignmentPath + "/" + workerId;
        this.zk.getData(workerPath, false, this.isWorkerRegisteredInAssignmentCallback, biConsumer);
    }

    /**
     * Asynchronously creates an empty persistent node for the worker under the assignment root;
     * the result goes to {@code registerNewWorkerInAssignmentCallback}.
     *
     * @param workerId id of the worker to register
     * @param consumer passed along as the callback context
     */
    private void registerNewWorkerInAssignment(String workerId, Consumer<String> consumer) {
        String workerPath = this.zkTaskAssignmentPath + "/" + workerId;
        byte[] noData = new byte[0];
        this.zk.create(workerPath, noData, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT,
                this.registerNewWorkerInAssignmentCallback, consumer);
    }

    /**
     * Asynchronously creates one empty persistent node at {@code path}; the result goes to
     * {@code createAssignmentCallback} with {@code (taskId, workerId, biConsumer)} as context.
     *
     * @param path       full path of the node to create
     * @param taskId     first element of the callback context
     * @param workerId   second element of the callback context
     * @param biConsumer third element of the callback context
     */
    private void createAssignment(String path, String taskId, String workerId, BiConsumer<String, NodeUri> biConsumer) {
        byte[] noData = new byte[0];
        TupleThree context = new TupleThree(taskId, workerId, biConsumer);
        this.zk.create(path, noData, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT,
                this.createAssignmentCallback, context);
    }

    /**
     * Builds the full path of a task node: the task root, then one hashed partition segment
     * per level, then the task id.
     *
     * @param taskId id of the task
     * @return the path of the task's node
     */
    private String getPath4TaskId(String taskId) {
        StringBuilder path = new StringBuilder();
        path.append(this.zkTaskPath);
        int level = 0;
        while (level < this.zkDepth4Tasks) {
            path.append('/').append(getPartition(taskId, level, this.zkDepth4Tasks));
            level++;
        }
        return path.append('/').append(taskId).toString();
    }

    /**
     * Hashes one slice of the id. The id is cut into {@code sliceCount} slices of
     * {@code length / sliceCount} characters each (integer division, so trailing characters
     * beyond the last full slice are not used), and slice {@code index} is hashed.
     *
     * @param id         string to slice
     * @param index      zero-based slice number
     * @param sliceCount number of slices the id is divided into
     * @return the hash bucket of the selected slice
     */
    private int getPartition(String id, int index, int sliceCount) {
        int total = id.length();
        int sliceLength = total / sliceCount;
        int from = index * sliceLength;
        int to = Math.min(from + sliceLength, total);
        String slice = id.substring(from, to);
        return hash(slice);
    }

    /**
     * Maps a string to a bucket in the range 0..99 using its {@code hashCode}.
     *
     * @param text string to hash
     * @return the absolute value of {@code text.hashCode() % 100}
     */
    private int hash(String text) {
        // The remainder lies in -99..99, so negating a negative value cannot overflow.
        int remainder = text.hashCode() % 100;
        return remainder < 0 ? -remainder : remainder;
    }

    /**
     * Asynchronously deletes the task's node regardless of its version; the result goes to
     * {@code deleteTaskCallback} with the task id as context.
     *
     * @param taskId id of the task to delete
     */
    private void deleteTask(String taskId) {
        String taskPath = getPath4TaskId(taskId);
        this.zk.delete(taskPath, -1, this.deleteTaskCallback, taskId);
    }

    /**
     * Asynchronously overwrites the data of the node at {@code path} with the encoded worker
     * id, regardless of the node's version; the result goes to
     * {@code updateTaskHandlerCallback} with {@code (workerId, biConsumer)} as context.
     *
     * @param path       path of the node to update
     * @param workerId   value written to the node
     * @param biConsumer second element of the callback context
     */
    private void updateTaskHandler(String path, String workerId, BiConsumer<String, NodeUri> biConsumer) {
        byte[] payload = UTF8.getBytes(workerId);
        TupleTwo context = new TupleTwo(workerId, biConsumer);
        this.zk.setData(path, payload, -1, this.updateTaskHandlerCallback, context);
    }

    /**
     * Asynchronously creates one persistent node at {@code path} whose data is the encoded
     * {@code TaskStates.Idle} value; the result goes to {@code createTaskCallback} with
     * {@code (taskId, consumer)} as context.
     *
     * @param path     full path of the node to create
     * @param taskId   first element of the callback context
     * @param consumer second element of the callback context
     */
    private void createTask(String path, String taskId, Consumer<String> consumer) {
        byte[] idleMarker = UTF8.getBytes(TaskStates.Idle);
        TupleTwo context = new TupleTwo(taskId, consumer);
        this.zk.create(path, idleMarker, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT,
                this.createTaskCallback, context);
    }

    /**
     * Asynchronously reads the task's node; the result goes to {@code isTaskAssignedCallback}.
     *
     * @param taskId     id of the task to inspect
     * @param biConsumer passed along as the callback context
     */
    @Override // com.github.chen0040.zkcoordinator.services.TaskAssignmentService
    public void isTaskAssigned(String taskId, BiConsumer<String, Boolean> biConsumer) {
        String taskPath = getPath4TaskId(taskId);
        this.zk.getData(taskPath, false, this.isTaskAssignedCallback, biConsumer);
    }

    /**
     * Asynchronously reads the task's node; the result goes to {@code taskExistsCallback}.
     *
     * @param taskId     id of the task to look up
     * @param biConsumer passed along as the callback context
     */
    @Override // com.github.chen0040.zkcoordinator.services.TaskAssignmentService
    public void taskExists(String taskId, BiConsumer<String, Boolean> biConsumer) {
        String taskPath = getPath4TaskId(taskId);
        this.zk.getData(taskPath, false, this.taskExistsCallback, biConsumer);
    }

    /**
     * Parses a worker id into a {@code NodeUri}. The id is split on {@code "_"}; the first
     * piece becomes the host and the second piece, parsed as an integer, becomes the port.
     * The system name is taken from this service's configuration.
     *
     * @param workerId id to parse; must contain at least two {@code "_"}-separated pieces
     * @return the populated {@code NodeUri}
     */
    private NodeUri getUri(String workerId) {
        NodeUri uri = new NodeUri();
        String[] pieces = workerId.split("_");
        String host = pieces[0];
        uri.setHost(host);
        int port = Integer.parseInt(pieces[1]);
        uri.setPort(port);
        uri.setSystem(this.workerSystemName);
        return uri;
    }

    /**
     * Asynchronously reads the task's node; the result goes to
     * {@code getWorkerAssigned2TaskCallback}.
     *
     * @param taskId   id of the task to look up
     * @param consumer passed along as the callback context
     */
    @Override // com.github.chen0040.zkcoordinator.services.TaskAssignmentService
    public void getWorkerAssigned2Task(String taskId, Consumer<NodeUri> consumer) {
        String taskPath = getPath4TaskId(taskId);
        this.zk.getData(taskPath, false, this.getWorkerAssigned2TaskCallback, consumer);
    }

    /**
     * Creates the node chain for a task under the task root.
     *
     * <p>Nodes are created one after another, each from the completion callback of the
     * previous one: one node per hashed partition level, then the task node itself. The
     * {@code consumer} is attached only to the final (task node) creation.
     *
     * @param taskId   id of the task to create
     * @param consumer invoked when the task node creation completes
     */
    @Override // com.github.chen0040.zkcoordinator.services.TaskAssignmentService
    public void createTask(String taskId, Consumer<String> consumer) {
        createTaskLevel(this.zkTaskPath, 0, taskId, consumer);
    }

    /**
     * Creates the next node below {@code parentPath} and continues down the chain.
     *
     * @param parentPath path that already exists (or is the task root)
     * @param level      number of partition segments already contained in {@code parentPath}
     */
    private void createTaskLevel(String parentPath, int level, String taskId, Consumer<String> consumer) {
        if (level == this.zkDepth4Tasks) {
            // All partition levels exist: create the task node and report to the caller.
            createTask(parentPath + "/" + taskId, taskId, consumer);
            return;
        }
        String partitionPath = parentPath + "/" + getPartition(taskId, level, this.zkDepth4Tasks);
        createTask(partitionPath, taskId, ignored -> createTaskLevel(partitionPath, level + 1, taskId, consumer));
    }
}
