package ai.grakn.engine.task.postprocessing;

import ai.grakn.GraknConfigKey;
import ai.grakn.GraknTxType;
import ai.grakn.Keyspace;
import ai.grakn.concept.ConceptId;
import ai.grakn.engine.GraknConfig;
import ai.grakn.engine.KeyspaceStore;
import ai.grakn.engine.factory.EngineGraknTxFactory;
import ai.grakn.engine.task.BackgroundTask;
import ai.grakn.kb.internal.EmbeddedGraknTx;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:ai/grakn/engine/task/postprocessing/PostProcessingTask.class */
public class PostProcessingTask implements BackgroundTask {
    private static final Logger LOG = LoggerFactory.getLogger(PostProcessingTask.class);
    private final EngineGraknTxFactory factory;
    private final IndexPostProcessor indexPostProcessor;
    private final ScheduledExecutorService threadPool;
    private final int postProcessingMaxJobs;
    private final int postprocessingDelay;

    public PostProcessingTask(EngineGraknTxFactory engineGraknTxFactory, IndexPostProcessor indexPostProcessor, GraknConfig graknConfig) {
        this.factory = engineGraknTxFactory;
        this.indexPostProcessor = indexPostProcessor;
        this.postProcessingMaxJobs = ((Integer) graknConfig.getProperty(GraknConfigKey.POST_PROCESSOR_POOL_SIZE)).intValue();
        this.threadPool = Executors.newScheduledThreadPool(this.postProcessingMaxJobs);
        this.postprocessingDelay = ((Integer) graknConfig.getProperty(GraknConfigKey.POST_PROCESSOR_DELAY)).intValue();
    }

    @Override // ai.grakn.engine.task.BackgroundTask
    public void run() {
        UUID randomUUID = UUID.randomUUID();
        LOG.info("starting post-processing task with ID '" + randomUUID + "' ... ");
        KeyspaceStore keyspaceStore = this.factory.keyspaceStore();
        if (keyspaceStore == null) {
            LOG.info("post-processing '" + randomUUID + "': waiting for system keyspace to be ready.");
            return;
        }
        Set keyspaces = keyspaceStore.keyspaces();
        LOG.info("post-processing '" + randomUUID + "': attempting to process the following keyspaces: [" + ((String) keyspaces.stream().map((v0) -> {
            return v0.getValue();
        }).collect(Collectors.joining(", "))) + "]");
        keyspaces.forEach(keyspace -> {
            runPostProcessing(randomUUID, keyspace);
        });
        LOG.info("post-processing task with ID '" + randomUUID + "' finished.");
    }

    private void runPostProcessing(UUID uuid, Keyspace keyspace) {
        int i = 0;
        do {
            String popIndex = this.indexPostProcessor.popIndex(keyspace);
            LOG.info("post-processing '" + uuid + "': working on keyspace '" + keyspace.getValue() + "'. The index to be post-processed is '" + popIndex + "'");
            if (popIndex != null) {
                this.threadPool.schedule(() -> {
                    processIndex(keyspace, popIndex, uuid);
                }, this.postprocessingDelay, TimeUnit.SECONDS);
            }
            i++;
            if (popIndex == null) {
                return;
            }
        } while (i < this.postProcessingMaxJobs);
    }

    private void processIndex(Keyspace keyspace, String str, UUID uuid) {
        Set<ConceptId> popIds = this.indexPostProcessor.popIds(keyspace, str);
        if (popIds.isEmpty()) {
            LOG.info("post-processing '" + uuid + "': there " + popIds.size() + " concept ids to post-process.");
            return;
        }
        LOG.info("post-processing '" + uuid + "': processing " + popIds.size() + " concept ids...");
        try {
            EmbeddedGraknTx<?> tx = this.factory.tx(keyspace, GraknTxType.WRITE);
            Throwable th = null;
            try {
                try {
                    this.indexPostProcessor.mergeDuplicateConcepts(tx, str, popIds);
                    tx.commit();
                    if (tx != null) {
                        if (0 != 0) {
                            try {
                                tx.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            tx.close();
                        }
                    }
                } catch (Throwable th3) {
                    th = th3;
                    throw th3;
                }
            } finally {
            }
        } catch (RuntimeException e) {
            LOG.error(String.format("post-processing '" + uuid + "': Error during post processing index {%s} with ids {%s}", str, (String) popIds.stream().map((v0) -> {
                return v0.getValue();
            }).collect(Collectors.joining(","))), e);
        }
    }

    @Override // ai.grakn.engine.task.BackgroundTask, java.lang.AutoCloseable
    public void close() {
        LOG.info("post-processing is shutting down.");
        this.threadPool.shutdown();
    }
}
