package ai.grakn.kb.internal.computer;

import java.io.File;
import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.util.HashSet;
import java.util.Iterator;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationUtils;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.spark.HashPartitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;
import org.apache.tinkerpop.gremlin.hadoop.Constants;
import org.apache.tinkerpop.gremlin.hadoop.process.computer.AbstractHadoopGraphComputer;
import org.apache.tinkerpop.gremlin.hadoop.process.computer.util.ComputerSubmissionHelper;
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopConfiguration;
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.FileSystemStorage;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.GraphFilterAware;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
import org.apache.tinkerpop.gremlin.process.computer.Memory;
import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
import org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult;
import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies;
import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException;
import org.apache.tinkerpop.gremlin.spark.process.computer.traversal.strategy.optimization.SparkInterceptorStrategy;
import org.apache.tinkerpop.gremlin.spark.process.computer.traversal.strategy.optimization.SparkSingleIterationStrategy;
import org.apache.tinkerpop.gremlin.spark.structure.Spark;
import org.apache.tinkerpop.gremlin.spark.structure.io.InputFormatRDD;
import org.apache.tinkerpop.gremlin.spark.structure.io.InputOutputHelper;
import org.apache.tinkerpop.gremlin.spark.structure.io.InputRDD;
import org.apache.tinkerpop.gremlin.spark.structure.io.OutputFormatRDD;
import org.apache.tinkerpop.gremlin.spark.structure.io.OutputRDD;
import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedInputRDD;
import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedOutputRDD;
import org.apache.tinkerpop.gremlin.spark.structure.io.SparkContextStorage;
import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer;
import org.apache.tinkerpop.gremlin.structure.Direction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:ai/grakn/kb/internal/computer/GraknSparkComputer.class */
public final class GraknSparkComputer extends AbstractHadoopGraphComputer {
    private static final Logger LOGGER;
    private final Configuration sparkConfiguration;
    private boolean workersSet;
    private final ThreadFactory threadFactoryBoss;
    private final ExecutorService computerService;
    private String jobGroupId;
    static final /* synthetic */ boolean $assertionsDisabled;

    public GraknSparkComputer(HadoopGraph hadoopGraph) {
        super(hadoopGraph);
        this.workersSet = false;
        this.threadFactoryBoss = new BasicThreadFactory.Builder().namingPattern(GraknSparkComputer.class.getSimpleName() + "-boss").build();
        this.computerService = Executors.newSingleThreadExecutor(this.threadFactoryBoss);
        this.jobGroupId = null;
        this.sparkConfiguration = new HadoopConfiguration();
        ConfigurationUtils.copy(this.hadoopGraph.configuration(), this.sparkConfiguration);
    }

    /**
     * Sets the number of workers and records that the caller chose it explicitly.
     *
     * <p>When the configured {@code spark.master} starts with {@code local}, it is rewritten to
     * {@code local[<workers>]} using the worker count held by the superclass after the call.
     *
     * @param i the requested number of workers
     * @return this computer, for chaining
     */
    public GraphComputer workers(int i) {
        super.workers(i);
        final String masterKey = "spark.master";
        final boolean localMaster = this.sparkConfiguration.containsKey(masterKey)
                && this.sparkConfiguration.getString(masterKey).startsWith("local");
        if (localMaster) {
            this.sparkConfiguration.setProperty(masterKey, "local[" + this.workers + "]");
        }
        this.workersSet = true;
        return this;
    }

    /**
     * Stores a key/value pair in this computer's private Spark configuration.
     *
     * @param str the configuration key
     * @param obj the value to associate with the key
     * @return this computer, for chaining
     */
    public GraphComputer configure(String str, Object obj) {
        final Configuration target = this.sparkConfiguration;
        target.setProperty(str, obj);
        return this;
    }

    /**
     * Validates the computer's state and hands the job to {@link ComputerSubmissionHelper}, which
     * invokes {@code submitWithExecutor} under the name "SparkSubmitter".
     *
     * @return a future for the result of the computation
     */
    public Future<ComputerResult> submit() {
        this.validateStatePriorToExecution();
        final Future<ComputerResult> pendingResult = ComputerSubmissionHelper.runWithBackgroundThread(
                executor -> this.submitWithExecutor(executor), "SparkSubmitter");
        return pendingResult;
    }

    /**
     * Cancels the Spark job group started by this computer, if a job group id has been assigned.
     * Does nothing when no job has been submitted yet.
     */
    public void cancelJobs() {
        final String groupId = this.jobGroupId;
        if (groupId == null) {
            return;
        }
        Spark.getContext().cancelJobGroup(groupId);
    }

    /**
     * Builds the Spark job for the configured vertex program and/or map-reducers and schedules it on
     * this computer's single-threaded executor.
     *
     * <p>Before scheduling, a random job group id is generated, appended to the configured
     * {@code gremlin.hadoop.outputLocation}, and legacy {@code janusmr*} keys are mirrored by
     * {@code updateConfigKeys}. The scheduled task then:
     * <ol>
     *   <li>derives the Hadoop/Spark configuration and instantiates the input and output RDD handlers;</li>
     *   <li>loads, optionally filters, partitions and persists the graph RDD;</li>
     *   <li>runs the vertex program (via an interceptor when one is configured, otherwise iteratively
     *       until the program reports termination or the thread is interrupted);</li>
     *   <li>runs every map-reducer and stores its result in the returned memory;</li>
     *   <li>unpersists intermediate RDDs and removes the output location when nothing is persisted.</li>
     * </ol>
     *
     * <p>The executor service is shut down right after the task is queued, so a second submission on
     * the same instance is rejected by the executor.
     *
     * @param executor supplied by the submission helper; not used, the job runs on this computer's own
     *                 executor service
     * @return a future yielding the computation result
     */
    private Future<ComputerResult> submitWithExecutor(Executor executor) {
        this.jobGroupId = Integer.toString(ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE));
        final String jobDescription = this.vertexProgram == null
                ? this.mapReducers.toString()
                : this.vertexProgram + "+" + this.mapReducers;
        // Every job writes below its own sub-directory, named after the job group id.
        this.sparkConfiguration.setProperty("gremlin.hadoop.outputLocation",
                this.sparkConfiguration.getString("gremlin.hadoop.outputLocation") + "/" + this.jobGroupId);
        updateConfigKeys(this.sparkConfiguration);

        final Future<ComputerResult> result = this.computerService.submit(() -> {
            final long startTime = System.currentTimeMillis();

            //////////////////////////////////////////////////
            // derive the configurations used by this job   //
            //////////////////////////////////////////////////
            final HadoopConfiguration graphComputerConfiguration = new HadoopConfiguration(this.sparkConfiguration);
            if (!graphComputerConfiguration.containsKey("spark.serializer")) {
                graphComputerConfiguration.setProperty("spark.serializer", GryoSerializer.class.getCanonicalName());
            }
            graphComputerConfiguration.setProperty("gremlin.hadoop.graphWriter.hasEdges",
                    Boolean.valueOf(this.persist.equals(GraphComputer.Persist.EDGES)));
            final org.apache.hadoop.conf.Configuration hadoopConfiguration =
                    ConfUtil.makeHadoopConfiguration(graphComputerConfiguration);
            final FileSystemStorage fileSystemStorage = FileSystemStorage.open(hadoopConfiguration);

            final boolean inputFromFiles = FileInputFormat.class.isAssignableFrom(
                    hadoopConfiguration.getClass("gremlin.hadoop.graphReader", Object.class));
            final boolean inputFromSpark = PersistedInputRDD.class.isAssignableFrom(
                    hadoopConfiguration.getClass("gremlin.hadoop.graphReader", Object.class));
            final boolean outputToFiles = FileOutputFormat.class.isAssignableFrom(
                    hadoopConfiguration.getClass("gremlin.hadoop.graphWriter", Object.class));
            final boolean outputToSpark = PersistedOutputRDD.class.isAssignableFrom(
                    hadoopConfiguration.getClass("gremlin.hadoop.graphWriter", Object.class));
            final boolean skipPartitioner = graphComputerConfiguration.getBoolean("gremlin.spark.skipPartitioner", false);
            final boolean skipPersist = graphComputerConfiguration.getBoolean("gremlin.spark.skipGraphCache", false);

            // For file based input, resolve the searched graph location to a fully qualified path once
            // and publish it to both configurations.
            if (inputFromFiles) {
                final String inputLocation = (String) Constants.getSearchGraphLocation(
                        hadoopConfiguration.get("gremlin.hadoop.inputLocation"), fileSystemStorage).orElse(null);
                if (null != inputLocation) {
                    try {
                        final String qualifiedInputLocation = FileSystem.get(hadoopConfiguration)
                                .getFileStatus(new Path(inputLocation)).getPath().toString();
                        graphComputerConfiguration.setProperty("mapreduce.input.fileinputformat.inputdir", qualifiedInputLocation);
                        hadoopConfiguration.set("mapreduce.input.fileinputformat.inputdir", qualifiedInputLocation);
                    } catch (IOException e) {
                        throw new IllegalStateException(e.getMessage(), e);
                    }
                }
            }

            //////////////////////////////////////////////////
            // instantiate the input / output RDD handlers  //
            //////////////////////////////////////////////////
            InputRDD inputRDD;
            OutputRDD outputRDD;
            try {
                inputRDD = InputRDD.class.isAssignableFrom(hadoopConfiguration.getClass("gremlin.hadoop.graphReader", Object.class))
                        ? (InputRDD) hadoopConfiguration.getClass("gremlin.hadoop.graphReader", InputRDD.class, InputRDD.class).newInstance()
                        : (InputRDD) InputFormatRDD.class.newInstance();
                outputRDD = OutputRDD.class.isAssignableFrom(hadoopConfiguration.getClass("gremlin.hadoop.graphWriter", Object.class))
                        ? (OutputRDD) hadoopConfiguration.getClass("gremlin.hadoop.graphWriter", OutputRDD.class, OutputRDD.class).newInstance()
                        : (OutputRDD) OutputFormatRDD.class.newInstance();
            } catch (IllegalAccessException | InstantiationException e) {
                throw new IllegalStateException(e.getMessage(), e);
            }

            // Push the graph filter down to the reader when it supports that; otherwise remember that
            // the loaded RDD still has to be filtered here.
            final boolean filtered;
            if ((inputRDD instanceof InputFormatRDD) && GraphFilterAware.class.isAssignableFrom(
                    hadoopConfiguration.getClass("gremlin.hadoop.graphReader", InputFormat.class, InputFormat.class))) {
                GraphFilterAware.storeGraphFilter(graphComputerConfiguration, hadoopConfiguration, this.graphFilter);
                filtered = false;
            } else if (inputRDD instanceof GraphFilterAware) {
                ((GraphFilterAware) inputRDD).setGraphFilter(this.graphFilter);
                filtered = false;
            } else {
                filtered = this.graphFilter.hasFilter();
            }

            //////////////////////////////////////////////////
            // create the Spark context and clean outputs   //
            //////////////////////////////////////////////////
            final JavaSparkContext sparkContext = new JavaSparkContext(Spark.create(hadoopConfiguration));
            final SparkContextStorage sparkContextStorage = SparkContextStorage.open();
            sparkContext.setJobGroup(this.jobGroupId, jobDescription);

            // Typed as GraknSparkMemory because broadcastMemory/setInExecute/complete are called on it.
            GraknSparkMemory memory = null;
            final String outputLocation = hadoopConfiguration.get("gremlin.hadoop.outputLocation", (String) null);
            if (null != outputLocation) {
                if (outputToFiles && fileSystemStorage.exists(outputLocation)) {
                    fileSystemStorage.rm(outputLocation);
                }
                if (outputToSpark && sparkContextStorage.exists(outputLocation)) {
                    sparkContextStorage.rm(outputLocation);
                }
            }
            this.logger.debug("HadoopGremlin(Spark): " + ((Object) (null == this.vertexProgram ? "No VertexProgram" : this.vertexProgram)) + "[" + this.mapReducers + "]");
            loadJars(hadoopConfiguration, new Object[]{sparkContext});
            updateLocalConfiguration(sparkContext, hadoopConfiguration);

            //////////////////////////////////////////////////
            // load, filter, partition and cache the graph  //
            //////////////////////////////////////////////////
            boolean partitioned = false;
            JavaPairRDD<Object, VertexWritable> loadedGraphRDD = inputRDD.readGraphRDD(graphComputerConfiguration, sparkContext);
            if (filtered) {
                this.logger.debug("Filtering the loaded graphRDD: " + this.graphFilter);
                loadedGraphRDD = GraknSparkExecutor.applyGraphFilter(loadedGraphRDD, this.graphFilter);
            }
            if (loadedGraphRDD.partitioner().isPresent()) {
                this.logger.debug("Using the existing partitioner associated with the loaded graphRDD: " + loadedGraphRDD.partitioner().get());
            } else if (skipPartitioner) {
                if (!$assertionsDisabled && skipPartitioner != (!loadedGraphRDD.partitioner().isPresent())) {
                    throw new AssertionError();
                }
                this.logger.debug("Partitioning has been skipped for the loaded graphRDD via gremlin.spark.skipPartitioner");
            } else {
                final HashPartitioner partitioner = new HashPartitioner(
                        this.workersSet ? this.workers : loadedGraphRDD.partitions().size());
                this.logger.debug("Partitioning the loaded graphRDD: " + partitioner);
                loadedGraphRDD = loadedGraphRDD.partitionBy(partitioner);
                partitioned = true;
                if (!$assertionsDisabled && !loadedGraphRDD.partitioner().isPresent()) {
                    throw new AssertionError();
                }
            }
            // Honour an explicitly requested worker count by shrinking or growing the partition count.
            if (this.workersSet) {
                if (loadedGraphRDD.partitions().size() > this.workers) {
                    loadedGraphRDD = loadedGraphRDD.coalesce(this.workers);
                } else if (loadedGraphRDD.partitions().size() < this.workers) {
                    loadedGraphRDD = loadedGraphRDD.repartition(this.workers);
                }
            }
            if (!skipPersist && (!inputFromSpark || partitioned || filtered)) {
                loadedGraphRDD = loadedGraphRDD.persist(StorageLevel.fromString(
                        hadoopConfiguration.get("gremlin.spark.graphStorageLevel", "MEMORY_ONLY")));
            }

            JavaPairRDD<Object, VertexWritable> computedGraphRDD = null;
            try {
                //////////////////////////////////
                // process the vertex program   //
                //////////////////////////////////
                if (null != this.vertexProgram) {
                    memory = new GraknSparkMemory(this.vertexProgram, this.mapReducers, sparkContext);
                    if (graphComputerConfiguration.containsKey("gremlin.hadoop.vertexProgramInterceptor")) {
                        // A configured interceptor replaces the iterative execution entirely.
                        try {
                            computedGraphRDD = (JavaPairRDD) ((GraknSparkVertexProgramInterceptor) Class.forName(
                                    graphComputerConfiguration.getString("gremlin.hadoop.vertexProgramInterceptor")).newInstance())
                                    .apply(this.vertexProgram, loadedGraphRDD, memory);
                        } catch (ClassNotFoundException | IllegalAccessException | InstantiationException e) {
                            // Keep the cause so the failing class can be diagnosed.
                            throw new IllegalStateException(e.getMessage(), e);
                        }
                    } else {
                        final HadoopConfiguration vertexProgramConfiguration = new HadoopConfiguration();
                        this.vertexProgram.storeState(vertexProgramConfiguration);
                        this.vertexProgram.setup(memory);
                        JavaPairRDD viewIncomingRDD = null;
                        memory.broadcastMemory(sparkContext);
                        while (true) {
                            if (Thread.interrupted()) {
                                sparkContext.cancelAllJobs();
                                throw new TraversalInterruptedException();
                            }
                            memory.setInExecute(true);
                            viewIncomingRDD = GraknSparkExecutor.executeVertexProgramIteration(
                                    loadedGraphRDD, viewIncomingRDD, memory, graphComputerConfiguration, vertexProgramConfiguration);
                            memory.setInExecute(false);
                            if (this.vertexProgram.terminate(memory)) {
                                // The final graph is only materialised when it will be written or map-reduced.
                                if ((null != outputRDD && !this.persist.equals(GraphComputer.Persist.NOTHING)) || !this.mapReducers.isEmpty()) {
                                    computedGraphRDD = GraknSparkExecutor.prepareFinalGraphRDD(
                                            loadedGraphRDD, viewIncomingRDD, this.vertexProgram.getVertexComputeKeys());
                                    if (!$assertionsDisabled && (null == computedGraphRDD || computedGraphRDD == loadedGraphRDD)) {
                                        throw new AssertionError();
                                    }
                                }
                                // Leave the loop once the program has terminated; without this the loop
                                // could only ever end through the interrupt path above.
                                break;
                            }
                            memory.incrIteration();
                            memory.broadcastMemory(sparkContext);
                        }
                    }
                    memory.complete();
                    if (null != outputRDD && !this.persist.equals(GraphComputer.Persist.NOTHING)) {
                        if (!$assertionsDisabled && null == computedGraphRDD) {
                            throw new AssertionError();
                        }
                        outputRDD.writeGraphRDD(graphComputerConfiguration, computedGraphRDD);
                    }
                }

                final boolean computedGraphCreated = computedGraphRDD != null && computedGraphRDD != loadedGraphRDD;
                if (!computedGraphCreated) {
                    computedGraphRDD = loadedGraphRDD;
                }
                final MapMemory finalMemory = null == memory ? new MapMemory() : new MapMemory(memory);

                //////////////////////////////////
                // process the map reducers     //
                //////////////////////////////////
                if (!this.mapReducers.isEmpty()) {
                    JavaPairRDD<Object, VertexWritable> mapReduceRDD = computedGraphRDD;
                    if (computedGraphCreated && !outputToSpark) {
                        // Map-reduce jobs run on a copy of the computed graph with all edges dropped.
                        mapReduceRDD = computedGraphRDD.mapValues(vertexWritable -> {
                            vertexWritable.get().dropEdges(Direction.BOTH);
                            return vertexWritable;
                        });
                        if (this.mapReducers.size() > 1) {
                            mapReduceRDD = mapReduceRDD.persist(StorageLevel.fromString(
                                    hadoopConfiguration.get("gremlin.spark.graphStorageLevel", "MEMORY_ONLY")));
                        }
                    }
                    for (MapReduce mapReduce : this.mapReducers) {
                        final HadoopConfiguration mapReduceConfiguration = new HadoopConfiguration(graphComputerConfiguration);
                        mapReduce.storeState(mapReduceConfiguration);
                        final JavaPairRDD mapRDD = GraknSparkExecutor.executeMap(mapReduceRDD, mapReduce, mapReduceConfiguration);
                        final JavaPairRDD combineRDD = mapReduce.doStage(MapReduce.Stage.COMBINE)
                                ? GraknSparkExecutor.executeCombine(mapRDD, mapReduceConfiguration)
                                : mapRDD;
                        final JavaPairRDD reduceRDD = mapReduce.doStage(MapReduce.Stage.REDUCE)
                                ? GraknSparkExecutor.executeReduce(combineRDD, mapReduce, mapReduceConfiguration)
                                : combineRDD;
                        if (null != outputRDD) {
                            mapReduce.addResultToMemory(finalMemory,
                                    outputRDD.writeMemoryRDD(graphComputerConfiguration, mapReduce.getMemoryKey(), reduceRDD));
                        }
                    }
                    if (computedGraphCreated && !outputToSpark) {
                        if (!$assertionsDisabled && loadedGraphRDD == computedGraphRDD) {
                            throw new AssertionError();
                        }
                        if (!$assertionsDisabled && mapReduceRDD == computedGraphRDD) {
                            throw new AssertionError();
                        }
                        mapReduceRDD.unpersist();
                    } else if (!$assertionsDisabled && mapReduceRDD != computedGraphRDD) {
                        throw new AssertionError();
                    }
                }

                //////////////////////////////////
                // release RDDs and clean up    //
                //////////////////////////////////
                if (!inputFromSpark || partitioned || filtered) {
                    loadedGraphRDD.unpersist();
                }
                if ((!outputToSpark || this.persist.equals(GraphComputer.Persist.NOTHING)) && computedGraphCreated) {
                    computedGraphRDD.unpersist();
                }
                if (null != outputLocation && this.persist.equals(GraphComputer.Persist.NOTHING)) {
                    if (outputToFiles) {
                        fileSystemStorage.rm(outputLocation);
                    }
                    if (outputToSpark) {
                        sparkContextStorage.rm(outputLocation);
                    }
                }
                finalMemory.setRuntime(System.currentTimeMillis() - startTime);
                // Strip job-internal settings before the configuration is used to open the result graph.
                graphComputerConfiguration.clearProperty("gremlin.hadoop.graphFilter");
                graphComputerConfiguration.clearProperty("gremlin.hadoop.vertexProgramInterceptor");
                graphComputerConfiguration.clearProperty("gremlin.spark.skipGraphCache");
                graphComputerConfiguration.clearProperty("gremlin.spark.skipPartitioner");
                return new DefaultComputerResult(
                        InputOutputHelper.getOutputGraph(graphComputerConfiguration, this.resultGraph, this.persist),
                        finalMemory.asImmutable());
            } catch (Exception e) {
                // Any failure of the job, including interruption, surfaces as a RuntimeException.
                throw new RuntimeException(e);
            }
        });
        this.computerService.shutdown();
        return result;
    }

    /**
     * Mirrors every configuration entry whose key starts with {@code janusmr} under a key where that
     * prefix is replaced by {@code janusgraphmr}. The original entries are left in place.
     *
     * <p>The keys are copied into a set first so that the configuration is not modified while its key
     * iterator is still being consumed.
     *
     * @param configuration the configuration to update in place
     */
    private static void updateConfigKeys(Configuration configuration) {
        final String legacyPrefix = "janusmr";
        final String currentPrefix = "janusgraphmr";

        final HashSet<String> existingKeys = new HashSet<>();
        final Iterator<?> keyIterator = configuration.getKeys();
        while (keyIterator.hasNext()) {
            existingKeys.add((String) keyIterator.next());
        }

        for (String key : existingKeys) {
            if (key.startsWith(legacyPrefix)) {
                final String renamedKey = currentPrefix + key.substring(legacyPrefix.length());
                configuration.setProperty(renamedKey, configuration.getString(key));
            }
        }
    }

    /**
     * Registers a jar file with the Spark context passed as the first extra argument.
     *
     * @param configuration the Hadoop configuration (not used by this implementation)
     * @param file          the jar to add, registered by its absolute path
     * @param objArr        extra arguments; element 0 must be the {@link JavaSparkContext}
     */
    protected void loadJar(org.apache.hadoop.conf.Configuration configuration, File file, Object... objArr) {
        final JavaSparkContext sparkContext = (JavaSparkContext) objArr[0];
        final String jarPath = file.getAbsolutePath();
        sparkContext.addJar(jarPath);
    }

    /**
     * Copies a fixed set of job-related properties from the Hadoop configuration to the Spark
     * context's local properties. Properties absent from the configuration are skipped.
     *
     * @param javaSparkContext the Spark context whose local properties are set
     * @param configuration    the configuration the property values are read from
     */
    private static void updateLocalConfiguration(JavaSparkContext javaSparkContext, org.apache.hadoop.conf.Configuration configuration) {
        final String[] threadLocalKeys = {
            "spark.job.description",
            "spark.jobGroup.id",
            "spark.job.interruptOnCancel",
            "spark.scheduler.pool"
        };
        for (String key : threadLocalKeys) {
            final String value = configuration.get(key);
            if (value == null) {
                continue;
            }
            LOGGER.info("Setting Thread Local SparkContext Property - {} : {}", key, value);
            // Reuse the value read above instead of querying the configuration a second time.
            javaSparkContext.setLocalProperty(key, value);
        }
    }

    /**
     * Command-line entry point: loads the properties file named by the first argument, creates the
     * vertex program described by it and runs that program on a new {@code GraknSparkComputer},
     * blocking until the computation has finished.
     *
     * @param strArr command-line arguments; element 0 is the path of the properties file
     * @throws IllegalArgumentException if no properties file is given
     * @throws Exception if loading the configuration or running the computation fails
     */
    public static void main(String[] strArr) throws Exception {
        if (strArr == null || strArr.length == 0) {
            throw new IllegalArgumentException(
                    "Usage: " + GraknSparkComputer.class.getSimpleName() + " <path-to-properties-file>");
        }
        final PropertiesConfiguration propertiesConfiguration = new PropertiesConfiguration(strArr[0]);
        final VertexProgram vertexProgram =
                VertexProgram.createVertexProgram(HadoopGraph.open(propertiesConfiguration), propertiesConfiguration);
        new GraknSparkComputer(HadoopGraph.open(propertiesConfiguration)).program(vertexProgram).submit().get();
    }

    /**
     * Deserialization hook for the one serializable lambda of this class, the function that drops all
     * edges of a {@link VertexWritable}.
     *
     * <p>The serialized form is accepted only when its implementation method name, method kind,
     * functional interface, and signatures all match; in that case a fresh equivalent lambda is
     * returned.
     *
     * <p>NOTE(review): this method looks like decompiler output of the hook javac generates itself for
     * serializable lambdas. If the class is rebuilt from source, the compiler presumably emits its own
     * copy and this hand-written one may have to be removed; confirm before recompiling.
     *
     * @param serializedLambda the serialized form of the lambda being restored
     * @return the restored lambda
     * @throws IllegalArgumentException if the serialized form does not describe the known lambda
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        final boolean describesDropEdgesLambda =
                "lambda$null$dcb61ba3$1".equals(serializedLambda.getImplMethodName())
                && serializedLambda.getImplMethodKind() == java.lang.invoke.MethodHandleInfo.REF_invokeStatic
                && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/Function")
                && serializedLambda.getFunctionalInterfaceMethodName().equals("call")
                && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;")
                && serializedLambda.getImplClass().equals("ai/grakn/kb/internal/computer/GraknSparkComputer")
                && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritable;)Lorg/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritable;");
        if (describesDropEdgesLambda) {
            // The explicit cast supplies the functional-interface target type that a lambda needs
            // when the method's declared return type is Object.
            return (org.apache.spark.api.java.function.Function<VertexWritable, VertexWritable>) vertexWritable -> {
                vertexWritable.get().dropEdges(Direction.BOTH);
                return vertexWritable;
            };
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }

    static {
        $assertionsDisabled = !GraknSparkComputer.class.desiredAssertionStatus();
        LOGGER = LoggerFactory.getLogger(GraknSparkComputer.class);
        TraversalStrategies.GlobalCache.registerStrategies(GraknSparkComputer.class, TraversalStrategies.GlobalCache.getStrategies(GraphComputer.class).clone().addStrategies(new TraversalStrategy[]{SparkSingleIterationStrategy.instance(), SparkInterceptorStrategy.instance()}));
    }
}
