package ai.grakn.engine.backgroundtasks.distributed;

import ai.grakn.engine.backgroundtasks.TaskStateStorage;
import ai.grakn.engine.backgroundtasks.config.ZookeeperPaths;
import ai.grakn.engine.util.ExceptionWrapper;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.leader.CancelLeadershipException;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
import org.apache.curator.framework.state.ConnectionState;

/* loaded from: input_file:ai/grakn/engine/backgroundtasks/distributed/SchedulerElector.class */
public class SchedulerElector extends LeaderSelectorListenerAdapter {
    private static final String SCHEDULER_THREAD_NAME = "scheduler-";
    private final LeaderSelector leaderSelector;
    private final TaskStateStorage storage;
    private Scheduler scheduler;
    private TreeCache cache;
    private TaskFailover failover;

    public SchedulerElector(TaskStateStorage taskStateStorage, ZookeeperConnection zookeeperConnection) {
        this.storage = taskStateStorage;
        this.leaderSelector = new LeaderSelector(zookeeperConnection.connection(), ZookeeperPaths.SCHEDULER, this);
        this.leaderSelector.autoRequeue();
        try {
            this.leaderSelector.start();
            while (!this.leaderSelector.getLeader().isLeader()) {
                Thread.sleep(1000L);
            }
        } catch (Exception e) {
            throw new RuntimeException("There were errors electing a leader- Engine should stop");
        }
    }

    public void stop() {
        this.leaderSelector.interruptLeadership();
        LeaderSelector leaderSelector = this.leaderSelector;
        leaderSelector.getClass();
        ExceptionWrapper.noThrow(leaderSelector::close, "Error closing leadership elector");
        if (this.scheduler != null) {
            Scheduler scheduler = this.scheduler;
            scheduler.getClass();
            ExceptionWrapper.noThrow(scheduler::close, "Error closing the Scheduler");
            TaskFailover taskFailover = this.failover;
            taskFailover.getClass();
            ExceptionWrapper.noThrow(taskFailover::close, "Error shutting down task failover hook");
            TreeCache treeCache = this.cache;
            treeCache.getClass();
            ExceptionWrapper.noThrow(treeCache::close, "Error closing zookeeper cache");
        }
    }

    public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
        if (connectionState == ConnectionState.SUSPENDED || connectionState == ConnectionState.LOST) {
            this.scheduler.close();
            throw new CancelLeadershipException();
        }
    }

    public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
        registerFailover(curatorFramework);
        this.scheduler = new Scheduler(this.storage);
        Thread thread = new Thread(this.scheduler, SCHEDULER_THREAD_NAME + this.scheduler.hashCode());
        thread.start();
        thread.join();
    }

    public Scheduler getScheduler() {
        return this.scheduler;
    }

    private void registerFailover(CuratorFramework curatorFramework) throws Exception {
        this.cache = new TreeCache(curatorFramework, ZookeeperPaths.RUNNERS_WATCH);
        this.failover = new TaskFailover(curatorFramework, this.cache, this.storage);
        this.cache.getListenable().addListener(this.failover);
        this.cache.start();
    }
}
