package ai.grakn.engine.tasks.manager.singlequeue;

import ai.grakn.engine.TaskId;
import ai.grakn.engine.TaskStatus;
import ai.grakn.engine.tasks.BackgroundTask;
import ai.grakn.engine.tasks.ExternalOffsetStorage;
import ai.grakn.engine.tasks.TaskCheckpoint;
import ai.grakn.engine.tasks.TaskConfiguration;
import ai.grakn.engine.tasks.TaskState;
import ai.grakn.engine.tasks.TaskStateStorage;
import ai.grakn.engine.util.EngineID;
import ai.grakn.engine.util.ExceptionWrapper;
import java.time.Duration;
import java.time.Instant;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:ai/grakn/engine/tasks/manager/singlequeue/SingleQueueTaskRunner.class */
public class SingleQueueTaskRunner implements Runnable, AutoCloseable {
    private static final Logger LOG;
    private final Consumer<TaskState, TaskConfiguration> consumer;
    private final SingleQueueTaskManager manager;
    private final TaskStateStorage storage;
    private final ExternalOffsetStorage offsetStorage;
    private final EngineID engineID;
    private final int MAX_TIME_SINCE_HANDLED_BEFORE_BACKOFF;
    private Instant timeTaskLastHandled;
    static final /* synthetic */ boolean $assertionsDisabled;
    private final AtomicBoolean wakeUp = new AtomicBoolean(false);
    private final CountDownLatch countDownLatch = new CountDownLatch(1);
    private TaskId runningTaskId = null;
    private BackgroundTask runningTask = null;
    private final int BACKOFF = 0;

    public SingleQueueTaskRunner(SingleQueueTaskManager singleQueueTaskManager, EngineID engineID, ExternalOffsetStorage externalOffsetStorage, int i, Consumer<TaskState, TaskConfiguration> consumer) {
        this.manager = singleQueueTaskManager;
        this.storage = singleQueueTaskManager.storage();
        this.consumer = consumer;
        this.engineID = engineID;
        this.offsetStorage = externalOffsetStorage;
        this.MAX_TIME_SINCE_HANDLED_BEFORE_BACKOFF = i;
    }

    @Override // java.lang.Runnable
    public void run() {
        boolean z;
        AssertionError assertionError;
        LOG.debug("started");
        this.timeTaskLastHandled = Instant.now();
        while (!this.wakeUp.get()) {
            try {
                readRecords(this.consumer);
                long millis = Duration.between(this.timeTaskLastHandled, Instant.now()).toMillis();
                if (millis >= this.MAX_TIME_SINCE_HANDLED_BEFORE_BACKOFF) {
                    LOG.trace("has been  " + millis + " ms since handled task, sleeping for 0ms");
                    Thread.sleep(0L);
                }
            } finally {
                if (!z) {
                }
            }
        }
        this.countDownLatch.countDown();
        LOG.debug("stopped");
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        this.wakeUp.set(true);
        CountDownLatch countDownLatch = this.countDownLatch;
        countDownLatch.getClass();
        ExceptionWrapper.noThrow(countDownLatch::await, "Error waiting for the TaskRunner loop to finish");
        Consumer<TaskState, TaskConfiguration> consumer = this.consumer;
        consumer.getClass();
        ExceptionWrapper.noThrow(consumer::close, "Error closing the task runner");
    }

    public boolean stopTask(TaskId taskId) {
        return taskId.equals(this.runningTaskId) && this.runningTask.stop();
    }

    private void readRecords(Consumer<TaskState, TaskConfiguration> consumer) {
        Iterator it = consumer.poll(1000L).iterator();
        while (it.hasNext()) {
            ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
            TaskState taskState = (TaskState) consumerRecord.key();
            if (handleTask(taskState, (TaskConfiguration) consumerRecord.value())) {
                this.timeTaskLastHandled = Instant.now();
            }
            this.offsetStorage.saveOffset(consumer, new TopicPartition(consumerRecord.topic(), consumerRecord.partition()));
            LOG.trace("{} acknowledged", taskState.getId());
        }
    }

    private boolean handleTask(TaskState taskState, TaskConfiguration taskConfiguration) {
        LOG.debug("{}\treceived", taskState);
        TaskState latestState = getLatestState(taskState);
        if (shouldStopTask(latestState)) {
            stopTask(latestState);
            return true;
        }
        if (shouldDelayTask(latestState)) {
            resubmitTask(latestState, taskConfiguration);
            return false;
        }
        TaskState executeTask = executeTask(latestState, taskConfiguration);
        if (!taskShouldRecur(executeTask)) {
            return true;
        }
        resubmitTask(executeTask, taskConfiguration);
        return true;
    }

    private TaskState executeTask(TaskState taskState, TaskConfiguration taskConfiguration) {
        boolean start;
        try {
            try {
                this.runningTaskId = taskState.getId();
                this.runningTask = taskState.taskClass().newInstance();
                if (taskShouldResume(taskState)) {
                    LOG.debug("{}\tresuming ", taskState);
                    taskState.markRunning(this.engineID);
                    putState(taskState);
                    start = this.runningTask.resume(saveCheckpoint(taskState), taskState.checkpoint());
                } else {
                    taskState.markRunning(this.engineID);
                    putState(taskState);
                    LOG.debug("{}\tmarked as running", taskState);
                    BackgroundTask backgroundTask = this.runningTask;
                    java.util.function.Consumer<TaskCheckpoint> saveCheckpoint = saveCheckpoint(taskState);
                    SingleQueueTaskManager singleQueueTaskManager = this.manager;
                    singleQueueTaskManager.getClass();
                    start = backgroundTask.start(saveCheckpoint, taskConfiguration, singleQueueTaskManager::addTask);
                }
                if (start) {
                    taskState.markCompleted();
                } else {
                    taskState.markStopped();
                }
                this.runningTask = null;
                this.runningTaskId = null;
                if (taskShouldRecur(taskState)) {
                    taskState.schedule(taskState.schedule().incrementByInterval());
                }
                this.storage.updateState(taskState);
                LOG.debug("{}\tmarked as {}", taskState, taskState.status());
            } catch (Throwable th) {
                taskState.markFailed(th);
                LOG.error("{}\tfailed with {}", taskState.getId(), th.getMessage());
                this.runningTask = null;
                this.runningTaskId = null;
                if (taskShouldRecur(taskState)) {
                    taskState.schedule(taskState.schedule().incrementByInterval());
                }
                this.storage.updateState(taskState);
                LOG.debug("{}\tmarked as {}", taskState, taskState.status());
            }
            return taskState;
        } catch (Throwable th2) {
            this.runningTask = null;
            this.runningTaskId = null;
            if (taskShouldRecur(taskState)) {
                taskState.schedule(taskState.schedule().incrementByInterval());
            }
            this.storage.updateState(taskState);
            LOG.debug("{}\tmarked as {}", taskState, taskState.status());
            throw th2;
        }
    }

    private void resubmitTask(TaskState taskState, TaskConfiguration taskConfiguration) {
        this.manager.addTask(taskState, taskConfiguration);
        LOG.debug("{}\tresubmitted with {}", taskState, taskState.priority().queue());
    }

    private void stopTask(TaskState taskState) {
        taskState.markStopped();
        putState(taskState);
        LOG.debug("{}\t marked as stopped", taskState);
    }

    private boolean shouldDelayTask(TaskState taskState) {
        return !taskState.schedule().runAt().isBefore(Instant.now());
    }

    private boolean taskShouldRecur(TaskState taskState) {
        return (!taskState.schedule().isRecurring() || taskState.status().equals(TaskStatus.FAILED) || taskState.status().equals(TaskStatus.STOPPED)) ? false : true;
    }

    private boolean taskShouldResume(TaskState taskState) {
        return taskState.status() == TaskStatus.RUNNING;
    }

    private boolean shouldStopTask(TaskState taskState) {
        return taskState.status() == TaskStatus.STOPPED || this.manager.isTaskMarkedStopped(taskState.getId());
    }

    private TaskState getLatestState(TaskState taskState) {
        return this.storage.containsTask(taskState.getId()) ? this.storage.getState(taskState.getId()) : taskState;
    }

    private void putState(TaskState taskState) {
        if (this.storage.containsTask(taskState.getId())) {
            this.storage.updateState(taskState);
        } else {
            this.storage.newState(taskState);
        }
    }

    private java.util.function.Consumer<TaskCheckpoint> saveCheckpoint(TaskState taskState) {
        return taskCheckpoint -> {
            this.storage.updateState(taskState.checkpoint(taskCheckpoint));
        };
    }

    static {
        $assertionsDisabled = !SingleQueueTaskRunner.class.desiredAssertionStatus();
        LOG = LoggerFactory.getLogger(SingleQueueTaskRunner.class);
    }
}
