package ai.grakn.engine.backgroundtasks.distributed;

import ai.grakn.engine.backgroundtasks.BackgroundTask;
import ai.grakn.engine.backgroundtasks.StateStorage;
import ai.grakn.engine.backgroundtasks.TaskManager;
import ai.grakn.engine.backgroundtasks.TaskStatus;
import ai.grakn.engine.backgroundtasks.config.ConfigHelper;
import ai.grakn.engine.backgroundtasks.config.KafkaTerms;
import ai.grakn.engine.backgroundtasks.taskstorage.GraknStateStorage;
import ai.grakn.engine.backgroundtasks.taskstorage.SynchronizedStateStorage;
import ai.grakn.engine.util.EngineID;
import ai.grakn.engine.util.ExceptionWrapper;
import java.util.Date;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:ai/grakn/engine/backgroundtasks/distributed/DistributedTaskManager.class */
public class DistributedTaskManager implements TaskManager {
    private final Logger LOG = LoggerFactory.getLogger(DistributedTaskManager.class);
    private final AtomicBoolean OPENED = new AtomicBoolean(false);
    private static DistributedTaskManager instance = null;
    private KafkaProducer<String, String> producer;
    private StateStorage stateStorage;
    private SynchronizedStateStorage zkStorage;

    public static synchronized DistributedTaskManager getInstance() {
        if (instance == null) {
            instance = new DistributedTaskManager();
        }
        return instance;
    }

    @Override // ai.grakn.engine.backgroundtasks.TaskManager
    public DistributedTaskManager open() {
        if (this.OPENED.compareAndSet(false, true)) {
            try {
                ExceptionWrapper.noThrow(() -> {
                    this.producer = ConfigHelper.kafkaProducer();
                }, "Could not instantiate Kafka Producer");
                ExceptionWrapper.noThrow(() -> {
                    this.stateStorage = new GraknStateStorage();
                }, "Could not instantiate grakn state storage");
                this.zkStorage = SynchronizedStateStorage.getInstance();
            } catch (Exception e) {
                e.printStackTrace(System.err);
                this.LOG.error("While trying to start the DistributedTaskManager", e);
                throw new RuntimeException("Could not start task manager : " + e);
            }
        } else {
            this.LOG.error("DistributedTaskManager open() called multiple times!");
        }
        return this;
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        if (!this.OPENED.compareAndSet(true, false)) {
            this.LOG.error("DistributedTaskManager close() called before open()!");
            return;
        }
        KafkaProducer<String, String> kafkaProducer = this.producer;
        kafkaProducer.getClass();
        ExceptionWrapper.noThrow(kafkaProducer::close, "Could not close Kafka Producer.");
        this.stateStorage = null;
        this.zkStorage = null;
    }

    @Override // ai.grakn.engine.backgroundtasks.TaskManager
    public String scheduleTask(BackgroundTask backgroundTask, String str, Date date, long j, JSONObject jSONObject) {
        String newState = this.stateStorage.newState(backgroundTask.getClass().getName(), str, date, Boolean.valueOf(j > 0), j, jSONObject);
        try {
            this.zkStorage.newState(newState, TaskStatus.CREATED, null, null);
            this.producer.send(new ProducerRecord(KafkaTerms.NEW_TASKS_TOPIC, newState, jSONObject.toString()));
            this.producer.flush();
        } catch (Exception e) {
            this.LOG.error("Could not write to ZooKeeper! - " + ExceptionUtils.getFullStackTrace(e));
            this.stateStorage.updateState(newState, TaskStatus.FAILED, getClass().getName(), EngineID.getInstance().id(), e, null, null);
            newState = null;
        }
        return newState;
    }

    @Override // ai.grakn.engine.backgroundtasks.TaskManager
    public TaskManager stopTask(String str, String str2) {
        throw new UnsupportedOperationException(getClass().getName() + " currently doesnt support stopping tasks");
    }

    @Override // ai.grakn.engine.backgroundtasks.TaskManager
    public StateStorage storage() {
        return this.stateStorage;
    }

    @Override // ai.grakn.engine.backgroundtasks.TaskManager
    public CompletableFuture completableFuture(String str) {
        return CompletableFuture.runAsync(() -> {
            while (true) {
                TaskStatus status = this.zkStorage.getState(str).status();
                if (status == TaskStatus.COMPLETED || status == TaskStatus.FAILED || status == TaskStatus.STOPPED) {
                    return;
                }
                try {
                    Thread.sleep(5000L);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
    }

    public TaskStatus getState(String str) {
        return this.zkStorage.getState(str).status();
    }
}
