package ai.grakn.engine.tasks.manager;

import ai.grakn.engine.GraknEngineConfig;
import ai.grakn.engine.TaskId;
import ai.grakn.engine.TaskStatus;
import ai.grakn.engine.lock.LockProvider;
import ai.grakn.engine.lock.NonReentrantLock;
import ai.grakn.engine.tasks.BackgroundTask;
import ai.grakn.engine.tasks.TaskCheckpoint;
import ai.grakn.engine.tasks.TaskConfiguration;
import ai.grakn.engine.tasks.TaskManager;
import ai.grakn.engine.tasks.TaskSchedule;
import ai.grakn.engine.tasks.TaskState;
import ai.grakn.engine.tasks.TaskStateStorage;
import ai.grakn.engine.tasks.storage.TaskStateInMemoryStore;
import ai.grakn.engine.util.EngineID;
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:ai/grakn/engine/tasks/manager/StandaloneTaskManager.class */
public class StandaloneTaskManager implements TaskManager {
    private final ExecutorService executorService;
    private final ScheduledExecutorService schedulingService;
    private final EngineID engineID;
    private final Logger LOG = LoggerFactory.getLogger(StandaloneTaskManager.class);
    private final Map<TaskId, BackgroundTask> runningTasks = new ConcurrentHashMap();
    private final Map<TaskId, ScheduledFuture> scheduledTasks = new ConcurrentHashMap();
    private final TaskStateStorage storage = new TaskStateInMemoryStore();
    private final Lock stateUpdateLock = new NonReentrantLock();

    public StandaloneTaskManager(EngineID engineID) {
        this.engineID = engineID;
        GraknEngineConfig graknEngineConfig = GraknEngineConfig.getInstance();
        this.schedulingService = Executors.newScheduledThreadPool(1);
        this.executorService = Executors.newFixedThreadPool(graknEngineConfig.getAvailableThreads());
        LockProvider.instantiate((str, lock) -> {
            return lock != null ? lock : new NonReentrantLock();
        });
    }

    @Override // ai.grakn.engine.tasks.TaskManager
    public void close() {
        this.executorService.shutdown();
        this.schedulingService.shutdownNow();
        this.runningTasks.keySet().forEach(this::stopTask);
        this.runningTasks.clear();
        this.scheduledTasks.values().forEach(scheduledFuture -> {
            scheduledFuture.cancel(true);
        });
        this.scheduledTasks.clear();
        LockProvider.clear();
    }

    @Override // ai.grakn.engine.tasks.TaskManager
    public void addTask(TaskState taskState, TaskConfiguration taskConfiguration) {
        if (!taskState.priority().equals(TaskState.Priority.LOW)) {
            this.LOG.info("Standalone mode only has a single priority.");
        }
        this.storage.newState(taskState);
        Instant now = Instant.now();
        TaskSchedule schedule = taskState.schedule();
        long millis = Duration.between(now, taskState.schedule().runAt()).toMillis();
        Runnable submitTaskForExecution = submitTaskForExecution(taskState, taskConfiguration);
        this.scheduledTasks.put(taskState.getId(), schedule.isRecurring() ? this.schedulingService.scheduleAtFixedRate(submitTaskForExecution, millis, schedule.interval().get().toMillis(), TimeUnit.MILLISECONDS) : this.schedulingService.schedule(submitTaskForExecution, millis, TimeUnit.MILLISECONDS));
        this.LOG.info("Added task " + taskState.getId());
    }

    @Override // ai.grakn.engine.tasks.TaskManager
    public void stopTask(TaskId taskId) {
        TaskState state = this.storage.getState(taskId);
        try {
            if (taskShouldRun(state)) {
                this.LOG.info("Stopping a currently scheduled task {}", taskId);
                state.markStopped();
            } else if (state.status() == TaskStatus.RUNNING && this.runningTasks.containsKey(taskId)) {
                this.LOG.info("Stopping running task {}", taskId);
                this.runningTasks.get(taskId).stop();
                state.markStopped();
            } else {
                this.LOG.warn("Task not running {}, was not stopped", taskId);
            }
        } finally {
            saveState(state);
            cancelTask(state);
        }
    }

    @Override // ai.grakn.engine.tasks.TaskManager
    public TaskStateStorage storage() {
        return this.storage;
    }

    private Runnable executeTask(TaskState taskState, TaskConfiguration taskConfiguration) {
        return () -> {
            boolean start;
            try {
                try {
                    BackgroundTask newInstance = taskState.taskClass().newInstance();
                    this.runningTasks.put(taskState.getId(), newInstance);
                    if (taskShouldResume(taskState)) {
                        start = newInstance.resume(saveCheckpoint(taskState), taskState.checkpoint());
                    } else {
                        taskState.markRunning(this.engineID);
                        saveState(taskState);
                        start = newInstance.start(saveCheckpoint(taskState), taskConfiguration, this::addTask);
                    }
                    if (start) {
                        taskState.markCompleted();
                    } else {
                        taskState.markStopped();
                    }
                    saveState(taskState);
                    this.runningTasks.remove(taskState.getId());
                    cancelTask(taskState);
                } catch (Throwable th) {
                    this.LOG.error("{} failed with {}", taskState.getId(), th.getMessage());
                    taskState.markFailed(th);
                    saveState(taskState);
                    this.runningTasks.remove(taskState.getId());
                    cancelTask(taskState);
                }
            } catch (Throwable th2) {
                saveState(taskState);
                this.runningTasks.remove(taskState.getId());
                cancelTask(taskState);
                throw th2;
            }
        };
    }

    private Runnable submitTaskForExecution(TaskState taskState, TaskConfiguration taskConfiguration) {
        return () -> {
            if (taskShouldRun(this.storage.getState(taskState.getId())) || taskShouldResume(taskState)) {
                this.executorService.submit(executeTask(taskState, taskConfiguration));
            }
        };
    }

    private boolean taskShouldRun(TaskState taskState) {
        return taskState.status() == TaskStatus.CREATED || (taskState.schedule().isRecurring() && taskState.status() == TaskStatus.COMPLETED);
    }

    private boolean taskShouldResume(TaskState taskState) {
        return taskState.status() == TaskStatus.RUNNING;
    }

    private Consumer<TaskCheckpoint> saveCheckpoint(TaskState taskState) {
        return taskCheckpoint -> {
            saveState(taskState.checkpoint(taskCheckpoint));
        };
    }

    private synchronized void cancelTask(TaskState taskState) {
        if (!this.scheduledTasks.containsKey(taskState.getId())) {
            this.LOG.debug("Given task is not scheduled.");
            return;
        }
        if (taskState.status() == TaskStatus.STOPPED || taskState.status() == TaskStatus.FAILED) {
            this.scheduledTasks.remove(taskState.getId()).cancel(true);
        }
        if (taskState.status() != TaskStatus.COMPLETED || taskState.schedule().isRecurring()) {
            return;
        }
        this.scheduledTasks.remove(taskState.getId());
    }

    private void saveState(TaskState taskState) {
        this.stateUpdateLock.lock();
        try {
            this.storage.updateState(taskState);
        } finally {
            this.stateUpdateLock.unlock();
        }
    }
}
