package ai.grakn.engine.backgroundtasks.taskstatestorage;

import ai.grakn.engine.TaskStatus;
import ai.grakn.engine.backgroundtasks.TaskState;
import ai.grakn.engine.backgroundtasks.TaskStateStorage;
import ai.grakn.engine.backgroundtasks.config.ZookeeperPaths;
import ai.grakn.engine.backgroundtasks.distributed.ZookeeperConnection;
import ai.grakn.exception.EngineStorageException;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang.SerializationUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:ai/grakn/engine/backgroundtasks/taskstatestorage/TaskStateZookeeperStore.class */
/**
 * {@link TaskStateStorage} implementation that stores each serialized {@link TaskState} in a
 * ZooKeeper node, accessed through a Curator client.
 *
 * <p>The single-task operations ({@link #newState}, {@link #updateState} and {@link #getState})
 * run while holding a per-task {@link InterProcessMutex}. The mutex is always acquired
 * <em>before</em> entering the guarded {@code try} block and released exactly once in
 * {@code finally}, so a failed acquisition never triggers a release of a lock that is not held.
 */
public class TaskStateZookeeperStore implements TaskStateStorage {
    /** Format of the node path holding a task's serialized state; {@code %s} is the task id. */
    private static final String ZK_TASK_PATH = "/tasks/%s/state";
    private static final Logger LOG = LoggerFactory.getLogger(TaskStateZookeeperStore.class);

    private final CuratorFramework zookeeperConnection;

    /**
     * @param zookeeperConnection wrapper whose {@code connection()} supplies the Curator client
     *                            used for every operation of this store
     */
    public TaskStateZookeeperStore(ZookeeperConnection zookeeperConnection) {
        this.zookeeperConnection = zookeeperConnection.connection();
    }

    /**
     * Creates the state node for a task (creating parent nodes as needed) and writes the
     * serialized state into it.
     *
     * @param taskState state to persist; its id determines the node path
     * @return the id of the persisted task
     * @throws EngineStorageException if the lock cannot be acquired/released or the node cannot
     *                                be created
     */
    @Override
    public String newState(TaskState taskState) {
        InterProcessMutex mutex = mutex(taskState.getId());
        acquire(mutex);
        try {
            zookeeperConnection.create()
                    .creatingParentContainersIfNeeded()
                    .forPath(statePath(taskState.getId()), SerializationUtils.serialize(taskState));
            return taskState.getId();
        } catch (Exception e) {
            throw new EngineStorageException("Could not write state to storage " + ExceptionUtils.getFullStackTrace(e));
        } finally {
            release(mutex);
        }
    }

    /**
     * Overwrites the stored state of an existing task.
     *
     * @param taskState new state; its id determines the node path
     * @return {@code true} if the write succeeded, {@code false} if ZooKeeper rejected it (the
     *         failure is logged rather than thrown)
     * @throws EngineStorageException if the lock cannot be acquired or released
     */
    @Override
    public Boolean updateState(TaskState taskState) {
        InterProcessMutex mutex = mutex(taskState.getId());
        acquire(mutex);
        try {
            zookeeperConnection.setData()
                    .forPath(statePath(taskState.getId()), SerializationUtils.serialize(taskState));
            return true;
        } catch (Exception e) {
            // Deliberate best-effort: report the failure to the caller through the return value,
            // but keep the stack trace in the log so the cause is not lost.
            LOG.error("Could not write to ZooKeeper! - " + e, e);
            return false;
        } finally {
            release(mutex);
        }
    }

    /**
     * Reads and deserializes the stored state of a task.
     *
     * @param str id of the task to look up
     * @return the deserialized state
     * @throws EngineStorageException if the lock cannot be acquired/released or the node cannot
     *                                be read
     */
    @Override
    public TaskState getState(String str) {
        InterProcessMutex mutex = mutex(str);
        // Acquire outside the try block: if this fails there is nothing to release.
        acquire(mutex);
        try {
            // NOTE(review): this path is built from ZookeeperPaths.TASK_STATE_SUFFIX while the
            // write paths use ZK_TASK_PATH - confirm both resolve to the same node.
            byte[] data = zookeeperConnection.getData()
                    .forPath("/tasks/" + str + ZookeeperPaths.TASK_STATE_SUFFIX);
            return (TaskState) SerializationUtils.deserialize(data);
        } catch (Exception e) {
            throw new EngineStorageException("Could not get state from storage " + ExceptionUtils.getFullStackTrace(e));
        } finally {
            release(mutex);
        }
    }

    /**
     * Loads the state of every child of {@link ZookeeperPaths#TASKS_PATH_PREFIX} and returns the
     * ones matching the given filters.
     *
     * @param taskStatus only return tasks with this status; {@code null} disables the filter
     * @param str        only return tasks with this task class name; {@code null} disables the
     *                   filter
     * @param str2       only return tasks with this creator; {@code null} disables the filter
     * @param i          maximum number of tasks to return; values {@code <= 0} mean no limit
     * @param i2         number of matching tasks to skip before collecting results
     * @return the matching task states
     * @throws EngineStorageException if the children cannot be listed, any state cannot be
     *                                loaded, or the arguments are rejected by the stream
     *                                pipeline (e.g. a negative {@code i2})
     */
    @Override
    public Set<TaskState> getTasks(TaskStatus taskStatus, String str, String str2, int i, int i2) {
        try {
            Stream<TaskState> states = zookeeperConnection.getChildren()
                    .forPath(ZookeeperPaths.TASKS_PATH_PREFIX)
                    .stream()
                    .map(this::getState);

            // equals() is invoked on the (non-null) filter value so that a stored state with a
            // missing field is simply filtered out instead of failing the whole query.
            if (taskStatus != null) {
                states = states.filter(state -> taskStatus.equals(state.status()));
            }
            if (str != null) {
                states = states.filter(state -> str.equals(state.taskClassName()));
            }
            if (str2 != null) {
                states = states.filter(state -> str2.equals(state.creator()));
            }

            states = states.skip(i2);
            if (i > 0) {
                states = states.limit(i);
            }
            return states.collect(Collectors.toSet());
        } catch (Exception e) {
            throw new EngineStorageException("Could not get state from storage " + ExceptionUtils.getFullStackTrace(e));
        }
    }

    /** Returns the path of the node that holds the serialized state of the given task. */
    private static String statePath(String id) {
        return String.format(ZK_TASK_PATH, id);
    }

    /**
     * Builds the Curator mutex guarding the given task id.
     *
     * <p>NOTE(review): the prefix and the id are concatenated without a separator here, unlike
     * the state paths - confirm against {@link ZookeeperPaths} that this is the intended lock
     * path before changing it, since other components may lock on the same path.
     */
    private InterProcessMutex mutex(String str) {
        return new InterProcessMutex(this.zookeeperConnection, ZookeeperPaths.TASKS_PATH_PREFIX + str + ZookeeperPaths.TASK_LOCK_SUFFIX);
    }

    /**
     * Acquires the mutex, converting any failure into an {@link EngineStorageException} whose
     * message carries the stack trace of the cause.
     */
    private void acquire(InterProcessMutex interProcessMutex) {
        try {
            interProcessMutex.acquire();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Preserve the interrupt status for code further up the call stack.
                Thread.currentThread().interrupt();
            }
            throw new EngineStorageException("Error acquiring mutex from zookeeper. " + ExceptionUtils.getFullStackTrace(e));
        }
    }

    /**
     * Releases the mutex, converting any failure into an {@link EngineStorageException} whose
     * message carries the stack trace of the cause.
     */
    private void release(InterProcessMutex interProcessMutex) {
        try {
            interProcessMutex.release();
        } catch (Exception e) {
            throw new EngineStorageException("Error releasing mutex from zookeeper. " + ExceptionUtils.getFullStackTrace(e));
        }
    }
}
