package ai.grakn.engine.backgroundtasks.distributed;

import ai.grakn.engine.backgroundtasks.TaskState;
import ai.grakn.engine.backgroundtasks.TaskStatus;
import ai.grakn.engine.backgroundtasks.config.ConfigHelper;
import ai.grakn.engine.backgroundtasks.config.KafkaTerms;
import ai.grakn.engine.backgroundtasks.taskstorage.GraknStateStorage;
import ai.grakn.engine.backgroundtasks.taskstorage.SynchronizedStateStorage;
import ai.grakn.engine.util.ConfigProperties;
import ai.grakn.engine.util.ExceptionWrapper;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import javafx.util.Pair;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;

/* loaded from: input_file:ai/grakn/engine/backgroundtasks/distributed/Scheduler.class */
public class Scheduler implements Runnable, AutoCloseable {
    private static final ConfigProperties properties = ConfigProperties.getInstance();
    private GraknStateStorage stateStorage;
    private SynchronizedStateStorage zkStorage;
    private KafkaConsumer<String, String> consumer;
    private KafkaProducer<String, String> producer;
    private ScheduledExecutorService schedulingService;
    private CountDownLatch waitToClose;
    private final KafkaLogger LOG = KafkaLogger.getInstance();
    private final AtomicBoolean OPENED = new AtomicBoolean(false);
    private boolean initialised = false;
    private volatile boolean running = false;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:ai/grakn/engine/backgroundtasks/distributed/Scheduler$KafkaLoggingCallback.class */
    public class KafkaLoggingCallback implements Callback {
        private KafkaLoggingCallback() {
        }

        public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
            if (exc != null) {
                Scheduler.this.LOG.debug(ExceptionUtils.getFullStackTrace(exc));
            }
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        this.running = true;
        restartRecurringTasks();
        while (this.running) {
            try {
                try {
                    printInitialization();
                    this.LOG.debug("Scheduler polling, size of new tasks " + this.consumer.endOffsets((Collection) this.consumer.partitionsFor(KafkaTerms.NEW_TASKS_TOPIC).stream().map(partitionInfo -> {
                        return new TopicPartition(KafkaTerms.NEW_TASKS_TOPIC, partitionInfo.partition());
                    }).collect(Collectors.toSet())));
                    Iterator it = this.consumer.poll(properties.getPropertyAsInt(ConfigProperties.SCHEDULER_POLLING_FREQ)).iterator();
                    while (it.hasNext()) {
                        ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                        this.LOG.debug(String.format("Scheduler received topic = %s, partition = %s, offset = %s, taskid = %s, value = %s\n", consumerRecord.topic(), Integer.valueOf(consumerRecord.partition()), Long.valueOf(consumerRecord.offset()), consumerRecord.key(), consumerRecord.value()));
                        scheduleTask((String) consumerRecord.key(), (String) consumerRecord.value());
                        this.LOG.debug("Scheduler acknowledging " + ((String) consumerRecord.key()) + " OFFSET " + (consumerRecord.offset() + 1) + " topic " + consumerRecord.topic());
                        this.consumer.seek(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), consumerRecord.offset() + 1);
                    }
                } catch (WakeupException e) {
                    this.LOG.debug("Shutting down scheduler consumer");
                    this.consumer.commitSync();
                    this.consumer.close();
                    this.waitToClose.countDown();
                    return;
                }
            } finally {
                this.consumer.commitSync();
                this.consumer.close();
                this.waitToClose.countDown();
            }
        }
    }

    public Scheduler open() throws Exception {
        if (this.OPENED.compareAndSet(false, true)) {
            this.stateStorage = new GraknStateStorage();
            this.consumer = ConfigHelper.kafkaConsumer(KafkaTerms.SCHEDULERS_GROUP);
            this.consumer.subscribe(Collections.singletonList(KafkaTerms.NEW_TASKS_TOPIC), new RebalanceListener(this.consumer));
            this.producer = ConfigHelper.kafkaProducer();
            this.zkStorage = SynchronizedStateStorage.getInstance();
            this.waitToClose = new CountDownLatch(1);
            this.schedulingService = Executors.newScheduledThreadPool(1);
            this.LOG.debug("Scheduler started");
        } else {
            this.LOG.error("Scheduled already opened!");
        }
        return this;
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        if (!this.OPENED.compareAndSet(true, false)) {
            this.LOG.error("Scheduler open() must be called before close()!");
            return;
        }
        this.running = false;
        KafkaConsumer<String, String> kafkaConsumer = this.consumer;
        kafkaConsumer.getClass();
        ExceptionWrapper.noThrow(kafkaConsumer::wakeup, "Could not wake up scheduler thread.");
        try {
            this.waitToClose.await(5 * properties.getPropertyAsLong(ConfigProperties.SCHEDULER_POLLING_FREQ), TimeUnit.MILLISECONDS);
        } catch (Throwable th) {
            this.LOG.error("Exception whilst waiting for scheduler run() thread to finish - " + ExceptionUtils.getFullStackTrace(th));
        }
        ScheduledExecutorService scheduledExecutorService = this.schedulingService;
        scheduledExecutorService.getClass();
        ExceptionWrapper.noThrow(scheduledExecutorService::shutdown, "Could not shutdown scheduling service.");
        KafkaProducer<String, String> kafkaProducer = this.producer;
        kafkaProducer.getClass();
        ExceptionWrapper.noThrow(kafkaProducer::flush, "Could not flush Kafka producer in scheduler.");
        KafkaProducer<String, String> kafkaProducer2 = this.producer;
        kafkaProducer2.getClass();
        ExceptionWrapper.noThrow(kafkaProducer2::close, "Could not close Kafka producer in scheduler.");
        this.stateStorage = null;
        this.zkStorage = null;
        ExceptionWrapper.noThrow(() -> {
            this.LOG.debug("Scheduler stopped.");
        }, "Kafka logging error.");
    }

    private void scheduleTask(String str, String str2) {
        scheduleTask(str, str2, this.stateStorage.getState(str));
    }

    private void scheduleTask(String str, String str2, TaskState taskState) {
        long time = taskState.runAt().getTime() - new Date().getTime();
        markAsScheduled(str);
        if (taskState.isRecurring().booleanValue()) {
            this.LOG.debug("Scheduling recurring " + str);
            this.schedulingService.scheduleAtFixedRate(() -> {
                markAsScheduled(str);
                sendToWorkQueue(str, str2);
            }, time, taskState.interval(), TimeUnit.MILLISECONDS);
        } else {
            this.LOG.debug("Scheduling once " + str + " @ " + time);
            this.schedulingService.schedule(() -> {
                sendToWorkQueue(str, str2);
            }, time, TimeUnit.MILLISECONDS);
        }
    }

    private void markAsScheduled(String str) {
        this.LOG.debug("Marking " + str + " as scheduled");
        this.zkStorage.updateState(str, TaskStatus.SCHEDULED, null, null);
        this.stateStorage.updateState(str, TaskStatus.SCHEDULED, getClass().getName(), null, null, null, null);
    }

    private void sendToWorkQueue(String str, String str2) {
        this.LOG.debug("Sending to work queue " + str);
        this.producer.send(new ProducerRecord(KafkaTerms.WORK_QUEUE_TOPIC, str, str2), new KafkaLoggingCallback());
        this.producer.flush();
    }

    private void restartRecurringTasks() {
        Set<Pair<String, TaskState>> tasks = this.stateStorage.getTasks(null, null, null, 0, 0, true);
        tasks.stream().filter(pair -> {
            return ((TaskState) pair.getValue()).status() != TaskStatus.STOPPED;
        }).forEach(pair2 -> {
            scheduleTask((String) pair2.getKey(), ((TaskState) pair2.getValue()).configuration() == null ? "{}" : ((TaskState) pair2.getValue()).configuration().toString(), (TaskState) pair2.getValue());
        });
        this.LOG.debug("Scheduler restarted " + tasks.size() + " recurring tasks");
    }

    private void printInitialization() {
        if (this.initialised) {
            return;
        }
        this.initialised = true;
        this.LOG.info("Scheduler initialised");
    }
}
