package ai.grakn.engine.backgroundtasks.distributed;

import ai.grakn.engine.TaskStatus;
import ai.grakn.engine.backgroundtasks.TaskState;
import ai.grakn.engine.backgroundtasks.TaskStateStorage;
import ai.grakn.engine.backgroundtasks.config.ConfigHelper;
import ai.grakn.engine.backgroundtasks.config.KafkaTerms;
import ai.grakn.engine.backgroundtasks.config.ZookeeperPaths;
import ai.grakn.engine.util.ExceptionWrapper;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.json.JSONArray;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:ai/grakn/engine/backgroundtasks/distributed/TaskFailover.class */
public class TaskFailover implements TreeCacheListener, AutoCloseable {
    private final Logger LOG = LoggerFactory.getLogger(TaskFailover.class);
    private final AtomicBoolean OPENED = new AtomicBoolean(false);
    private final TaskStateStorage stateStorage;
    private Map<String, ChildData> current;
    private TreeCache cache;
    private KafkaProducer<String, String> producer;

    /* renamed from: ai.grakn.engine.backgroundtasks.distributed.TaskFailover$1, reason: invalid class name */
    /* loaded from: input_file:ai/grakn/engine/backgroundtasks/distributed/TaskFailover$1.class */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$curator$framework$recipes$cache$TreeCacheEvent$Type = new int[TreeCacheEvent.Type.values().length];

        static {
            try {
                $SwitchMap$org$apache$curator$framework$recipes$cache$TreeCacheEvent$Type[TreeCacheEvent.Type.NODE_ADDED.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$curator$framework$recipes$cache$TreeCacheEvent$Type[TreeCacheEvent.Type.NODE_REMOVED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    public TaskFailover(CuratorFramework curatorFramework, TreeCache treeCache, TaskStateStorage taskStateStorage) throws Exception {
        this.stateStorage = taskStateStorage;
        if (!this.OPENED.compareAndSet(false, true)) {
            this.LOG.error("TaskFailover already opened!");
            return;
        }
        this.cache = treeCache;
        this.current = treeCache.getCurrentChildren(ZookeeperPaths.RUNNERS_WATCH);
        this.producer = ConfigHelper.kafkaProducer();
        scanStaleStates(curatorFramework);
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        if (!this.OPENED.compareAndSet(true, false)) {
            this.LOG.error("TaskFailover close() called before open().");
            return;
        }
        KafkaProducer<String, String> kafkaProducer = this.producer;
        kafkaProducer.getClass();
        ExceptionWrapper.noThrow(kafkaProducer::flush, "Could not flush Kafka Producer.");
        KafkaProducer<String, String> kafkaProducer2 = this.producer;
        kafkaProducer2.getClass();
        ExceptionWrapper.noThrow(kafkaProducer2::close, "Could not close Kafka Producer.");
    }

    public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
        Map<String, ChildData> currentChildren = this.cache.getCurrentChildren(ZookeeperPaths.RUNNERS_WATCH);
        switch (AnonymousClass1.$SwitchMap$org$apache$curator$framework$recipes$cache$TreeCacheEvent$Type[treeCacheEvent.getType().ordinal()]) {
            case 1:
                this.LOG.debug("New engine joined pool. Current engines: " + currentChildren.keySet());
                this.current = currentChildren;
                return;
            case 2:
                this.LOG.debug("Engine failure detected. Current engines " + currentChildren.keySet());
                failover(curatorFramework, currentChildren);
                this.current = currentChildren;
                return;
            default:
                return;
        }
    }

    private void failover(CuratorFramework curatorFramework, Map<String, ChildData> map) throws Exception {
        for (String str : this.current.keySet()) {
            if (!map.containsKey(str)) {
                this.LOG.debug("Dead engine: " + str);
                reQueue(curatorFramework, str);
            }
        }
    }

    private void reQueue(CuratorFramework curatorFramework, String str) throws Exception {
        Iterator it = new JSONArray(new String((byte[]) curatorFramework.getData().forPath("/task_runners/last_state/" + str), StandardCharsets.UTF_8)).iterator();
        while (it.hasNext()) {
            String str2 = (String) it.next();
            TaskState state = this.stateStorage.getState(str2);
            if (state.status() == TaskStatus.RUNNING) {
                this.LOG.debug(String.format("Engine [%s] stopped, task [%s] requeued", str, state.getId()));
                this.stateStorage.updateState(state.status(TaskStatus.SCHEDULED));
                this.producer.send(new ProducerRecord(KafkaTerms.WORK_QUEUE_TOPIC, str2, state.configuration().toString()));
            } else {
                this.LOG.debug(String.format("Engine [%s] stopped, task [%s] not restarted because state [%s]", str, state.getId(), state.status()));
            }
        }
    }

    private void scanStaleStates(CuratorFramework curatorFramework) throws Exception {
        HashSet hashSet = new HashSet();
        for (String str : (List) curatorFramework.getChildren().forPath(ZookeeperPaths.TASKS_PATH_PREFIX)) {
            TaskState state = this.stateStorage.getState(str);
            if (state.status() != TaskStatus.RUNNING) {
                return;
            }
            String engineID = state.engineID();
            if (engineID == null || engineID.isEmpty()) {
                throw new IllegalStateException("ZK Task SynchronizedState - " + str + " - has no engineID (" + engineID + ") - status " + state.status().toString());
            }
            if (hashSet.contains(engineID)) {
                return;
            }
            if (curatorFramework.checkExists().forPath("/task_runners/watch/" + engineID) == null) {
                reQueue(curatorFramework, engineID);
                hashSet.add(engineID);
            }
        }
    }
}
