package ai.grakn.engine.backgroundtasks.distributed;

import ai.grakn.engine.TaskStatus;
import ai.grakn.engine.backgroundtasks.BackgroundTask;
import ai.grakn.engine.backgroundtasks.TaskState;
import ai.grakn.engine.backgroundtasks.TaskStateStorage;
import ai.grakn.engine.backgroundtasks.config.ConfigHelper;
import ai.grakn.engine.backgroundtasks.config.KafkaTerms;
import ai.grakn.engine.util.ConfigProperties;
import ai.grakn.engine.util.EngineID;
import ai.grakn.engine.util.ExceptionWrapper;
import ai.grakn.exception.EngineStorageException;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.zookeeper.CreateMode;
import org.json.JSONArray;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:ai/grakn/engine/backgroundtasks/distributed/TaskRunner.class */
public class TaskRunner implements Runnable, AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(TaskRunner.class);
    private static final ConfigProperties properties = ConfigProperties.getInstance();
    private static final int POLLING_FREQUENCY = properties.getPropertyAsInt(ConfigProperties.TASKRUNNER_POLLING_FREQ);
    private static final String ENGINE_ID = EngineID.getInstance().id();
    private final TaskStateStorage storage;
    private final ZookeeperConnection connection;
    private final CountDownLatch shutdownLatch;
    private final ExecutorService executor;
    private final int executorSize;
    private final Set<String> runningTasks = new HashSet();
    private final AtomicInteger acceptedTasks = new AtomicInteger(0);
    private final KafkaConsumer<String, String> consumer = ConfigHelper.kafkaConsumer(KafkaTerms.TASK_RUNNER_GROUP);

    /* loaded from: input_file:ai/grakn/engine/backgroundtasks/distributed/TaskRunner$HandleRebalance.class */
    private class HandleRebalance implements ConsumerRebalanceListener {
        private HandleRebalance() {
        }

        public void onPartitionsAssigned(Collection<TopicPartition> collection) {
            TaskRunner.LOG.debug("TaskRunner consumer partitions assigned " + collection);
        }

        public void onPartitionsRevoked(Collection<TopicPartition> collection) {
            TaskRunner.this.consumer.commitSync();
            TaskRunner.LOG.debug("TaskRunner consumer partitions revoked " + collection);
        }
    }

    public TaskRunner(TaskStateStorage taskStateStorage, ZookeeperConnection zookeeperConnection) {
        this.storage = taskStateStorage;
        this.connection = zookeeperConnection;
        this.consumer.subscribe(Collections.singletonList(KafkaTerms.WORK_QUEUE_TOPIC), new HandleRebalance());
        registerAsRunning();
        updateOwnState();
        int availableThreads = properties.getAvailableThreads();
        this.executor = Executors.newFixedThreadPool(availableThreads);
        this.executorSize = availableThreads * 4;
        this.shutdownLatch = new CountDownLatch(1);
        LOG.info("TaskRunner started");
    }

    @Override // java.lang.Runnable
    public void run() {
        while (true) {
            try {
                try {
                    if (getAcceptedTasksCount() < this.executorSize) {
                        processRecords(this.consumer.poll(POLLING_FREQUENCY));
                    }
                } catch (WakeupException e) {
                    LOG.debug("TaskRunner exiting, woken up.");
                    KafkaConsumer<String, String> kafkaConsumer = this.consumer;
                    kafkaConsumer.getClass();
                    ExceptionWrapper.noThrow(kafkaConsumer::commitSync, "Exception syncing commits while closing in TaskRunner");
                    KafkaConsumer<String, String> kafkaConsumer2 = this.consumer;
                    kafkaConsumer2.getClass();
                    ExceptionWrapper.noThrow(kafkaConsumer2::close, "Exception while closing consumer in TaskRunner");
                    CountDownLatch countDownLatch = this.shutdownLatch;
                    countDownLatch.getClass();
                    ExceptionWrapper.noThrow(countDownLatch::countDown, "Exception while counting down close latch in TaskRunner");
                    return;
                } catch (Throwable th) {
                    LOG.error("Error in TaskRunner poll " + ExceptionUtils.getFullStackTrace(th));
                    KafkaConsumer<String, String> kafkaConsumer3 = this.consumer;
                    kafkaConsumer3.getClass();
                    ExceptionWrapper.noThrow(kafkaConsumer3::commitSync, "Exception syncing commits while closing in TaskRunner");
                    KafkaConsumer<String, String> kafkaConsumer4 = this.consumer;
                    kafkaConsumer4.getClass();
                    ExceptionWrapper.noThrow(kafkaConsumer4::close, "Exception while closing consumer in TaskRunner");
                    CountDownLatch countDownLatch2 = this.shutdownLatch;
                    countDownLatch2.getClass();
                    ExceptionWrapper.noThrow(countDownLatch2::countDown, "Exception while counting down close latch in TaskRunner");
                    return;
                }
            } catch (Throwable th2) {
                KafkaConsumer<String, String> kafkaConsumer5 = this.consumer;
                kafkaConsumer5.getClass();
                ExceptionWrapper.noThrow(kafkaConsumer5::commitSync, "Exception syncing commits while closing in TaskRunner");
                KafkaConsumer<String, String> kafkaConsumer6 = this.consumer;
                kafkaConsumer6.getClass();
                ExceptionWrapper.noThrow(kafkaConsumer6::close, "Exception while closing consumer in TaskRunner");
                CountDownLatch countDownLatch3 = this.shutdownLatch;
                countDownLatch3.getClass();
                ExceptionWrapper.noThrow(countDownLatch3::countDown, "Exception while counting down close latch in TaskRunner");
                throw th2;
            }
        }
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        KafkaConsumer<String, String> kafkaConsumer = this.consumer;
        kafkaConsumer.getClass();
        ExceptionWrapper.noThrow(kafkaConsumer::wakeup, "Could not wake up task runner thread.");
        CountDownLatch countDownLatch = this.shutdownLatch;
        countDownLatch.getClass();
        ExceptionWrapper.noThrow(countDownLatch::await, "Error waiting for TaskRunner consumer to exit");
        ExecutorService executorService = this.executor;
        executorService.getClass();
        ExceptionWrapper.noThrow(executorService::shutdownNow, "Could not shutdown scheduling service.");
        LOG.debug("TaskRunner stopped");
    }

    private void processRecords(ConsumerRecords<String, String> consumerRecords) {
        Iterator it = consumerRecords.iterator();
        while (it.hasNext()) {
            ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
            LOG.debug(String.format("Received [%s], currently running: %s has: %s allowed: %s", consumerRecord.key(), Integer.valueOf(getRunningTasksCount()), Integer.valueOf(getAcceptedTasksCount()), Integer.valueOf(this.executorSize)));
            if (getAcceptedTasksCount() >= this.executorSize) {
                seekAndCommit(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), consumerRecord.offset());
                return;
            }
            String str = (String) consumerRecord.key();
            try {
                TaskState state = this.storage.getState(str);
                if (state.status() != TaskStatus.SCHEDULED) {
                    LOG.debug("Cant run this task - " + str + " because\n\t\tstatus: " + state.status());
                } else {
                    this.storage.updateState(state.status(TaskStatus.RUNNING).statusChangedBy(getClass().getName()).engineID(ENGINE_ID));
                    this.acceptedTasks.incrementAndGet();
                    this.executor.submit(() -> {
                        executeTask(state);
                    });
                    seekAndCommit(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), consumerRecord.offset() + 1);
                }
            } catch (EngineStorageException e) {
                LOG.error("Cant run this task - " + str + " because state was not found in storage");
            }
        }
    }

    private void executeTask(TaskState taskState) {
        LOG.debug("Executing task " + taskState.getId());
        try {
            try {
                addRunningTask(taskState.getId());
                BackgroundTask backgroundTask = (BackgroundTask) Class.forName(taskState.taskClassName()).newInstance();
                if (taskState.checkpoint() != null) {
                    backgroundTask.resume(saveCheckpoint(taskState), taskState.checkpoint());
                } else {
                    backgroundTask.start(saveCheckpoint(taskState), taskState.configuration());
                }
                this.storage.updateState(taskState.status(TaskStatus.COMPLETED));
                removeRunningTask(taskState.getId());
                this.acceptedTasks.decrementAndGet();
                LOG.debug("Finished executing task - " + taskState.getId());
            } catch (Throwable th) {
                this.storage.updateState(taskState.status(TaskStatus.FAILED));
                LOG.error("Failed task - " + taskState.getId() + ": " + ExceptionUtils.getFullStackTrace(th));
                removeRunningTask(taskState.getId());
                this.acceptedTasks.decrementAndGet();
                LOG.debug("Finished executing task - " + taskState.getId());
            }
        } catch (Throwable th2) {
            removeRunningTask(taskState.getId());
            this.acceptedTasks.decrementAndGet();
            LOG.debug("Finished executing task - " + taskState.getId());
            throw th2;
        }
    }

    private Consumer<String> saveCheckpoint(TaskState taskState) {
        return str -> {
            this.storage.updateState(taskState.checkpoint(str));
        };
    }

    private void updateOwnState() {
        JSONArray jSONArray = new JSONArray();
        jSONArray.put((Collection) this.runningTasks);
        try {
            this.connection.connection().setData().forPath("/task_runners/last_state/" + ENGINE_ID, jSONArray.toString().getBytes(StandardCharsets.UTF_8));
        } catch (Exception e) {
            LOG.error("Could not update TaskRunner taskstorage in ZooKeeper! " + e);
        }
    }

    private void registerAsRunning() {
        try {
            if (this.connection.connection().checkExists().forPath("/task_runners/watch/" + ENGINE_ID) == null) {
                ((ACLBackgroundPathAndBytesable) this.connection.connection().create().creatingParentContainersIfNeeded().withMode(CreateMode.EPHEMERAL)).forPath("/task_runners/watch/" + ENGINE_ID);
            }
            if (this.connection.connection().checkExists().forPath("/task_runners/last_state/" + ENGINE_ID) == null) {
                this.connection.connection().create().creatingParentContainersIfNeeded().forPath("/task_runners/last_state/" + ENGINE_ID);
            }
            LOG.debug("Registered TaskRunner");
        } catch (Exception e) {
            throw new RuntimeException("Could not create Zookeeper paths in TaskRunner");
        }
    }

    private synchronized int getAcceptedTasksCount() {
        return this.acceptedTasks.get();
    }

    private synchronized int getRunningTasksCount() {
        return this.runningTasks.size();
    }

    private synchronized void addRunningTask(String str) {
        this.runningTasks.add(str);
        updateOwnState();
    }

    private synchronized void removeRunningTask(String str) {
        this.runningTasks.remove(str);
        updateOwnState();
    }

    private void seekAndCommit(TopicPartition topicPartition, long j) {
        this.consumer.seek(topicPartition, j);
        this.consumer.commitSync();
    }
}
