package ai.grakn.engine.tasks.manager.redisqueue;

import ai.grakn.engine.GraknEngineConfig;
import ai.grakn.engine.TaskId;
import ai.grakn.engine.factory.EngineGraknTxFactory;
import ai.grakn.engine.lock.LockProvider;
import ai.grakn.engine.tasks.connection.RedisCountStorage;
import ai.grakn.engine.tasks.manager.TaskConfiguration;
import ai.grakn.engine.tasks.manager.TaskManager;
import ai.grakn.engine.tasks.manager.TaskState;
import ai.grakn.engine.util.EngineID;
import ai.grakn.redisq.Redisq;
import ai.grakn.redisq.RedisqBuilder;
import ai.grakn.redisq.State;
import ai.grakn.redisq.exceptions.StateFutureInitializationException;
import ai.grakn.redisq.exceptions.WaitException;
import com.codahale.metrics.MetricRegistry;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.util.Pool;

/* loaded from: input_file:ai/grakn/engine/tasks/manager/redisqueue/RedisTaskManager.class */
public class RedisTaskManager implements TaskManager {
    private static final Logger LOG = LoggerFactory.getLogger(RedisTaskManager.class);
    private static final String QUEUE_NAME = "grakn";
    private final Redisq<Task> redisq;
    private final RedisTaskStorage taskStorage;

    public RedisTaskManager(EngineID engineID, GraknEngineConfig graknEngineConfig, Pool<Jedis> pool, EngineGraknTxFactory engineGraknTxFactory, LockProvider lockProvider, MetricRegistry metricRegistry) {
        this(engineID, graknEngineConfig, pool, 32, engineGraknTxFactory, lockProvider, metricRegistry);
    }

    public RedisTaskManager(EngineID engineID, GraknEngineConfig graknEngineConfig, Pool<Jedis> pool, int i, EngineGraknTxFactory engineGraknTxFactory, LockProvider lockProvider, MetricRegistry metricRegistry) {
        RedisTaskQueueConsumer redisTaskQueueConsumer = new RedisTaskQueueConsumer(this, engineID, graknEngineConfig, RedisCountStorage.create(pool, metricRegistry), metricRegistry, engineGraknTxFactory, lockProvider);
        LOG.info("Running queue consumer with {} execution threads", Integer.valueOf(i));
        this.redisq = new RedisqBuilder().setJedisPool(pool).setName(QUEUE_NAME).setConsumer(redisTaskQueueConsumer).setMetricRegistry(metricRegistry).setThreadPool(Executors.newFixedThreadPool(i, new ThreadFactoryBuilder().setNameFormat("redisq-task-manager-%d").build())).setDocumentClass(Task.class).createRedisq();
        this.taskStorage = RedisTaskStorage.create(this.redisq, metricRegistry);
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        LOG.info("Closing task manager");
        try {
            this.redisq.close();
        } catch (InterruptedException e) {
            LOG.error("Interrupted while closing queue", e);
        }
    }

    @Override // ai.grakn.engine.tasks.manager.TaskManager
    public CompletableFuture<Void> start() {
        Redisq<Task> redisq = this.redisq;
        redisq.getClass();
        return CompletableFuture.runAsync(redisq::startConsumer).exceptionally(th -> {
            close();
            throw new RuntimeException("Failed to initialise subscription");
        });
    }

    @Override // ai.grakn.engine.tasks.manager.TaskManager
    public void stopTask(TaskId taskId) {
    }

    @Override // ai.grakn.engine.tasks.manager.TaskManager
    public RedisTaskStorage storage() {
        return this.taskStorage;
    }

    @Override // ai.grakn.engine.tasks.manager.TaskSubmitter
    public void addTask(TaskState taskState, TaskConfiguration taskConfiguration) {
        this.redisq.push(Task.builder().setTaskConfiguration(taskConfiguration).setTaskState(taskState).build());
    }

    @Override // ai.grakn.engine.tasks.manager.TaskSubmitter
    public void runTask(TaskState taskState, TaskConfiguration taskConfiguration) {
        try {
            this.redisq.pushAndWait(Task.builder().setTaskConfiguration(taskConfiguration).setTaskState(taskState).build(), 5L, TimeUnit.MINUTES);
        } catch (WaitException e) {
            throw new RuntimeException("Could not run task", e);
        }
    }

    public Future<Void> subscribeToTask(TaskId taskId) throws StateFutureInitializationException, ExecutionException, InterruptedException {
        return this.redisq.getFutureForDocumentStateWait(ImmutableSet.of(State.DONE, State.FAILED), taskId.getValue());
    }

    public void waitForTask(TaskId taskId, long j, TimeUnit timeUnit) throws StateFutureInitializationException, ExecutionException, InterruptedException, TimeoutException {
        this.redisq.getFutureForDocumentStateWait(ImmutableSet.of(State.DONE, State.FAILED), taskId.getValue()).get(j, timeUnit);
    }

    public Redisq getQueue() {
        return this.redisq;
    }
}
