package ai.grakn.engine.backgroundtasks.standalone;

import ai.grakn.engine.TaskStatus;
import ai.grakn.engine.backgroundtasks.BackgroundTask;
import ai.grakn.engine.backgroundtasks.TaskManager;
import ai.grakn.engine.backgroundtasks.TaskState;
import ai.grakn.engine.backgroundtasks.TaskStateStorage;
import ai.grakn.engine.backgroundtasks.taskstatestorage.TaskStateInMemoryStore;
import ai.grakn.engine.util.ConfigProperties;
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import javafx.util.Pair;
import mjson.Json;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:ai/grakn/engine/backgroundtasks/standalone/StandaloneTaskManager.class */
public class StandaloneTaskManager implements TaskManager {
    private static final String EXCEPTION_CATCHER_NAME = "Task Exception Catcher.";
    private static final String SAVE_CHECKPOINT_NAME = "Save task checkpoint.";
    private final Logger LOG = LoggerFactory.getLogger(StandaloneTaskManager.class);
    private final Map<String, Pair<ScheduledFuture<?>, BackgroundTask>> instantiatedTasks = new ConcurrentHashMap();
    private final TaskStateStorage stateStorage = new TaskStateInMemoryStore();
    private final ReentrantLock stateUpdateLock = new ReentrantLock();
    private final ExecutorService executorService;
    private final ScheduledExecutorService schedulingService;

    public StandaloneTaskManager() {
        ConfigProperties configProperties = ConfigProperties.getInstance();
        this.schedulingService = Executors.newScheduledThreadPool(1);
        this.executorService = Executors.newFixedThreadPool(configProperties.getAvailableThreads());
    }

    public TaskManager open() {
        return this;
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        this.executorService.shutdown();
        this.schedulingService.shutdown();
    }

    @Override // ai.grakn.engine.backgroundtasks.TaskManager
    public String createTask(String str, String str2, Instant instant, long j, Json json) {
        Boolean valueOf = Boolean.valueOf(j != 0);
        TaskState configuration = new TaskState(str).creator(str2).runAt(instant).isRecurring(valueOf).interval(j).configuration(json);
        this.stateStorage.newState(configuration);
        long millis = Duration.between(Instant.now(), instant).toMillis();
        try {
            this.stateStorage.updateState(configuration.status(TaskStatus.SCHEDULED).statusChangedBy(getClass().getName()));
            BackgroundTask backgroundTask = (BackgroundTask) Class.forName(str).newInstance();
            this.instantiatedTasks.put(configuration.getId(), new Pair<>(valueOf.booleanValue() ? this.schedulingService.scheduleAtFixedRate(runTask(configuration.getId(), backgroundTask, true), millis, j, TimeUnit.MILLISECONDS) : this.schedulingService.schedule(runTask(configuration.getId(), backgroundTask, false), millis, TimeUnit.MILLISECONDS), backgroundTask));
            return configuration.getId();
        } catch (Throwable th) {
            this.LOG.error(ExceptionUtils.getFullStackTrace(th));
            this.stateStorage.updateState(configuration.status(TaskStatus.FAILED).exception(ExceptionUtils.getFullStackTrace(th)));
            this.instantiatedTasks.remove(configuration.getId());
            return null;
        }
    }

    @Override // ai.grakn.engine.backgroundtasks.TaskManager
    public TaskManager stopTask(String str, String str2) {
        try {
            this.stateUpdateLock.lock();
            TaskState state = this.stateStorage.getState(str);
            if (state == null) {
                return this;
            }
            Pair<ScheduledFuture<?>, BackgroundTask> pair = this.instantiatedTasks.get(str);
            synchronized (pair) {
                if (state.status() == TaskStatus.SCHEDULED || (state.status() == TaskStatus.COMPLETED && state.isRecurring().booleanValue())) {
                    this.LOG.info("Stopping a currently scheduled task " + str);
                    ((ScheduledFuture) pair.getKey()).cancel(true);
                    this.stateStorage.updateState(state.status(TaskStatus.STOPPED));
                } else if (state.status() == TaskStatus.RUNNING) {
                    this.LOG.info("Stopping running task " + str);
                    BackgroundTask backgroundTask = (BackgroundTask) pair.getValue();
                    if (backgroundTask != null) {
                        backgroundTask.stop();
                    }
                    this.stateStorage.updateState(state.status(TaskStatus.STOPPED));
                } else {
                    this.LOG.warn("Task not running - " + str);
                }
            }
            this.stateUpdateLock.unlock();
            return this;
        } finally {
            this.stateUpdateLock.unlock();
        }
    }

    @Override // ai.grakn.engine.backgroundtasks.TaskManager
    public TaskStateStorage storage() {
        return this.stateStorage;
    }

    private Runnable exceptionCatcher(TaskState taskState, BackgroundTask backgroundTask) {
        return () -> {
            try {
                backgroundTask.start(saveCheckpoint(taskState), taskState.configuration());
                this.stateUpdateLock.lock();
                if (taskState.status() == TaskStatus.RUNNING) {
                    this.stateStorage.updateState(taskState.status(TaskStatus.COMPLETED).statusChangedBy(EXCEPTION_CATCHER_NAME));
                }
                this.stateUpdateLock.unlock();
            } catch (Throwable th) {
                this.LOG.error(ExceptionUtils.getFullStackTrace(th));
                this.stateStorage.updateState(taskState.status(TaskStatus.FAILED).statusChangedBy(EXCEPTION_CATCHER_NAME).exception(ExceptionUtils.getFullStackTrace(th)));
            }
        };
    }

    private Runnable runTask(String str, BackgroundTask backgroundTask, Boolean bool) {
        return () -> {
            this.stateUpdateLock.lock();
            TaskState state = this.stateStorage.getState(str);
            if (bool.booleanValue() && (state.status() == TaskStatus.SCHEDULED || state.status() == TaskStatus.COMPLETED)) {
                this.stateStorage.updateState(state.status(TaskStatus.RUNNING).isRecurring(true));
                this.executorService.submit(exceptionCatcher(state, backgroundTask));
            } else if (!bool.booleanValue() && state.status() == TaskStatus.SCHEDULED) {
                this.stateStorage.updateState(state.status(TaskStatus.RUNNING).isRecurring(false));
                this.executorService.submit(exceptionCatcher(state, backgroundTask));
            }
            this.stateUpdateLock.unlock();
        };
    }

    private Consumer<String> saveCheckpoint(TaskState taskState) {
        return str -> {
            this.stateUpdateLock.lock();
            this.stateStorage.updateState(taskState.checkpoint(SAVE_CHECKPOINT_NAME));
            this.stateUpdateLock.unlock();
        };
    }
}
