package won.matcher.service.nodemanager.actor;

import akka.actor.ActorRef;
import akka.actor.OneForOneStrategy;
import akka.actor.SupervisorStrategy;
import akka.camel.CamelMessage;
import akka.camel.javaapi.UntypedConsumerActor;
import akka.cluster.pubsub.DistributedPubSub;
import akka.cluster.pubsub.DistributedPubSubMediator;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.japi.Function;
import java.net.URI;
import org.apache.jena.query.Dataset;
import org.apache.jena.riot.Lang;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import scala.concurrent.duration.Duration;
import won.matcher.service.common.event.NeedEvent;
import won.matcher.service.common.service.monitoring.MonitoringService;
import won.matcher.service.crawler.msg.CrawlUriMessage;
import won.matcher.service.crawler.msg.ResourceCrawlUriMessage;
import won.protocol.util.linkeddata.LinkedDataSource;

@Scope("prototype")
@Component
/* loaded from: input_file:won/matcher/service/nodemanager/actor/NeedConsumerProtocolActor.class */
public class NeedConsumerProtocolActor extends UntypedConsumerActor {
    private static final String MSG_HEADER_METHODNAME = "methodName";
    private static final String MSG_HEADER_METHODNAME_NEEDCREATED = "needCreated";
    private static final String MSG_HEADER_METHODNAME_NEEDACTIVATED = "needActivated";
    private static final String MSG_HEADER_METHODNAME_NEEDDEACTIVATED = "needDeactivated";
    private static final String MSG_HEADER_WON_NODE_URI = "wonNodeURI";
    private static final String MSG_HEADER_NEED_URI = "needURI";
    private final String endpoint;

    @Autowired
    private MonitoringService monitoringService;

    @Autowired
    private LinkedDataSource linkedDataSource;
    private LoggingAdapter log = Logging.getLogger(getContext().system(), this);
    private ActorRef pubSubMediator = DistributedPubSub.get(getContext().system()).mediator();

    public NeedConsumerProtocolActor(String str) {
        this.endpoint = str;
    }

    @Override // akka.camel.javaapi.UntypedConsumerActor
    public String getEndpointUri() {
        return this.endpoint;
    }

    @Override // akka.actor.UntypedActor
    public void onReceive(Object obj) throws Exception {
        if (obj instanceof CamelMessage) {
            CamelMessage camelMessage = (CamelMessage) obj;
            String str = (String) camelMessage.getHeaders().get(MSG_HEADER_NEED_URI);
            String str2 = (String) camelMessage.getHeaders().get(MSG_HEADER_WON_NODE_URI);
            this.monitoringService.startClock(MonitoringService.NEED_HINT_STOPWATCH, str);
            if (str == null || str2 == null) {
                this.log.warning("Message not processed; needURI or wonNodeURI is null");
            } else {
                Object obj2 = camelMessage.getHeaders().get(MSG_HEADER_METHODNAME);
                if (obj2 != null) {
                    this.log.debug("Received event '{}' for needUri '{}' and wonNeedUri '{}' and publish it to matchers", obj2, str, str2);
                    long currentTimeMillis = System.currentTimeMillis();
                    Dataset dataForResource = this.linkedDataSource.getDataForResource(URI.create(str));
                    if (obj2.equals(MSG_HEADER_METHODNAME_NEEDCREATED)) {
                        NeedEvent needEvent = new NeedEvent(str, str2, NeedEvent.TYPE.ACTIVE, currentTimeMillis, dataForResource);
                        this.pubSubMediator.tell(new DistributedPubSubMediator.Publish(needEvent.getClass().getName(), needEvent), getSelf());
                    } else if (obj2.equals(MSG_HEADER_METHODNAME_NEEDACTIVATED)) {
                        NeedEvent needEvent2 = new NeedEvent(str, str2, NeedEvent.TYPE.ACTIVE, currentTimeMillis, dataForResource);
                        this.pubSubMediator.tell(new DistributedPubSubMediator.Publish(needEvent2.getClass().getName(), needEvent2), getSelf());
                    } else if (obj2.equals(MSG_HEADER_METHODNAME_NEEDDEACTIVATED)) {
                        NeedEvent needEvent3 = new NeedEvent(str, str2, NeedEvent.TYPE.INACTIVE, currentTimeMillis, dataForResource);
                        this.pubSubMediator.tell(new DistributedPubSubMediator.Publish(needEvent3.getClass().getName(), needEvent3), getSelf());
                    } else {
                        unhandled(obj);
                    }
                    ResourceCrawlUriMessage resourceCrawlUriMessage = new ResourceCrawlUriMessage(str, str, str2, CrawlUriMessage.STATUS.SAVE, currentTimeMillis, null);
                    resourceCrawlUriMessage.setSerializedResource(camelMessage.body().toString());
                    resourceCrawlUriMessage.setSerializationFormat(Lang.TRIG);
                    this.pubSubMediator.tell(new DistributedPubSubMediator.Publish(resourceCrawlUriMessage.getClass().getName(), resourceCrawlUriMessage), getSelf());
                    return;
                }
                this.log.warning("Message not processed; methodName is null");
            }
        }
        unhandled(obj);
    }

    @Override // akka.actor.UntypedActor, akka.actor.Actor
    public SupervisorStrategy supervisorStrategy() {
        return new OneForOneStrategy(0, Duration.Zero(), new Function<Throwable, SupervisorStrategy.Directive>() { // from class: won.matcher.service.nodemanager.actor.NeedConsumerProtocolActor.1
            @Override // akka.japi.Function
            public SupervisorStrategy.Directive apply(Throwable th) throws Exception {
                NeedConsumerProtocolActor.this.log.warning("Actor encountered error: {}", th);
                return SupervisorStrategy.escalate();
            }
        });
    }
}
