package ai.grakn.engine.backgroundtasks.distributed;

import ai.grakn.engine.backgroundtasks.TaskManager;
import ai.grakn.engine.backgroundtasks.TaskState;
import ai.grakn.engine.backgroundtasks.TaskStateStorage;
import ai.grakn.engine.backgroundtasks.config.ConfigHelper;
import ai.grakn.engine.backgroundtasks.config.KafkaTerms;
import ai.grakn.engine.backgroundtasks.taskstatestorage.TaskStateZookeeperStore;
import ai.grakn.engine.util.ExceptionWrapper;
import java.time.Instant;
import mjson.Json;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

/* loaded from: input_file:ai/grakn/engine/backgroundtasks/distributed/DistributedTaskManager.class */
public final class DistributedTaskManager implements TaskManager {
    private final KafkaProducer<String, String> producer;
    private final SchedulerElector elector;
    private static final String TASKRUNNER_THREAD_NAME = "taskrunner-";
    private final ZookeeperConnection connection = new ZookeeperConnection();
    private final TaskStateStorage stateStorage = new TaskStateZookeeperStore(this.connection);
    private final TaskRunner taskRunner = new TaskRunner(this.stateStorage, this.connection);
    private Thread taskRunnerThread = new Thread(this.taskRunner, TASKRUNNER_THREAD_NAME + this.taskRunner.hashCode());

    public DistributedTaskManager() {
        this.taskRunnerThread.start();
        this.elector = new SchedulerElector(this.stateStorage, this.connection);
        this.producer = ConfigHelper.kafkaProducer();
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        KafkaProducer<String, String> kafkaProducer = this.producer;
        kafkaProducer.getClass();
        ExceptionWrapper.noThrow(kafkaProducer::close, "Error shutting down producer in TaskManager");
        SchedulerElector schedulerElector = this.elector;
        schedulerElector.getClass();
        ExceptionWrapper.noThrow(schedulerElector::stop, "Error stopping Scheduler elector from TaskManager");
        TaskRunner taskRunner = this.taskRunner;
        taskRunner.getClass();
        ExceptionWrapper.noThrow(taskRunner::close, "Error shutting down TaskRunner");
        Thread thread = this.taskRunnerThread;
        thread.getClass();
        ExceptionWrapper.noThrow(thread::join, "Error waiting for TaskRunner to close");
        ZookeeperConnection zookeeperConnection = this.connection;
        zookeeperConnection.getClass();
        ExceptionWrapper.noThrow(zookeeperConnection::close, "Error waiting for zookeeper connection to close");
    }

    @Override // ai.grakn.engine.backgroundtasks.TaskManager
    public String createTask(String str, String str2, Instant instant, long j, Json json) {
        TaskState configuration = new TaskState(str).creator(str2).runAt(instant).isRecurring(Boolean.valueOf(j > 0)).interval(j).configuration(json);
        this.producer.send(new ProducerRecord(KafkaTerms.NEW_TASKS_TOPIC, configuration.getId(), TaskState.serialize(configuration)));
        this.producer.flush();
        return configuration.getId();
    }

    @Override // ai.grakn.engine.backgroundtasks.TaskManager
    public TaskManager stopTask(String str, String str2) {
        throw new UnsupportedOperationException(getClass().getName() + " currently doesn't support stopping tasks");
    }

    @Override // ai.grakn.engine.backgroundtasks.TaskManager
    public TaskStateStorage storage() {
        return this.stateStorage;
    }
}
