package ai.grakn.engine.backgroundtasks.standalone;

import ai.grakn.engine.backgroundtasks.BackgroundTask;
import ai.grakn.engine.backgroundtasks.StateStorage;
import ai.grakn.engine.backgroundtasks.TaskManager;
import ai.grakn.engine.backgroundtasks.TaskState;
import ai.grakn.engine.backgroundtasks.TaskStatus;
import ai.grakn.engine.backgroundtasks.taskstorage.InMemoryStateStorage;
import ai.grakn.engine.util.ConfigProperties;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import javafx.util.Pair;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:ai/grakn/engine/backgroundtasks/standalone/StandaloneTaskManager.class */
public class StandaloneTaskManager implements TaskManager {
    private static final String RUN_ONCE_NAME = "One off task scheduler.";
    private static final String RUN_RECURRING_NAME = "Recurring task scheduler.";
    private static final String EXCEPTION_CATCHER_NAME = "Task Exception Catcher.";
    private static final String SAVE_CHECKPOINT_NAME = "Save task checkpoint.";
    private static StandaloneTaskManager instance = null;
    private final Logger LOG = LoggerFactory.getLogger(StandaloneTaskManager.class);
    private final Map<String, Pair<ScheduledFuture<?>, BackgroundTask>> instantiatedTasks = new ConcurrentHashMap();
    private final StateStorage stateStorage = InMemoryStateStorage.getInstance();
    private final ReentrantLock stateUpdateLock = new ReentrantLock();
    private final ExecutorService executorService;
    private final ScheduledExecutorService schedulingService;

    private StandaloneTaskManager() {
        ConfigProperties configProperties = ConfigProperties.getInstance();
        this.schedulingService = Executors.newScheduledThreadPool(1);
        this.executorService = Executors.newFixedThreadPool(configProperties.getAvailableThreads());
    }

    public static synchronized StandaloneTaskManager getInstance() {
        if (instance == null) {
            instance = new StandaloneTaskManager();
        }
        return instance;
    }

    @Override // ai.grakn.engine.backgroundtasks.TaskManager
    public TaskManager open() {
        return this;
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        this.executorService.shutdown();
        this.schedulingService.shutdown();
        instance = null;
    }

    @Override // ai.grakn.engine.backgroundtasks.TaskManager
    public String scheduleTask(BackgroundTask backgroundTask, String str, Date date, long j, JSONObject jSONObject) {
        Boolean valueOf = Boolean.valueOf(j != 0);
        String newState = this.stateStorage.newState(backgroundTask.getClass().getName(), str, date, valueOf, j, jSONObject);
        long time = new Date().getTime() - date.getTime();
        try {
            this.stateStorage.updateState(newState, TaskStatus.SCHEDULED, getClass().getName(), null, null, null, null);
            this.instantiatedTasks.put(newState, new Pair<>(valueOf.booleanValue() ? this.schedulingService.scheduleAtFixedRate(runTask(newState, backgroundTask, true), time, j, TimeUnit.MILLISECONDS) : this.schedulingService.schedule(runTask(newState, backgroundTask, false), time, TimeUnit.MILLISECONDS), backgroundTask));
            return newState;
        } catch (Throwable th) {
            this.LOG.error(ExceptionUtils.getFullStackTrace(th));
            this.stateStorage.updateState(newState, TaskStatus.FAILED, getClass().getName(), null, th, null, null);
            this.instantiatedTasks.remove(newState);
            return null;
        }
    }

    @Override // ai.grakn.engine.backgroundtasks.TaskManager
    public CompletableFuture completableFuture(String str) {
        if (!this.instantiatedTasks.containsKey(str)) {
            return null;
        }
        try {
            return CompletableFuture.runAsync(() -> {
                while (true) {
                    try {
                        TaskState state = storage().getState(str);
                        if (state.status().equals(TaskStatus.COMPLETED) || state.status().equals(TaskStatus.FAILED)) {
                            break;
                        } else {
                            Thread.sleep(1000L);
                        }
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
            });
        } catch (Throwable th) {
            this.LOG.error(ExceptionUtils.getFullStackTrace(th));
            throw new RuntimeException(th);
        }
    }

    @Override // ai.grakn.engine.backgroundtasks.TaskManager
    public TaskManager stopTask(String str, String str2) {
        this.stateUpdateLock.lock();
        TaskState state = this.stateStorage.getState(str);
        if (state == null) {
            return this;
        }
        Pair<ScheduledFuture<?>, BackgroundTask> pair = this.instantiatedTasks.get(str);
        String name = getClass().getName();
        synchronized (pair) {
            if (state.status() == TaskStatus.SCHEDULED || (state.status() == TaskStatus.COMPLETED && state.isRecurring().booleanValue())) {
                this.LOG.info("Stopping a currently scheduled task " + str);
                ((ScheduledFuture) pair.getKey()).cancel(true);
                this.stateStorage.updateState(str, TaskStatus.STOPPED, name, null, null, null, null);
            } else if (state.status() == TaskStatus.RUNNING) {
                this.LOG.info("Stopping running task " + str);
                BackgroundTask backgroundTask = (BackgroundTask) pair.getValue();
                if (backgroundTask != null) {
                    backgroundTask.stop();
                }
                this.stateStorage.updateState(str, TaskStatus.STOPPED, name, null, null, null, null);
            } else {
                this.LOG.warn("Task not running - " + str);
            }
        }
        this.stateUpdateLock.unlock();
        return this;
    }

    @Override // ai.grakn.engine.backgroundtasks.TaskManager
    public StateStorage storage() {
        return this.stateStorage;
    }

    private Runnable exceptionCatcher(String str, BackgroundTask backgroundTask) {
        return () -> {
            try {
                backgroundTask.start(saveCheckpoint(str), this.stateStorage.getState(str).configuration());
                this.stateUpdateLock.lock();
                if (this.stateStorage.getState(str).status() == TaskStatus.RUNNING) {
                    this.stateStorage.updateState(str, TaskStatus.COMPLETED, EXCEPTION_CATCHER_NAME, null, null, null, null);
                }
                this.stateUpdateLock.unlock();
            } catch (Throwable th) {
                this.LOG.error(ExceptionUtils.getFullStackTrace(th));
                this.stateStorage.updateState(str, TaskStatus.FAILED, EXCEPTION_CATCHER_NAME, null, th, null, null);
            }
        };
    }

    private Runnable runTask(String str, BackgroundTask backgroundTask, Boolean bool) {
        return () -> {
            this.stateUpdateLock.lock();
            TaskState state = this.stateStorage.getState(str);
            if (bool.booleanValue() && (state.status() == TaskStatus.SCHEDULED || state.status() == TaskStatus.COMPLETED)) {
                this.stateStorage.updateState(str, TaskStatus.RUNNING, RUN_RECURRING_NAME, null, null, null, null);
                this.executorService.submit(exceptionCatcher(str, backgroundTask));
            } else if (!bool.booleanValue() && state.status() == TaskStatus.SCHEDULED) {
                this.stateStorage.updateState(str, TaskStatus.RUNNING, RUN_ONCE_NAME, null, null, null, null);
                this.executorService.submit(exceptionCatcher(str, backgroundTask));
            }
            this.stateUpdateLock.unlock();
        };
    }

    private Consumer<String> saveCheckpoint(String str) {
        return str2 -> {
            this.stateUpdateLock.lock();
            this.stateStorage.updateState(str, this.stateStorage.getState(str).status(), SAVE_CHECKPOINT_NAME, null, null, str2, null);
            this.stateUpdateLock.unlock();
        };
    }
}
