package won.matcher.service.crawler.actor;

import akka.actor.ActorRef;
import akka.actor.OneForOneStrategy;
import akka.actor.SupervisorStrategy;
import akka.actor.UntypedActor;
import akka.cluster.pubsub.DistributedPubSub;
import akka.cluster.pubsub.DistributedPubSubMediator;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.japi.Function;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import won.matcher.service.common.event.WonNodeEvent;
import won.matcher.service.common.spring.SpringExtension;
import won.matcher.service.crawler.config.CrawlConfig;
import won.matcher.service.crawler.exception.CrawlWrapperException;
import won.matcher.service.crawler.msg.CrawlUriMessage;
import won.matcher.service.crawler.msg.ResourceCrawlUriMessage;
import won.matcher.service.crawler.service.CrawlSparqlService;
import won.protocol.service.WonNodeInfo;

/**
 * Master actor that coordinates crawling of won node resources.
 *
 * <p>The actor tracks every {@link CrawlUriMessage} it has seen in one of three in-memory maps
 * keyed by URI (pending, done, failed). Messages with status {@code PROCESS} or {@code SAVE} that
 * are not yet tracked are forwarded to the crawling worker; {@code DONE} and {@code FAILED}
 * messages move the URI out of the pending map. Every accepted message is also forwarded to the
 * metadata update worker.
 *
 * <p>Won nodes are tracked in two sets: nodes that are crawled and nodes that are skipped. A
 * message that refers to a won node in neither set triggers a {@code NEW_WON_NODE_DISCOVERED}
 * event and is re-sent to this actor after {@link #RESCHEDULE_MESSAGE_DURATION}.
 *
 * <p>A periodic {@link #RECRAWL_TICK} clears the three maps and asks for the won node info of
 * every crawled won node again, unless too many messages are still pending.
 */
@Scope("prototype")
@Component
public class MasterCrawlerActor extends UntypedActor {
    /** Delay before a message of a not yet known won node is re-sent to this actor. */
    private static final FiniteDuration RESCHEDULE_MESSAGE_DURATION = Duration.create(500L, TimeUnit.MILLISECONDS);

    /** Delay between a CONNECTED_TO_WON_NODE event and the START_CRAWLING_WON_NODE event sent to self. */
    private static final FiniteDuration START_CRAWLING_DELAY = FiniteDuration.create(30L, TimeUnit.SECONDS);

    /** Message this actor periodically sends to itself to restart the crawling process. */
    private static final String RECRAWL_TICK = "recrawl_tick";

    /** A recrawl tick is ignored while more than this many messages are pending. */
    private static final int MIN_PENDING_MESSAGES_TO_SKIP_RECRAWLING = 10;

    /** Query suffix appended to a list URI, followed by the modification date. */
    private static final String MODIFIED_AFTER_QUERY = "?modifiedafter=";

    private ActorRef crawlingWorker;
    private ActorRef updateMetaDataWorker;
    private ActorRef pubSubMediator;

    @Autowired
    private CrawlConfig config;

    @Autowired
    private CrawlSparqlService sparqlService;

    private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);

    /** Messages handed to the crawling worker that have not been reported DONE/FAILED yet, by URI. */
    private final Map<String, CrawlUriMessage> pendingMessages = new HashMap<>();

    /** Messages reported with status DONE, by URI. */
    private final Map<String, CrawlUriMessage> doneMessages = new HashMap<>();

    /** Messages reported with status FAILED, by URI. */
    private final Map<String, CrawlUriMessage> failedMessages = new HashMap<>();

    /** URIs of won nodes for which CONNECTED_TO_WON_NODE was received. */
    private final Set<String> crawlWonNodeUris = new HashSet<>();

    /** URIs of won nodes for which SKIP_WON_NODE was received. */
    private final Set<String> skipWonNodeUris = new HashSet<>();

    /**
     * Schedules the recrawl tick, creates the child actors, subscribes to the pub/sub topics this
     * actor consumes and re-submits the messages stored with status PROCESS or FAILED.
     */
    @Override
    public void preStart() {
        FiniteDuration recrawlInterval = config.getRecrawlIntervalDuration();
        getContext().system().scheduler().schedule(
                recrawlInterval, recrawlInterval, getSelf(), RECRAWL_TICK, getContext().dispatcher(),
                ActorRef.noSender());

        crawlingWorker = getContext().actorOf(
                SpringExtension.SpringExtProvider.get(getContext().system()).fromConfigProps(WorkerCrawlerActor.class),
                "CrawlingRouter");
        updateMetaDataWorker = getContext().actorOf(
                SpringExtension.SpringExtProvider.get(getContext().system()).props(UpdateMetadataActor.class),
                "MetaDataUpdateWorker");
        // NOTE(review): the worker is watched, but onReceive has no Terminated branch, so such a
        // notification ends up in unhandled(...) - confirm that this is intended.
        getContext().watch(updateMetaDataWorker);
        getContext().actorOf(
                SpringExtension.SpringExtProvider.get(getContext().system()).props(NeedEventLoaderActor.class),
                "NeedEventLoader");

        pubSubMediator = DistributedPubSub.get(getContext().system()).mediator();
        subscribe(WonNodeEvent.class);
        subscribe(CrawlUriMessage.class);
        subscribe(ResourceCrawlUriMessage.class);

        // Messages stored as PROCESS go straight back to the worker and are tracked as pending.
        for (CrawlUriMessage unfinished : sparqlService.retrieveMessagesForCrawling(CrawlUriMessage.STATUS.PROCESS)) {
            pendingMessages.put(unfinished.getUri(), unfinished);
            crawlingWorker.tell(unfinished, getSelf());
        }
        // Messages stored as FAILED take the regular path through onReceive.
        for (CrawlUriMessage failed : sparqlService.retrieveMessagesForCrawling(CrawlUriMessage.STATUS.FAILED)) {
            getSelf().tell(failed, getSelf());
        }
    }

    /**
     * Supervises the child actors: a {@link CrawlWrapperException} is logged, its breaking message
     * is passed to {@link #processCrawlUriMessage(CrawlUriMessage)} and the child is resumed; any
     * other throwable is escalated.
     *
     * @return a one-for-one strategy created with 0 retries and a zero time range
     */
    @Override
    public SupervisorStrategy supervisorStrategy() {
        return new OneForOneStrategy(0, Duration.Zero(), new Function<Throwable, SupervisorStrategy.Directive>() {
            @Override
            public SupervisorStrategy.Directive apply(Throwable th) throws Exception {
                log.warning("Actor encountered error: {}", th);
                if (!(th instanceof CrawlWrapperException)) {
                    return SupervisorStrategy.escalate();
                }
                CrawlWrapperException wrapper = (CrawlWrapperException) th;
                log.warning("Handled breaking message: {}", wrapper.getBreakingMessage());
                log.warning("Exception was: {}", wrapper.getException());
                processCrawlUriMessage(wrapper.getBreakingMessage());
                return SupervisorStrategy.resume();
            }
        });
    }

    /**
     * Dispatches the recrawl tick, won node events and crawl URI messages; anything else is
     * passed to {@code unhandled}.
     *
     * @param obj the received message
     */
    @Override
    public void onReceive(Object obj) throws InterruptedException {
        if (RECRAWL_TICK.equals(obj)) {
            askWonNodeInfoForCrawling();
        } else if (obj instanceof WonNodeEvent) {
            processWonNodeEvent((WonNodeEvent) obj);
        } else if (obj instanceof CrawlUriMessage) {
            processCrawlUriMessage((CrawlUriMessage) obj);
            log.debug("Number of pending messages: {}", pendingMessages.size());
        } else {
            unhandled(obj);
        }
    }

    /** Logs the sizes of the three message maps and notes when nothing is pending any more. */
    private void logStatus() {
        log.debug("Number of URIs\n Crawled: {}\n Failed: {}\n Pending: {}",
                doneMessages.size(), failedMessages.size(), pendingMessages.size());
        if (pendingMessages.isEmpty()) {
            log.info("crawling process stopped. No pending uri messages in pending queue!");
        }
    }

    /**
     * @param str the won node URI of a message, may be {@code null}
     * @return {@code true} if the URI is non-empty and in neither the crawl set nor the skip set
     */
    private boolean discoveredNewWonNode(String str) {
        return str != null
                && !str.isEmpty()
                && !crawlWonNodeUris.contains(str)
                && !skipWonNodeUris.contains(str);
    }

    /**
     * Processes a crawl message according to its status.
     *
     * <ul>
     *   <li>{@code PROCESS} / {@code SAVE}: see {@link #submitForCrawling(CrawlUriMessage)}</li>
     *   <li>{@code DONE}: the URI moves from the pending map to the done map</li>
     *   <li>{@code FAILED}: the URI moves from the pending map to the failed map</li>
     *   <li>any other status: the message is ignored</li>
     * </ul>
     *
     * <p>Also invoked by the supervisor strategy with the breaking message of a
     * {@link CrawlWrapperException}.
     *
     * @param crawlUriMessage the message to process
     */
    public void processCrawlUriMessage(CrawlUriMessage crawlUriMessage) {
        log.debug("Process message: {}", crawlUriMessage);
        if (crawlUriMessage.getStatus().equals(CrawlUriMessage.STATUS.PROCESS)
                || crawlUriMessage.getStatus().equals(CrawlUriMessage.STATUS.SAVE)) {
            submitForCrawling(crawlUriMessage);
        } else if (crawlUriMessage.getStatus().equals(CrawlUriMessage.STATUS.DONE)) {
            log.debug("Successfully processed URI: {}", crawlUriMessage.getUri());
            updateMetaDataWorker.tell(crawlUriMessage, getSelf());
            pendingMessages.remove(crawlUriMessage.getUri());
            if (doneMessages.put(crawlUriMessage.getUri(), crawlUriMessage) != null) {
                log.warning("URI message received twice: {}", crawlUriMessage.getUri());
            }
            logStatus();
        } else if (crawlUriMessage.getStatus().equals(CrawlUriMessage.STATUS.FAILED)) {
            log.debug("Crawling URI failed: {}", crawlUriMessage.getUri());
            updateMetaDataWorker.tell(crawlUriMessage, getSelf());
            pendingMessages.remove(crawlUriMessage.getUri());
            failedMessages.put(crawlUriMessage.getUri(), crawlUriMessage);
            logStatus();
        }
    }

    /**
     * Handles a {@code PROCESS}/{@code SAVE} message: drops it if its URI is already tracked,
     * otherwise forwards it to the metadata worker and, depending on the state of its won node,
     * reschedules it, drops it or hands it to the crawling worker.
     */
    private void submitForCrawling(CrawlUriMessage crawlUriMessage) {
        String uri = crawlUriMessage.getUri();
        if (pendingMessages.get(uri) != null || doneMessages.get(uri) != null || failedMessages.get(uri) != null) {
            log.debug("message {} already processing/processed ...", crawlUriMessage);
            return;
        }
        updateMetaDataWorker.tell(crawlUriMessage, getSelf());

        String wonNodeUri = crawlUriMessage.getWonNodeUri();
        if (discoveredNewWonNode(wonNodeUri)) {
            // Unknown won node: announce it and retry the message later. The message is not
            // tracked as pending, so the retry passes the duplicate check above.
            log.debug("discovered new won node {}", wonNodeUri);
            publish(new WonNodeEvent(wonNodeUri, WonNodeEvent.STATUS.NEW_WON_NODE_DISCOVERED));
            getContext().system().scheduler().scheduleOnce(
                    RESCHEDULE_MESSAGE_DURATION, getSelf(), crawlUriMessage, getContext().dispatcher(),
                    ActorRef.noSender());
            return;
        }
        if (skipWonNodeUris.contains(wonNodeUri)) {
            return;
        }
        pendingMessages.put(uri, crawlUriMessage);
        crawlingWorker.tell(crawlUriMessage, getSelf());
    }

    /**
     * Updates the crawl/skip sets according to the event status or starts crawling a won node.
     * Events with any other status are ignored.
     */
    private void processWonNodeEvent(WonNodeEvent wonNodeEvent) {
        String wonNodeUri = wonNodeEvent.getWonNodeUri();
        if (wonNodeEvent.getStatus().equals(WonNodeEvent.STATUS.CONNECTED_TO_WON_NODE)) {
            log.info("added new won node to set of connected and crawling won nodes: {}", wonNodeUri);
            skipWonNodeUris.remove(wonNodeUri);
            crawlWonNodeUris.add(wonNodeUri);
            // Crawling itself is started by a delayed event sent to this actor.
            WonNodeEvent startEvent = new WonNodeEvent(
                    wonNodeUri, WonNodeEvent.STATUS.START_CRAWLING_WON_NODE, wonNodeEvent.getWonNodeInfo());
            getContext().system().scheduler().scheduleOnce(
                    START_CRAWLING_DELAY, getSelf(), startEvent, getContext().dispatcher(), getSelf());
        } else if (wonNodeEvent.getStatus().equals(WonNodeEvent.STATUS.START_CRAWLING_WON_NODE)) {
            startCrawling(wonNodeEvent.getWonNodeInfo());
        } else if (wonNodeEvent.getStatus().equals(WonNodeEvent.STATUS.SKIP_WON_NODE)) {
            log.debug("skip crawling won node: {}", wonNodeUri);
            crawlWonNodeUris.remove(wonNodeUri);
            skipWonNodeUris.add(wonNodeUri);
        }
    }

    /**
     * Handles the recrawl tick: unless too many messages are still pending, clears the three
     * message maps and publishes a {@code GET_WON_NODE_INFO_FOR_CRAWLING} event for every crawled
     * won node.
     */
    private void askWonNodeInfoForCrawling() {
        if (pendingMessages.size() > MIN_PENDING_MESSAGES_TO_SKIP_RECRAWLING) {
            log.warning("Skip crawling cylce since there are currently {} messages in the pending queue. Try to restart crawling again in {} minutes",
                    pendingMessages.size(), config.getRecrawlIntervalDuration().toMinutes());
            return;
        }
        log.info("Start crawling process again. Clear the cached uris and crawling statistics");
        doneMessages.clear();
        failedMessages.clear();
        pendingMessages.clear();
        for (String wonNodeUri : crawlWonNodeUris) {
            log.info("ask for won node info of {}", wonNodeUri);
            publish(new WonNodeEvent(wonNodeUri, WonNodeEvent.STATUS.GET_WON_NODE_INFO_FOR_CRAWLING));
        }
    }

    /**
     * Sends the initial crawl messages for a won node to this actor: always one for the need list
     * URI, and one for the connection URI prefix if a connection modification date is stored.
     * A stored modification date is appended as {@code ?modifiedafter=<date>}.
     *
     * @param wonNodeInfo description of the won node; if {@code null} nothing is crawled
     */
    private void startCrawling(WonNodeInfo wonNodeInfo) {
        if (wonNodeInfo == null) {
            // Events can be created without node info (see the two-argument WonNodeEvent
            // constructor used in this class); without it there is nothing to crawl.
            log.warning("cannot start crawling: won node event carried no won node info");
            return;
        }
        String wonNodeUri = wonNodeInfo.getWonNodeURI();
        log.info("start crawling won node: {} ...", wonNodeUri);

        String needsModifiedAfter = sparqlService.retrieveNeedModificationDateForCrawling(wonNodeUri);
        sendListCrawlMessage(wonNodeInfo.getNeedListURI(), needsModifiedAfter, wonNodeUri);

        String connectionsModifiedAfter = sparqlService.retrieveConnectionModificationDateForCrawling(wonNodeUri);
        if (connectionsModifiedAfter != null) {
            sendListCrawlMessage(wonNodeInfo.getConnectionURIPrefix(), connectionsModifiedAfter, wonNodeUri);
        }
    }

    /**
     * Sends a {@code PROCESS} message for a list URI to this actor.
     *
     * @param listUri the list URI; a trailing slash is removed before use
     * @param modifiedAfter modification date to append as query, or {@code null} for none
     * @param wonNodeUri URI of the won node the list belongs to
     */
    private void sendListCrawlMessage(String listUri, String modifiedAfter, String wonNodeUri) {
        String baseUri = removeEndingSlash(listUri);
        String crawlUri = (modifiedAfter == null) ? baseUri : baseUri + MODIFIED_AFTER_QUERY + modifiedAfter;
        CrawlUriMessage message = new CrawlUriMessage(
                crawlUri, baseUri, wonNodeUri, CrawlUriMessage.STATUS.PROCESS, System.currentTimeMillis(), null);
        getSelf().tell(message, getSelf());
    }

    /** Subscribes this actor to the pub/sub topic named after the given class. */
    private void subscribe(Class<?> topicClass) {
        pubSubMediator.tell(new DistributedPubSubMediator.Subscribe(topicClass.getName(), getSelf()), getSelf());
    }

    /** Publishes the event on the pub/sub topic named after the event's runtime class. */
    private void publish(WonNodeEvent wonNodeEvent) {
        pubSubMediator.tell(
                new DistributedPubSubMediator.Publish(wonNodeEvent.getClass().getName(), wonNodeEvent), getSelf());
    }

    /**
     * @param str a URI, may be {@code null}
     * @return {@code str} without a single trailing {@code "/"}; {@code null} stays {@code null}
     */
    private String removeEndingSlash(String str) {
        if (str == null || !str.endsWith("/")) {
            return str;
        }
        return str.substring(0, str.length() - 1);
    }
}
