package won.matcher.rescal.actor;

import akka.actor.ActorRef;
import akka.actor.UntypedActor;
import akka.cluster.pubsub.DistributedPubSub;
import akka.cluster.pubsub.DistributedPubSubMediator;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Iterator;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import scala.concurrent.duration.FiniteDuration;
import won.matcher.rescal.config.RescalMatcherConfig;
import won.matcher.rescal.service.HintReader;
import won.matcher.service.common.event.BulkHintEvent;
import won.matcher.service.common.event.HintEvent;
import won.matcher.utils.tensor.TensorEntry;
import won.matcher.utils.tensor.TensorEntryAllGenerator;
import won.matcher.utils.tensor.TensorEntryTokenizer;
import won.matcher.utils.tensor.TensorMatchingData;

@Scope("prototype")
@Component
/* loaded from: input_file:won/matcher/rescal/actor/RescalMatcherActor.class */
public class RescalMatcherActor extends UntypedActor {
    private LoggingAdapter log = Logging.getLogger(getContext().system(), this);
    private long lastQueryDate = Long.MIN_VALUE;
    private TensorMatchingData rescalInputData = new TensorMatchingData();
    private static final String TICK = "tick";
    private ActorRef pubSubMediator;

    @Autowired
    private HintReader hintReader;

    @Autowired
    private RescalMatcherConfig config;

    public void preStart() throws IOException {
        this.pubSubMediator = DistributedPubSub.get(getContext().system()).mediator();
        getContext().system().scheduler().schedule(FiniteDuration.Zero(), this.config.getExecutionDuration(), getSelf(), TICK, getContext().dispatcher(), (ActorRef) null);
    }

    public void onReceive(Object obj) throws Exception {
        if (obj.equals(TICK)) {
            executeRescalAlgorithm();
        } else {
            unhandled(obj);
        }
    }

    private void executeRescalAlgorithm() throws IOException, InterruptedException {
        this.log.info("start processing (every {} minutes) ...", this.config.getExecutionDuration());
        long currentTimeMillis = System.currentTimeMillis();
        this.log.info("query needs and connections from rdf store '{}' from date '{}' to date '{}'", this.config.getSparqlEndpoint(), Long.valueOf(this.lastQueryDate), Long.valueOf(currentTimeMillis));
        Iterator it = new TensorEntryTokenizer(new TensorEntryAllGenerator("queries/attribute", this.config.getSparqlEndpoint(), this.lastQueryDate, currentTimeMillis).generateTensorEntries()).generateTensorEntries().iterator();
        while (it.hasNext()) {
            this.rescalInputData.addNeedAttribute((TensorEntry) it.next());
        }
        for (TensorEntry tensorEntry : new TensorEntryAllGenerator("queries/connection", this.config.getSparqlEndpoint(), this.lastQueryDate, currentTimeMillis).generateTensorEntries()) {
            this.rescalInputData.addNeedConnection(tensorEntry.getNeedUri(), tensorEntry.getValue(), true);
        }
        this.log.info("number of needs in tensor: {}", Integer.valueOf(this.rescalInputData.getNeeds().size()));
        this.log.info("number of attributes in tensor: {}", Integer.valueOf(this.rescalInputData.getAttributes().size()));
        this.log.info("number of connections in tensor: {}", Integer.valueOf(this.rescalInputData.getNumberOfConnections()));
        this.log.info("number of slices in tensor: {}", Integer.valueOf(this.rescalInputData.getSlices().size()));
        if (!this.rescalInputData.isValidTensor()) {
            this.log.info("not enough tensor data available for execution yet, wait for next execution!");
            return;
        }
        this.log.info("write rescal input data to folder: {}", this.config.getExecutionDirectory());
        int i = this.rescalInputData.writeCleanedOutputFiles(this.config.getExecutionDirectory()).getTensorDimensions()[0];
        if (this.rescalInputData.getNeeds().size() + this.rescalInputData.getAttributes().size() < this.config.getRescalRank()) {
            this.log.info("Do not start rescal algorithm since tensor size (number of needs + number of attributes) = {} is smaller than rank parameter {}.", Integer.valueOf(i), Long.valueOf(this.config.getRescalRank()));
            return;
        }
        String str = "python " + this.config.getPythonScriptDirectory() + "/rescal-matcher.py -inputfolder " + this.config.getExecutionDirectory() + " -outputfolder " + this.config.getExecutionDirectory() + "/output -rank " + this.config.getRescalRank() + " -threshold " + this.config.getRescalThreshold();
        this.log.info("execute python script: " + str);
        Process exec = Runtime.getRuntime().exec(str);
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(exec.getInputStream()));
        while (true) {
            String readLine = bufferedReader.readLine();
            if (readLine == null) {
                break;
            } else {
                this.log.info(readLine);
            }
        }
        bufferedReader.close();
        BufferedReader bufferedReader2 = new BufferedReader(new InputStreamReader(exec.getErrorStream()));
        while (true) {
            String readLine2 = bufferedReader2.readLine();
            if (readLine2 == null) {
                break;
            } else {
                this.log.warning(readLine2);
            }
        }
        bufferedReader2.close();
        int waitFor = exec.waitFor();
        if (waitFor != 0) {
            this.log.error("rescal python call returned error code: " + waitFor);
            return;
        }
        BulkHintEvent readHints = this.hintReader.readHints(this.rescalInputData);
        int size = (readHints == null || readHints.getHintEvents() == null) ? 0 : readHints.getHintEvents().size();
        this.log.info("loaded {} hints into bulk hint event and publish", Integer.valueOf(size));
        if (size > 0) {
            StringBuilder sb = new StringBuilder();
            Iterator it2 = readHints.getHintEvents().iterator();
            while (it2.hasNext()) {
                sb.append("\n- " + ((HintEvent) it2.next()));
            }
            this.log.info(sb.toString());
            this.pubSubMediator.tell(new DistributedPubSubMediator.Publish(readHints.getClass().getName(), readHints), getSelf());
        }
        this.lastQueryDate = currentTimeMillis;
    }
}
