package ai.grakn.engine.backgroundtasks.distributed;

import ai.grakn.engine.backgroundtasks.BackgroundTask;
import ai.grakn.engine.backgroundtasks.StateStorage;
import ai.grakn.engine.backgroundtasks.TaskState;
import ai.grakn.engine.backgroundtasks.TaskStatus;
import ai.grakn.engine.backgroundtasks.config.ConfigHelper;
import ai.grakn.engine.backgroundtasks.config.KafkaTerms;
import ai.grakn.engine.backgroundtasks.config.ZookeeperPaths;
import ai.grakn.engine.backgroundtasks.taskstorage.GraknStateStorage;
import ai.grakn.engine.backgroundtasks.taskstorage.SynchronizedState;
import ai.grakn.engine.backgroundtasks.taskstorage.SynchronizedStateStorage;
import ai.grakn.engine.util.ConfigProperties;
import ai.grakn.engine.util.EngineID;
import ai.grakn.engine.util.ExceptionWrapper;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.zookeeper.CreateMode;
import org.json.JSONArray;
import org.json.JSONObject;

/* loaded from: input_file:ai/grakn/engine/backgroundtasks/distributed/TaskRunner.class */
/**
 * Polls the Kafka work queue topic and executes the tasks it finds on a local fixed-size
 * thread pool.
 *
 * <p>Lifecycle: call {@link #open()} once, run the instance on a dedicated thread (see
 * {@link #run()}), and call {@link #close()} from another thread to stop it. The Kafka consumer
 * is only used from the {@code run()} thread, except for {@code wakeup()} in {@code close()}.
 *
 * <p>For every record the runner takes a per-task ZooKeeper lock, checks that the task is still
 * {@link TaskStatus#SCHEDULED}, marks it {@link TaskStatus#RUNNING}, and hands it to the executor.
 * The set of task ids currently running on this engine is mirrored to ZooKeeper under
 * {@code /task_runners/last_state/<engineID>}.
 */
public class TaskRunner implements Runnable, AutoCloseable {
    private static final ConfigProperties properties = ConfigProperties.getInstance();

    /** Maximum time to wait for a task's ZooKeeper lock, in milliseconds. */
    private static final long MUTEX_TIMEOUT_MS = 5000L;
    /** Pause between loop iterations while the runner is at its task limit, in milliseconds. */
    private static final long SATURATED_SLEEP_MS = 500L;

    private ExecutorService executor;
    private StateStorage graknStorage;
    private SynchronizedStateStorage zkStorage;
    private KafkaConsumer<String, String> consumer;
    /** Released by {@link #run()} when it has finished cleaning up; awaited by {@link #close()}. */
    private CountDownLatch waitToClose;
    private final KafkaLogger LOG = KafkaLogger.getInstance();
    /** Ids of tasks currently executing on this engine; guarded by {@code this}. */
    private final Set<String> runningTasks = new HashSet<>();
    private final String engineID = EngineID.getInstance().id();
    private final AtomicBoolean OPENED = new AtomicBoolean(false);
    private boolean initialised = false;
    /** Upper bound on concurrently running tasks; also the executor pool size. */
    private final int allowableRunningTasks = properties.getAvailableThreads();
    private volatile boolean running = false;

    /**
     * Main polling loop. Runs until {@link #close()} clears the {@code running} flag and wakes the
     * consumer, or until the thread is interrupted.
     *
     * <p>The consumer is committed and closed, and the shutdown latch released, exactly once when
     * the loop ends (normally or exceptionally) - not after every iteration.
     */
    @Override // java.lang.Runnable
    public void run() {
        this.running = true;
        try {
            while (this.running) {
                printInitialization();

                Set<TopicPartition> partitions = this.consumer.partitionsFor(KafkaTerms.WORK_QUEUE_TOPIC).stream()
                        .map(info -> new TopicPartition(KafkaTerms.WORK_QUEUE_TOPIC, info.partition()))
                        .collect(Collectors.toSet());
                this.LOG.debug("TaskRunner polling, size of new tasks " + this.consumer.endOffsets(partitions));

                if (getRunningTasksCount() < this.allowableRunningTasks) {
                    processRecords(this.consumer.poll(properties.getPropertyAsInt(ConfigProperties.TASKRUNNER_POLLING_FREQ)));
                } else {
                    // At capacity: do not poll, just wait for running tasks to finish.
                    Thread.sleep(SATURATED_SLEEP_MS);
                }
            }
        } catch (WakeupException | InterruptedException e) {
            if (e instanceof InterruptedException) {
                // Preserve the interrupt for whoever owns this thread.
                Thread.currentThread().interrupt();
            }
            if (this.running) {
                this.LOG.error("TaskRunner interrupted unexpectedly (without clearing 'running' flag first", e);
            } else {
                this.LOG.debug("TaskRunner exiting gracefully.");
            }
        } finally {
            // Nested so that a failure in one cleanup step cannot skip the following ones;
            // in particular the latch must always be released so close() does not wait needlessly.
            try {
                this.consumer.commitSync();
            } finally {
                try {
                    this.consumer.close();
                } finally {
                    this.waitToClose.countDown();
                }
            }
        }
    }

    /**
     * Acquires everything the runner needs: graph storage, a Kafka consumer subscribed to the
     * work queue, the ZooKeeper state storage, this engine's ZooKeeper registration, and the
     * executor pool. Calling it on an already opened runner only logs an error.
     *
     * @return this runner
     * @throws Exception if creating the ZooKeeper registration nodes fails
     */
    public TaskRunner open() throws Exception {
        if (this.OPENED.compareAndSet(false, true)) {
            this.graknStorage = new GraknStateStorage();
            this.consumer = ConfigHelper.kafkaConsumer(KafkaTerms.TASK_RUNNER_GROUP);
            this.consumer.subscribe(Collections.singletonList(KafkaTerms.WORK_QUEUE_TOPIC), new RebalanceListener(this.consumer));
            this.zkStorage = SynchronizedStateStorage.getInstance();
            registerAsRunning();
            updateOwnState();
            this.executor = Executors.newFixedThreadPool(properties.getAvailableThreads());
            this.waitToClose = new CountDownLatch(1);
            this.LOG.info("TaskRunner opened.");
        } else {
            this.LOG.error("TaskRunner already opened!");
        }
        return this;
    }

    /**
     * Stops the polling loop and the executor. Clears the {@code running} flag, wakes the
     * consumer, waits up to five polling periods for {@link #run()} to finish its cleanup, then
     * calls {@code shutdownNow()} on the executor. Calling it on a runner that is not open only
     * logs an error.
     */
    @Override // java.lang.AutoCloseable
    public void close() {
        if (!this.OPENED.compareAndSet(true, false)) {
            this.LOG.error("TaskRunner close() called before open()!");
            return;
        }
        this.running = false;
        ExceptionWrapper.noThrow(this.consumer::wakeup, "Could not call wakeup on Kafka Consumer.");
        try {
            this.waitToClose.await(5 * properties.getPropertyAsLong(ConfigProperties.TASKRUNNER_POLLING_FREQ), TimeUnit.MILLISECONDS);
        } catch (Throwable th) {
            this.LOG.error("Exception whilst waiting for scheduler run() thread to finish - " + ExceptionUtils.getFullStackTrace(th));
        }
        ExceptionWrapper.noThrow(this.executor::shutdownNow, "Could not shutdown executor pool.");
        this.graknStorage = null;
        this.zkStorage = null;
        this.LOG.debug("TaskRunner stopped");
    }

    /**
     * Works through one batch of polled records, in order.
     *
     * <p>Processing of the batch stops early - seeking back to the current record so it is read
     * again on the next poll - when the runner is at capacity, when the task's lock cannot be
     * taken, or when ZooKeeper has no state for the task. Records whose task is not
     * {@code SCHEDULED} are skipped. Scheduled tasks are marked {@code RUNNING}, submitted to the
     * executor, and the offset after the record is committed.
     *
     * @param consumerRecords records returned by the last poll
     */
    private void processRecords(ConsumerRecords<String, String> consumerRecords) {
        for (ConsumerRecord<String, String> record : consumerRecords) {
            this.LOG.debug("Got a record\n\t\tkey: " + record.key() + "\n\t\toffset " + record.offset() + "\n\t\tvalue " + record.value());
            this.LOG.debug("Runner currently has tasks: " + getRunningTasksCount() + " allowed: " + this.allowableRunningTasks);

            TopicPartition partition = new TopicPartition(record.topic(), record.partition());

            if (getRunningTasksCount() >= this.allowableRunningTasks) {
                seekAndCommit(partition, record.offset());
                return;
            }

            String id = record.key();
            InterProcessMutex mutex = acquireMutex(id);
            if (mutex == null) {
                seekAndCommit(partition, record.offset());
                return;
            }

            TaskStatus status = getStatus(id);
            if (status == null) {
                seekAndCommit(partition, record.offset());
                releaseMutex(mutex, id);
                return;
            }

            if (status != TaskStatus.SCHEDULED) {
                this.LOG.debug("Cant schedule this task - " + id + " because\n\t\tstatus: " + status);
                releaseMutex(mutex, id);
                continue;
            }

            // Claim the task while still holding the lock, then release before doing any work.
            addRunningTask(id);
            updateTaskState(id, TaskStatus.RUNNING, getClass().getName(), this.engineID, null, null);
            releaseMutex(mutex, id);

            try {
                JSONObject configuration = new JSONObject(record.value());
                this.executor.submit(() -> {
                    executeTask(id, configuration);
                });
            } catch (NullPointerException | RejectedExecutionException e) {
                removeRunningTask(id);
                this.LOG.error(ExceptionUtils.getFullStackTrace(e));
            }

            this.LOG.debug("Runner next read from " + record.key() + " OFFSET " + (record.offset() + 1) + " topic " + record.topic());
            seekAndCommit(partition, record.offset() + 1);
        }
    }

    /**
     * Looks up a task's status in ZooKeeper.
     *
     * @param str task id
     * @return the task's status, or {@code null} (after logging an error) if ZooKeeper has no state for it
     */
    private TaskStatus getStatus(String str) {
        SynchronizedState state = this.zkStorage.getState(str);
        if (state == null) {
            this.LOG.error("Cant run task - " + str + " - because zkStorage returned null");
            return null;
        }
        return state.status();
    }

    /**
     * Instantiates and runs a single task, recording {@code COMPLETED} or {@code FAILED}
     * afterwards. Whatever happens, the task is removed from the running set exactly once.
     *
     * @param str task id
     * @param jSONObject configuration passed to the task's {@code start} method
     */
    private void executeTask(String str, JSONObject jSONObject) {
        try {
            this.LOG.debug("Executing task " + str);
            TaskState state = this.graknStorage.getState(str);
            this.LOG.debug("Got state of " + str + " from storage");

            BackgroundTask task = (BackgroundTask) Class.forName(state.taskClassName()).newInstance();
            task.start(saveCheckpoint(str), jSONObject);

            this.LOG.debug("Task - " + str + " completed successfully, updating state in graph");
            updateTaskState(str, TaskStatus.COMPLETED, getClass().getName(), null, null, null);
        } catch (Throwable th) {
            // Throwable on purpose: task code is arbitrary and must not kill the pool thread silently.
            this.LOG.debug("Failed task - " + str + ": " + ExceptionUtils.getFullStackTrace(th));
            updateTaskState(str, TaskStatus.FAILED, getClass().getName(), null, th, null);
            this.LOG.debug("Updated state " + str);
        } finally {
            removeRunningTask(str);
            this.LOG.debug("Finished executing task - " + str);
        }
    }

    /**
     * Tries to take the ZooKeeper lock for a task, creating the lock node first if it is missing.
     *
     * @param str task id
     * @return the held mutex, or {@code null} if it could not be acquired within
     *         {@link #MUTEX_TIMEOUT_MS} or an exception occurred
     */
    private InterProcessMutex acquireMutex(String str) {
        String lockPath = "/tasks/" + str + ZookeeperPaths.TASK_LOCK_SUFFIX;
        try {
            if (this.zkStorage.connection().checkExists().forPath(lockPath) == null) {
                this.zkStorage.connection().create().creatingParentContainersIfNeeded().forPath(lockPath);
            }
            InterProcessMutex mutex = new InterProcessMutex(this.zkStorage.connection(), lockPath);
            if (mutex.acquire(MUTEX_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
                this.LOG.debug("<<<<<<<<<<<< Got mutex for - " + str);
                return mutex;
            }
            this.LOG.debug("Could not acquire mutex");
        } catch (Exception e) {
            this.LOG.debug("Exception whilst trying to get mutex for task - " + str + " - " + ExceptionUtils.getFullStackTrace(e));
        }
        // Never hand back a mutex that is not actually held.
        return null;
    }

    /**
     * Releases a task lock; failures are logged and otherwise ignored.
     *
     * @param interProcessMutex mutex previously returned by {@link #acquireMutex(String)}
     * @param str task id, used for logging only
     */
    private void releaseMutex(InterProcessMutex interProcessMutex, String str) {
        try {
            interProcessMutex.release();
            this.LOG.debug(">>>>>>>>>>>> released mutex for - " + str);
        } catch (Exception e) {
            this.LOG.error("********************************\nCOULD NOT RELEASE MUTEX FOR TASK - " + str + "\n" + ExceptionUtils.getFullStackTrace(e) + "\n********************************");
        }
    }

    /**
     * Builds the checkpoint callback handed to a task.
     *
     * @param str task id
     * @return a consumer that stores the given checkpoint string against the task
     */
    private Consumer<String> saveCheckpoint(String str) {
        return checkpoint -> {
            this.LOG.debug("Writing checkpoint");
            updateTaskState(str, null, null, null, null, checkpoint);
        };
    }

    /**
     * Writes a task's state to ZooKeeper and then to graph storage. The graph write is
     * best-effort: a failure there is logged and does not propagate.
     *
     * @param str task id
     * @param taskStatus new status
     * @param str2 passed to graph storage only (callers pass this class's name)
     * @param str3 passed to both storages (callers pass the engine id)
     * @param th failure that ended the task, passed to graph storage only
     * @param str4 checkpoint string, passed to both storages
     */
    private void updateTaskState(String str, TaskStatus taskStatus, String str2, String str3, Throwable th, String str4) {
        this.LOG.debug("Updating state of task " + str);
        this.zkStorage.updateState(str, taskStatus, str3, str4);
        try {
            this.graknStorage.updateState(str, taskStatus, str2, str3, th, str4, null);
        } catch (Exception e) {
            this.LOG.error("Could not update state of task " + str + " in graph storage - " + ExceptionUtils.getFullStackTrace(e));
        }
    }

    /**
     * Publishes the current set of running task ids to this engine's
     * {@code /task_runners/last_state/} node as JSON. Failures are logged and ignored.
     */
    private void updateOwnState() {
        JSONArray state = new JSONArray();
        // NOTE(review): this nests the ids as a single inner array element - kept as-is; confirm readers expect that shape.
        state.put((Collection<String>) this.runningTasks);
        try {
            this.zkStorage.connection().setData().forPath(
                    "/task_runners/last_state/" + this.engineID,
                    state.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8));
        } catch (Exception e) {
            this.LOG.error("Could not update TaskRunner taskstorage in ZooKeeper! " + e);
        }
    }

    /**
     * Ensures this engine's ZooKeeper nodes exist: an ephemeral node under
     * {@code /task_runners/watch/} and a node under {@code /task_runners/last_state/}.
     *
     * @throws Exception if a ZooKeeper check or create call fails
     */
    private void registerAsRunning() throws Exception {
        String watchPath = "/task_runners/watch/" + this.engineID;
        String statePath = "/task_runners/last_state/" + this.engineID;

        if (this.zkStorage.connection().checkExists().forPath(watchPath) == null) {
            ((ACLBackgroundPathAndBytesable) this.zkStorage.connection().create().creatingParentContainersIfNeeded().withMode(CreateMode.EPHEMERAL)).forPath(watchPath);
        }
        if (this.zkStorage.connection().checkExists().forPath(statePath) == null) {
            this.zkStorage.connection().create().creatingParentContainersIfNeeded().forPath(statePath);
        }
        this.LOG.debug("Registered TaskRunner");
    }

    /** @return number of tasks currently tracked as running on this engine */
    private synchronized int getRunningTasksCount() {
        return this.runningTasks.size();
    }

    /**
     * Adds a task to the running set and publishes the new set to ZooKeeper.
     *
     * @param str task id
     */
    private synchronized void addRunningTask(String str) {
        this.runningTasks.add(str);
        updateOwnState();
    }

    /**
     * Removes a task from the running set and publishes the new set to ZooKeeper.
     *
     * @param str task id
     */
    private synchronized void removeRunningTask(String str) {
        this.runningTasks.remove(str);
        updateOwnState();
    }

    /**
     * Moves the consumer to the given offset on a partition and then commits synchronously.
     *
     * @param topicPartition partition to reposition
     * @param j offset the next poll should start from
     */
    private void seekAndCommit(TopicPartition topicPartition, long j) {
        this.consumer.seek(topicPartition, j);
        this.consumer.commitSync();
    }

    /** Logs "TaskRunner initialised" the first time it is called; later calls do nothing. */
    private void printInitialization() {
        if (!this.initialised) {
            this.initialised = true;
            this.LOG.info("TaskRunner initialised");
        }
    }
}
