package ai.grakn.engine.backgroundtasks.distributed;

import ai.grakn.engine.backgroundtasks.config.ZookeeperPaths;
import ai.grakn.engine.backgroundtasks.taskstorage.SynchronizedStateStorage;
import ai.grakn.engine.util.EngineID;
import ai.grakn.engine.util.ExceptionWrapper;
import java.util.concurrent.CountDownLatch;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;

/* loaded from: input_file:ai/grakn/engine/backgroundtasks/distributed/ClusterManager.class */
public class ClusterManager extends LeaderSelectorListenerAdapter {
    private static ClusterManager instance = null;
    private LeaderSelector leaderSelector;
    private Scheduler scheduler;
    private TreeCache cache;
    private TaskRunner taskRunner;
    private Thread taskRunnerThread;
    private SynchronizedStateStorage zookeeperStorage;
    private final KafkaLogger LOG = KafkaLogger.getInstance();
    private final CountDownLatch leaderInitLatch = new CountDownLatch(1);
    private final String engineID = EngineID.getInstance().id();

    public static synchronized ClusterManager getInstance() {
        if (instance == null) {
            instance = new ClusterManager();
        }
        return instance;
    }

    private ClusterManager() {
    }

    public void start() {
        try {
            this.LOG.debug("Starting Cluster manager, called by " + Thread.currentThread().getStackTrace()[1]);
            this.zookeeperStorage = SynchronizedStateStorage.getInstance();
            this.taskRunner = new TaskRunner();
            this.taskRunner.open();
            this.taskRunnerThread = new Thread(this.taskRunner);
            this.taskRunnerThread.start();
            this.leaderSelector = new LeaderSelector(this.zookeeperStorage.connection(), ZookeeperPaths.SCHEDULER, this);
            this.leaderSelector.autoRequeue();
            this.leaderSelector.start();
            while (!this.leaderSelector.getLeader().isLeader()) {
                Thread.sleep(1000L);
            }
            if (this.leaderSelector.hasLeadership()) {
                this.leaderInitLatch.await();
            }
            this.LOG.debug("ClusterManager started, a leader has been elected.");
        } catch (Exception e) {
            this.LOG.error(ExceptionUtils.getFullStackTrace(e));
            throw new RuntimeException(e);
        }
    }

    public void stop() {
        LeaderSelector leaderSelector = this.leaderSelector;
        leaderSelector.getClass();
        ExceptionWrapper.noThrow(leaderSelector::interruptLeadership, "Could not interrupt leadership.");
        LeaderSelector leaderSelector2 = this.leaderSelector;
        leaderSelector2.getClass();
        ExceptionWrapper.noThrow(leaderSelector2::close, "Could not close leaderSelector.");
        if (this.scheduler != null) {
            Scheduler scheduler = this.scheduler;
            scheduler.getClass();
            ExceptionWrapper.noThrow(scheduler::close, "Could not stop scheduler.");
        }
        if (this.cache != null) {
            TreeCache treeCache = this.cache;
            treeCache.getClass();
            ExceptionWrapper.noThrow(treeCache::close, "Could not close ZK Tree Cache.");
        }
        TaskRunner taskRunner = this.taskRunner;
        taskRunner.getClass();
        ExceptionWrapper.noThrow(taskRunner::close, "Could not stop TaskRunner.");
        try {
            this.taskRunnerThread.join();
        } catch (Throwable th) {
            this.LOG.error("Exception whilst waiting for TaskRunner thread to join - " + ExceptionUtils.getFullStackTrace(th));
        }
        SynchronizedStateStorage synchronizedStateStorage = this.zookeeperStorage;
        synchronizedStateStorage.getClass();
        ExceptionWrapper.noThrow(synchronizedStateStorage::close, "Could not close ZK storage.");
        this.zookeeperStorage = null;
    }

    public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
        registerFailover(curatorFramework);
        this.scheduler = new Scheduler();
        this.scheduler.open();
        this.LOG.info(this.engineID + " has taken over the scheduler.");
        Thread thread = new Thread(this.scheduler);
        thread.setDaemon(true);
        thread.start();
        this.leaderInitLatch.countDown();
        thread.join();
    }

    public Scheduler getScheduler() {
        return this.scheduler;
    }

    private void registerFailover(CuratorFramework curatorFramework) throws Exception {
        this.cache = new TreeCache(curatorFramework, ZookeeperPaths.RUNNERS_WATCH);
        TaskFailover open = TaskFailover.getInstance().open(curatorFramework, this.cache);
        Throwable th = null;
        try {
            try {
                this.cache.getListenable().addListener(open);
                if (open != null) {
                    if (0 != 0) {
                        try {
                            open.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        open.close();
                    }
                }
                this.cache.start();
            } finally {
            }
        } catch (Throwable th3) {
            if (open != null) {
                if (th != null) {
                    try {
                        open.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    open.close();
                }
            }
            throw th3;
        }
    }
}
