package ai.grakn.engine.postprocessing;

import ai.grakn.engine.util.ConfigProperties;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:ai/grakn/engine/postprocessing/PostProcessing.class */
public class PostProcessing {
    private static final String CASTING_STAGE = "Scanning for duplicate castings . . .";
    private static final String RESOURCE_STAGE = "Scanning for duplicate resources . . .";
    private static PostProcessing instance = null;
    private String currentStage;
    private final AtomicBoolean isRunning = new AtomicBoolean(false);
    private final Logger LOG = LoggerFactory.getLogger(PostProcessing.class);
    private ExecutorService postpool = Executors.newFixedThreadPool(1);
    private ExecutorService statDump = Executors.newSingleThreadExecutor();
    private Cache cache = Cache.getInstance();
    private Set<Future> futures = ConcurrentHashMap.newKeySet();

    private PostProcessing() {
        this.isRunning.set(false);
    }

    public static synchronized PostProcessing getInstance() {
        if (instance == null) {
            instance = new PostProcessing();
        }
        return instance;
    }

    public void run() {
        if (this.isRunning.get()) {
            return;
        }
        this.LOG.info("Starting maintenance.");
        this.isRunning.set(true);
        this.statDump.submit(this::dumpStats);
        performTasks();
        this.futures = ConcurrentHashMap.newKeySet();
        this.isRunning.set(false);
        this.LOG.info("Maintenance completed.");
    }

    public void stop() {
        if (this.isRunning.get()) {
            this.LOG.warn("Shutting down running tasks");
            this.futures.forEach(future -> {
                future.cancel(true);
            });
            this.postpool.shutdownNow();
            this.statDump.shutdownNow();
        }
        this.isRunning.set(false);
    }

    public void reset() {
        this.isRunning.set(false);
        this.futures.clear();
        this.postpool = Executors.newFixedThreadPool(ConfigProperties.getInstance().getAvailableThreads());
        this.statDump = Executors.newSingleThreadExecutor();
    }

    private void performTasks() {
        this.currentStage = CASTING_STAGE;
        this.LOG.info(this.currentStage);
        performCastingFix();
        waitToContinue();
        this.currentStage = RESOURCE_STAGE;
        this.LOG.info(this.currentStage);
        performResourceFix();
        waitToContinue();
    }

    private void performCastingFix() {
        this.cache.getKeyspaces().parallelStream().forEach(str -> {
            try {
                for (String str : this.cache.getCastingJobs(str)) {
                    this.futures.add(this.postpool.submit(() -> {
                        ConceptFixer.checkCasting(this.cache, str, str);
                    }));
                }
            } catch (RuntimeException e) {
                this.LOG.error("Error while trying to perform post processing on graph [" + str + "]", e);
            }
        });
    }

    private void performResourceFix() {
        this.cache.getKeyspaces().parallelStream().forEach(str -> {
            try {
                this.futures.add(this.postpool.submit(() -> {
                    ConceptFixer.checkResources(this.cache, str, (Set) this.cache.getResourceJobs(str).stream().map(String::new).collect(Collectors.toSet()));
                }));
            } catch (RuntimeException e) {
                this.LOG.error("Error while trying to perform post processing on graph [" + str + "]", e);
            }
        });
    }

    private void waitToContinue() {
        Iterator<Future> it = this.futures.iterator();
        while (it.hasNext()) {
            try {
                it.next().get(4L, TimeUnit.HOURS);
            } catch (InterruptedException | ExecutionException e) {
                this.LOG.error("Error while waiting for future: ", e);
            } catch (TimeoutException e2) {
                this.LOG.warn("Timeout exception waiting for future to complete", e2);
            }
        }
        this.futures.clear();
    }

    private void dumpStats() {
        while (this.isRunning.get()) {
            this.LOG.info("--------------------Current Status of Post Processing--------------------");
            dumpStatsType("Casting");
            dumpStatsType("Resources");
            this.LOG.info("Save in Progress: " + this.cache.isSaveInProgress());
            this.LOG.info("Current Stage: " + this.currentStage);
            this.LOG.info("-------------------------------------------------------------------------");
            try {
                Thread.sleep(30000L);
            } catch (InterruptedException e) {
                this.LOG.error("Exception", e);
            }
        }
    }

    private void dumpStatsType(String str) {
        long j = 0;
        this.LOG.info(str + " Jobs:");
        for (String str2 : this.cache.getKeyspaces()) {
            long j2 = 0;
            if (str.equals("Casting")) {
                j2 = this.cache.getCastingJobs(str2).size();
            } else if (str.equals("Resources")) {
                j2 = this.cache.getCastingJobs(str2).size();
            }
            this.LOG.info("        Post processing step [" + str + " for Graph [" + str2 + "] has jobs : " + j2);
            j += j2;
        }
        this.LOG.info("    Total " + str + " Jobs: " + j);
    }
}
