package ai.grakn.engine.tasks.manager.singlequeue;

import ai.grakn.engine.GraknEngineConfig;
import ai.grakn.engine.TaskId;
import ai.grakn.engine.lock.LockProvider;
import ai.grakn.engine.lock.ZookeeperLock;
import ai.grakn.engine.tasks.ExternalOffsetStorage;
import ai.grakn.engine.tasks.TaskConfiguration;
import ai.grakn.engine.tasks.TaskManager;
import ai.grakn.engine.tasks.TaskState;
import ai.grakn.engine.tasks.TaskStateStorage;
import ai.grakn.engine.tasks.config.ConfigHelper;
import ai.grakn.engine.tasks.connection.ZookeeperConnection;
import ai.grakn.engine.tasks.manager.ExternalStorageRebalancer;
import ai.grakn.engine.tasks.storage.TaskStateZookeeperStore;
import ai.grakn.engine.util.EngineID;
import ai.grakn.engine.util.ExceptionWrapper;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.nio.charset.Charset;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:ai/grakn/engine/tasks/manager/singlequeue/SingleQueueTaskManager.class */
public class SingleQueueTaskManager implements TaskManager {
    private static final String TASK_RUNNER_THREAD_POOL_NAME = "task-runner-pool-%s";
    private static final int TIME_UNTIL_BACKOFF = 60000;
    private static final String TASKS_STOPPED = "/stopped/%s";
    private static final String TASKS_STOPPED_PREFIX = "/stopped";
    private final PathChildrenCache stoppedTasks;
    private Set<SingleQueueTaskRunner> taskRunners;
    private static final Logger LOG = LoggerFactory.getLogger(SingleQueueTaskManager.class);
    private static final int CAPACITY = GraknEngineConfig.getInstance().getAvailableThreads();
    private Charset zkCharset = Charsets.UTF_8;
    private final ZookeeperConnection zookeeper = new ZookeeperConnection();
    private final TaskStateStorage storage = new TaskStateZookeeperStore(this.zookeeper);
    private final ExternalOffsetStorage offsetStorage = new ExternalOffsetStorage(this.zookeeper);
    private final Producer<TaskState, TaskConfiguration> producer = ConfigHelper.kafkaProducer();
    private ExecutorService taskRunnerThreadPool = Executors.newFixedThreadPool(CAPACITY * 2, new ThreadFactoryBuilder().setNameFormat(TASK_RUNNER_THREAD_POOL_NAME).build());

    public SingleQueueTaskManager(EngineID engineID) {
        this.taskRunners = (Set) Stream.concat(((Set) Stream.generate(() -> {
            return newTaskRunner(engineID, TaskState.Priority.HIGH.queue());
        }).limit(CAPACITY).collect(Collectors.toSet())).stream(), ((Set) Stream.generate(() -> {
            return newTaskRunner(engineID, TaskState.Priority.LOW.queue());
        }).limit(CAPACITY).collect(Collectors.toSet())).stream()).collect(Collectors.toSet());
        Set<SingleQueueTaskRunner> set = this.taskRunners;
        ExecutorService executorService = this.taskRunnerThreadPool;
        executorService.getClass();
        set.forEach((v1) -> {
            r1.submit(v1);
        });
        this.stoppedTasks = new PathChildrenCache(this.zookeeper.connection(), TASKS_STOPPED_PREFIX, true);
        this.stoppedTasks.getListenable().addListener((curatorFramework, pathChildrenCacheEvent) -> {
            if (pathChildrenCacheEvent.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED) {
                TaskId of = TaskId.of(new String(pathChildrenCacheEvent.getData().getData(), this.zkCharset));
                LOG.debug("Attempting to stop task {}", of);
                this.taskRunners.forEach(singleQueueTaskRunner -> {
                    singleQueueTaskRunner.stopTask(of);
                });
            }
        });
        try {
            this.stoppedTasks.start();
            LockProvider.instantiate((str, lock) -> {
                return new ZookeeperLock(this.zookeeper, str);
            });
            LOG.debug("TaskManager started");
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override // ai.grakn.engine.tasks.TaskManager
    public void close() {
        LOG.debug("Closing SingleQueueTaskManager");
        PathChildrenCache pathChildrenCache = this.stoppedTasks;
        pathChildrenCache.getClass();
        ExceptionWrapper.noThrow(pathChildrenCache::close, "Error closing down stop tasks listener");
        for (SingleQueueTaskRunner singleQueueTaskRunner : this.taskRunners) {
            singleQueueTaskRunner.getClass();
            ExceptionWrapper.noThrow(singleQueueTaskRunner::close, "Error shutting down TaskRunner");
        }
        Producer<TaskState, TaskConfiguration> producer = this.producer;
        producer.getClass();
        ExceptionWrapper.noThrow(producer::close, "Error shutting down producer in TaskManager");
        ExecutorService executorService = this.taskRunnerThreadPool;
        executorService.getClass();
        ExceptionWrapper.noThrow(executorService::shutdown, "Error closing task runner thread pool");
        ExceptionWrapper.noThrow(() -> {
            this.taskRunnerThreadPool.awaitTermination(1L, TimeUnit.MINUTES);
        }, "Error waiting for TaskRunner executor to shutdown.");
        ZookeeperConnection zookeeperConnection = this.zookeeper;
        zookeeperConnection.getClass();
        ExceptionWrapper.noThrow(zookeeperConnection::close, "Error waiting for zookeeper connection to close");
        LockProvider.clear();
        LOG.debug("TaskManager closed");
    }

    @Override // ai.grakn.engine.tasks.TaskManager
    public void stopTask(TaskId taskId) {
        try {
            this.zookeeper.connection().create().creatingParentsIfNeeded().forPath(String.format(TASKS_STOPPED, taskId), taskId.getValue().getBytes(this.zkCharset));
        } catch (RuntimeException e) {
            throw e;
        } catch (Exception e2) {
            throw new RuntimeException(e2);
        }
    }

    @Override // ai.grakn.engine.tasks.TaskManager
    public TaskStateStorage storage() {
        return this.storage;
    }

    private Consumer<TaskState, TaskConfiguration> newConsumer(String str) {
        Consumer<TaskState, TaskConfiguration> kafkaConsumer = ConfigHelper.kafkaConsumer("task-runners-" + str);
        kafkaConsumer.subscribe(ImmutableList.of(str), ExternalStorageRebalancer.rebalanceListener(kafkaConsumer, this.offsetStorage));
        return kafkaConsumer;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean isTaskMarkedStopped(TaskId taskId) {
        return this.stoppedTasks.getCurrentData(String.format(TASKS_STOPPED, taskId)) != null;
    }

    @Override // ai.grakn.engine.tasks.TaskManager
    public void addTask(TaskState taskState, TaskConfiguration taskConfiguration) {
        this.producer.send(new ProducerRecord(taskState.priority().queue(), taskState, taskConfiguration));
        this.producer.flush();
    }

    private SingleQueueTaskRunner newTaskRunner(EngineID engineID, String str) {
        return new SingleQueueTaskRunner(this, engineID, this.offsetStorage, TIME_UNTIL_BACKOFF, newConsumer(str));
    }
}
