package ai.grakn.engine.attribute.deduplicator;

import ai.grakn.GraknConfigKey;
import ai.grakn.Keyspace;
import ai.grakn.concept.ConceptId;
import ai.grakn.engine.GraknConfig;
import ai.grakn.engine.attribute.deduplicator.queue.Attribute;
import ai.grakn.engine.attribute.deduplicator.queue.RocksDbQueue;
import ai.grakn.engine.factory.EngineGraknTxFactory;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:ai/grakn/engine/attribute/deduplicator/AttributeDeduplicatorDaemon.class */
/**
 * Queues attributes that have been marked for de-duplication and processes them in a
 * background loop.
 *
 * <p>Attributes are inserted into a {@link RocksDbQueue} stored under the {@code queue}
 * sub-directory of the configured data directory. Once started, the daemon repeatedly reads a
 * batch from the queue, de-duplicates every distinct keyspace/index pair found in the batch, and
 * then acknowledges the batch.
 *
 * <p>The stop flag is {@code volatile} so that a stop request issued from one thread is seen by
 * the thread running the processing loop.
 */
public class AttributeDeduplicatorDaemon {
    private static final Logger LOG = LoggerFactory.getLogger(AttributeDeduplicatorDaemon.class);

    /** Maximum number of attributes requested from the queue per batch. */
    private static final int QUEUE_GET_BATCH_MAX = 1000;

    /** Location of the queue's storage, relative to the configured data directory. */
    private static final Path queueDataDirRelative = Paths.get("queue");

    private final EngineGraknTxFactory txFactory;
    private final RocksDbQueue queue;

    // Written by stopDeduplicationDaemon()/startDeduplicationDaemon() and read by the async
    // processing loop, hence volatile.
    private volatile boolean stopDaemon = false;

    /**
     * Creates the daemon and opens its backing queue.
     *
     * @param graknConfig supplies {@link GraknConfigKey#DATA_DIR}, under which the queue directory
     *     is resolved
     * @param engineGraknTxFactory factory handed to {@link AttributeDeduplicator#deduplicate} for
     *     every keyspace/index pair processed
     */
    public AttributeDeduplicatorDaemon(GraknConfig graknConfig, EngineGraknTxFactory engineGraknTxFactory) {
        Path dataDir = Paths.get((String) graknConfig.getProperty(GraknConfigKey.DATA_DIR));
        this.queue = new RocksDbQueue(dataDir.resolve(queueDataDirRelative));
        this.txFactory = engineGraknTxFactory;
    }

    /**
     * Inserts an attribute into the queue so that a later batch will de-duplicate it.
     *
     * @param keyspace keyspace the attribute belongs to
     * @param str index value passed to {@link Attribute#create}
     * @param conceptId concept id of the attribute
     */
    public void markForDeduplication(Keyspace keyspace, String str, ConceptId conceptId) {
        Attribute attribute = Attribute.create(keyspace, str, conceptId);
        LOG.trace("insert({})", attribute);
        this.queue.insert(attribute);
    }

    /**
     * Clears the stop flag and launches the processing loop asynchronously.
     *
     * @return a future that completes with {@code null} once the loop has exited; an exception
     *     escaping the loop is logged and the returned future completes exceptionally
     */
    public CompletableFuture<Void> startDeduplicationDaemon() {
        this.stopDaemon = false;
        CompletableFuture<Void> daemon = CompletableFuture.runAsync(this::runDeduplicationLoop);
        // Log only; the derived future is discarded so callers still observe the original outcome.
        daemon.exceptionally(th -> {
            LOG.error("An unhandled exception has occurred in the attribute de-duplicator daemon. ", th);
            return null;
        });
        return daemon;
    }

    /**
     * Requests the processing loop to stop. The loop checks the flag between batches, so a batch
     * that is already in progress runs to completion first.
     */
    public void stopDeduplicationDaemon() {
        LOG.info("stopDeduplicationDaemon() - stopping the attribute de-duplicator daemon...");
        this.stopDaemon = true;
    }

    /** Reads, de-duplicates and acknowledges batches until stopped or interrupted. */
    private void runDeduplicationLoop() {
        LOG.info("startDeduplicationDaemon() - attribute de-duplicator daemon started.");
        while (!this.stopDaemon) {
            try {
                List<Attribute> batch = this.queue.read(QUEUE_GET_BATCH_MAX);
                LOG.trace("starting a new batch to process these new attributes: {}", batch);

                // Collapse the batch to distinct keyspace/index pairs so each pair is processed once.
                Set<KeyspaceIndexPair> pairs = batch.stream()
                        .map(attribute -> KeyspaceIndexPair.create(attribute.keyspace(), attribute.index()))
                        .collect(Collectors.toSet());
                for (KeyspaceIndexPair pair : pairs) {
                    AttributeDeduplicator.deduplicate(this.txFactory, pair);
                }
                LOG.trace("new attributes processed.");

                // Acknowledge only after every pair succeeded; on failure the batch is not acked.
                this.queue.ack(batch);
            } catch (InterruptedException e) {
                LOG.error("An exception has occurred in the attribute de-duplicator daemon. ", e);
                // Preserve the interrupt status for the owning executor and stop looping rather
                // than swallowing the interruption request.
                Thread.currentThread().interrupt();
                break;
            } catch (RuntimeException e) {
                // Best effort: log and move on to the next iteration.
                LOG.error("An exception has occurred in the attribute de-duplicator daemon. ", e);
            }
        }
        LOG.info("startDeduplicationDaemon() - attribute de-duplicator daemon stopped");
    }
}
