package ai.grakn.engine.backgroundtasks.distributed;

import ai.grakn.engine.TaskStatus;
import ai.grakn.engine.backgroundtasks.TaskState;
import ai.grakn.engine.backgroundtasks.TaskStateStorage;
import ai.grakn.engine.backgroundtasks.config.ConfigHelper;
import ai.grakn.engine.backgroundtasks.config.KafkaTerms;
import ai.grakn.engine.util.ExceptionWrapper;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:ai/grakn/engine/backgroundtasks/distributed/Scheduler.class */
public class Scheduler implements Runnable, AutoCloseable {
    private static final String STATUS_MESSAGE = "Topic [%s], partition [%s] received [%s] records, next offset is [%s]";
    private static final Logger LOG = LoggerFactory.getLogger(Scheduler.class);
    private final TaskStateStorage storage;
    private KafkaConsumer<String, String> consumer;
    private KafkaProducer<String, String> producer;
    private ScheduledExecutorService schedulingService;
    private CountDownLatch waitToClose;
    private final AtomicBoolean OPENED = new AtomicBoolean(false);
    private volatile boolean running = false;

    /* loaded from: input_file:ai/grakn/engine/backgroundtasks/distributed/Scheduler$HandleRebalance.class */
    private class HandleRebalance implements ConsumerRebalanceListener {
        private HandleRebalance() {
        }

        public void onPartitionsAssigned(Collection<TopicPartition> collection) {
            Scheduler.LOG.debug("Scheduler partitions assigned " + collection);
        }

        public void onPartitionsRevoked(Collection<TopicPartition> collection) {
            Scheduler.this.consumer.commitSync();
            Scheduler.LOG.debug("Scheduler partitions revoked " + collection);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:ai/grakn/engine/backgroundtasks/distributed/Scheduler$KafkaLoggingCallback.class */
    public static class KafkaLoggingCallback implements Callback {
        private KafkaLoggingCallback() {
        }

        public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
            if (exc != null) {
                Scheduler.LOG.debug(ExceptionUtils.getFullStackTrace(exc));
            }
        }
    }

    public Scheduler(TaskStateStorage taskStateStorage) {
        this.storage = taskStateStorage;
        if (!this.OPENED.compareAndSet(false, true)) {
            LOG.error("Scheduled already opened!");
            return;
        }
        this.consumer = ConfigHelper.kafkaConsumer(KafkaTerms.SCHEDULERS_GROUP);
        this.consumer.subscribe(Collections.singletonList(KafkaTerms.NEW_TASKS_TOPIC), new HandleRebalance());
        this.producer = ConfigHelper.kafkaProducer();
        this.waitToClose = new CountDownLatch(1);
        this.schedulingService = Executors.newScheduledThreadPool(1);
        LOG.debug("Scheduler started");
    }

    @Override // java.lang.Runnable
    public void run() {
        this.running = true;
        restartRecurringTasks();
        while (this.running) {
            try {
                try {
                    ConsumerRecords<String, String> poll = this.consumer.poll(1000L);
                    printConsumerStatus(poll);
                    Iterator it = poll.iterator();
                    while (it.hasNext()) {
                        ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                        TaskState deserialize = TaskState.deserialize((String) consumerRecord.value());
                        this.storage.newState(deserialize);
                        scheduleTask(deserialize);
                        this.consumer.seek(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), consumerRecord.offset() + 1);
                    }
                } catch (WakeupException e) {
                    LOG.debug("Shutting down scheduler consumer");
                    KafkaConsumer<String, String> kafkaConsumer = this.consumer;
                    kafkaConsumer.getClass();
                    ExceptionWrapper.noThrow(kafkaConsumer::commitSync, "Exception syncing commits while closing in Scheduler");
                    KafkaConsumer<String, String> kafkaConsumer2 = this.consumer;
                    kafkaConsumer2.getClass();
                    ExceptionWrapper.noThrow(kafkaConsumer2::close, "Exception while closing consumer in Scheduler");
                    CountDownLatch countDownLatch = this.waitToClose;
                    countDownLatch.getClass();
                    ExceptionWrapper.noThrow(countDownLatch::countDown, "Exception while counting down close latch in Scheduler");
                    return;
                } catch (Throwable th) {
                    LOG.error("Error in scheduler poll " + ExceptionUtils.getFullStackTrace(th));
                    KafkaConsumer<String, String> kafkaConsumer3 = this.consumer;
                    kafkaConsumer3.getClass();
                    ExceptionWrapper.noThrow(kafkaConsumer3::commitSync, "Exception syncing commits while closing in Scheduler");
                    KafkaConsumer<String, String> kafkaConsumer4 = this.consumer;
                    kafkaConsumer4.getClass();
                    ExceptionWrapper.noThrow(kafkaConsumer4::close, "Exception while closing consumer in Scheduler");
                    CountDownLatch countDownLatch2 = this.waitToClose;
                    countDownLatch2.getClass();
                    ExceptionWrapper.noThrow(countDownLatch2::countDown, "Exception while counting down close latch in Scheduler");
                    return;
                }
            } catch (Throwable th2) {
                KafkaConsumer<String, String> kafkaConsumer5 = this.consumer;
                kafkaConsumer5.getClass();
                ExceptionWrapper.noThrow(kafkaConsumer5::commitSync, "Exception syncing commits while closing in Scheduler");
                KafkaConsumer<String, String> kafkaConsumer6 = this.consumer;
                kafkaConsumer6.getClass();
                ExceptionWrapper.noThrow(kafkaConsumer6::close, "Exception while closing consumer in Scheduler");
                CountDownLatch countDownLatch3 = this.waitToClose;
                countDownLatch3.getClass();
                ExceptionWrapper.noThrow(countDownLatch3::countDown, "Exception while counting down close latch in Scheduler");
                throw th2;
            }
        }
        KafkaConsumer<String, String> kafkaConsumer7 = this.consumer;
        kafkaConsumer7.getClass();
        ExceptionWrapper.noThrow(kafkaConsumer7::commitSync, "Exception syncing commits while closing in Scheduler");
        KafkaConsumer<String, String> kafkaConsumer8 = this.consumer;
        kafkaConsumer8.getClass();
        ExceptionWrapper.noThrow(kafkaConsumer8::close, "Exception while closing consumer in Scheduler");
        CountDownLatch countDownLatch4 = this.waitToClose;
        countDownLatch4.getClass();
        ExceptionWrapper.noThrow(countDownLatch4::countDown, "Exception while counting down close latch in Scheduler");
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        if (!this.OPENED.compareAndSet(true, false)) {
            LOG.error("Scheduler open() must be called before close()!");
            return;
        }
        this.running = false;
        KafkaConsumer<String, String> kafkaConsumer = this.consumer;
        kafkaConsumer.getClass();
        ExceptionWrapper.noThrow(kafkaConsumer::wakeup, "Could not wake up scheduler thread.");
        CountDownLatch countDownLatch = this.waitToClose;
        countDownLatch.getClass();
        ExceptionWrapper.noThrow(countDownLatch::await, "Error waiting for TaskRunner consumer to exit");
        ScheduledExecutorService scheduledExecutorService = this.schedulingService;
        scheduledExecutorService.getClass();
        ExceptionWrapper.noThrow(scheduledExecutorService::shutdownNow, "Could not shutdown scheduling service.");
        KafkaProducer<String, String> kafkaProducer = this.producer;
        kafkaProducer.getClass();
        ExceptionWrapper.noThrow(kafkaProducer::flush, "Could not flush Kafka producer in scheduler.");
        KafkaProducer<String, String> kafkaProducer2 = this.producer;
        kafkaProducer2.getClass();
        ExceptionWrapper.noThrow(kafkaProducer2::close, "Could not close Kafka producer in scheduler.");
        LOG.debug("Scheduler stopped.");
    }

    private void scheduleTask(TaskState taskState) {
        long millis = Duration.between(Instant.now(), taskState.runAt()).toMillis();
        markAsScheduled(taskState);
        if (taskState.isRecurring().booleanValue()) {
            this.schedulingService.scheduleAtFixedRate(() -> {
                markAsScheduled(taskState);
                sendToWorkQueue(taskState);
            }, millis, taskState.interval(), TimeUnit.MILLISECONDS);
        } else {
            this.schedulingService.schedule(() -> {
                sendToWorkQueue(taskState);
            }, millis, TimeUnit.MILLISECONDS);
        }
    }

    private void markAsScheduled(TaskState taskState) {
        LOG.debug("Marking " + taskState.getId() + " as scheduled");
        this.storage.updateState(taskState.status(TaskStatus.SCHEDULED));
    }

    private void sendToWorkQueue(TaskState taskState) {
        LOG.debug("Sending to work queue " + taskState.getId());
        this.producer.send(new ProducerRecord(KafkaTerms.WORK_QUEUE_TOPIC, taskState.getId(), TaskState.serialize(taskState)), new KafkaLoggingCallback());
        this.producer.flush();
    }

    private void restartRecurringTasks() {
        this.storage.getTasks(null, null, null, 0, 0).stream().filter((v0) -> {
            return v0.isRecurring();
        }).filter(taskState -> {
            return taskState.status() != TaskStatus.STOPPED;
        }).forEach(this::scheduleTask);
    }

    private void printConsumerStatus(ConsumerRecords<String, String> consumerRecords) {
        Stream map = this.consumer.assignment().stream().map(topicPartition -> {
            return String.format(STATUS_MESSAGE, Integer.valueOf(topicPartition.partition()), topicPartition.topic(), Integer.valueOf(consumerRecords.count()), Long.valueOf(this.consumer.position(topicPartition)));
        });
        Logger logger = LOG;
        logger.getClass();
        map.forEach(logger::debug);
    }
}
