package jaicore.search.algorithms.parallel.parallelexploration.distributed;

import ai.libs.jaicore.basic.sets.SetUtil;
import com.google.common.eventbus.EventBus;
import jaicore.search.algorithms.parallel.parallelexploration.distributed.events.NodePassedToCoworkerEvent;
import jaicore.search.algorithms.parallel.parallelexploration.distributed.interfaces.DistributedSearchCommunicationLayer;
import jaicore.search.algorithms.parallel.parallelexploration.distributed.interfaces.DistributionSearchAdapter;
import jaicore.search.model.travesaltree.Node;
import java.lang.Comparable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:jaicore/search/algorithms/parallel/parallelexploration/distributed/DistributedSearchManager.class */
public class DistributedSearchManager<T, A, V extends Comparable<V>> {
    private static final Logger logger = LoggerFactory.getLogger(DistributedSearchManager.class);
    private final DistributedSearchCommunicationLayer<T, A, V> communicationLayer;
    private final DistributionSearchAdapter<T, V> distAdapter;
    private final BlockingQueue<String> idleCoworkers = new LinkedBlockingQueue();
    private final Set<String> pendingCoworkers = Collections.synchronizedSet(new HashSet());
    private final Map<String, Collection<Node<T, V>>> coworkerJobs = Collections.synchronizedMap(new HashMap());
    private final EventBus eventBus = new EventBus();
    private final List<Thread> auxThreads = new ArrayList();

    /* loaded from: input_file:jaicore/search/algorithms/parallel/parallelexploration/distributed/DistributedSearchManager$CoworkerSynchronizerWorker.class */
    private class CoworkerSynchronizerWorker extends Thread {
        private CoworkerSynchronizerWorker() {
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (!Thread.interrupted()) {
                try {
                    DistributedSearchManager.logger.info("Scanning for new/removed coworkers ...");
                    DistributedSearchManager.this.detectNewCoworkers();
                    DistributedSearchManager.this.detectUnattachedCoworkers();
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    return;
                }
            }
        }
    }

    /* loaded from: input_file:jaicore/search/algorithms/parallel/parallelexploration/distributed/DistributedSearchManager$JobWriterWorker.class */
    private class JobWriterWorker extends Thread {
        private JobWriterWorker() {
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            Collection<Node<T, V>> nextJob;
            while (!Thread.interrupted()) {
                try {
                    while (true) {
                        nextJob = DistributedSearchManager.this.distAdapter.nextJob();
                        if (nextJob != null) {
                            break;
                        } else {
                            Thread.sleep(1000L);
                        }
                    }
                    DistributedSearchManager.logger.info("Waiting for the next available coworker ...");
                    String str = (String) DistributedSearchManager.this.idleCoworkers.take();
                    DistributedSearchManager.this.pendingCoworkers.add(str);
                    DistributedSearchManager.logger.info("Assigning next job to {}", str);
                    DistributedSearchManager.this.pendingCoworkers.remove(str);
                    DistributedSearchManager.this.coworkerJobs.put(str, nextJob);
                    DistributedSearchManager.this.idleCoworkers.remove(str);
                    DistributedSearchManager.this.communicationLayer.createNewJobForCoworker(str, nextJob);
                    Iterator<Node<T, V>> it = nextJob.iterator();
                    while (it.hasNext()) {
                        DistributedSearchManager.this.eventBus.post(new NodePassedToCoworkerEvent(it.next()));
                    }
                } catch (InterruptedException e) {
                    return;
                }
            }
        }
    }

    /* loaded from: input_file:jaicore/search/algorithms/parallel/parallelexploration/distributed/DistributedSearchManager$ResultReaderWorker.class */
    private class ResultReaderWorker extends Thread {
        private ResultReaderWorker() {
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (!Thread.interrupted()) {
                try {
                    Iterator it = new ArrayList(DistributedSearchManager.this.coworkerJobs.keySet()).iterator();
                    while (it.hasNext()) {
                        String str = (String) it.next();
                        DistributedComputationResult<T, V> readResult = DistributedSearchManager.this.communicationLayer.readResult(str);
                        if (readResult != null) {
                            DistributedSearchManager.logger.info("Received result with {} open nodes and {} solution(s)", Integer.valueOf(readResult.getOpen().size()), Integer.valueOf(readResult.getSolutions().size()));
                            DistributedSearchManager.this.distAdapter.processResult((Collection) DistributedSearchManager.this.coworkerJobs.get(str), readResult);
                            DistributedSearchManager.this.coworkerJobs.remove(str);
                            DistributedSearchManager.this.idleCoworkers.add(str);
                        }
                    }
                    Thread.sleep(500L);
                } catch (InterruptedException e) {
                    return;
                }
            }
        }
    }

    public EventBus getEventBus() {
        return this.eventBus;
    }

    public DistributedSearchManager(DistributedSearchCommunicationLayer<T, A, V> distributedSearchCommunicationLayer, DistributionSearchAdapter<T, V> distributionSearchAdapter) {
        if (distributedSearchCommunicationLayer == null) {
            throw new IllegalArgumentException("communication layer must not be null!");
        }
        if (distributionSearchAdapter == null) {
            throw new IllegalArgumentException("result processor must not be null!");
        }
        this.communicationLayer = distributedSearchCommunicationLayer;
        this.distAdapter = distributionSearchAdapter;
        this.auxThreads.add(new CoworkerSynchronizerWorker());
        this.auxThreads.add(new JobWriterWorker());
        this.auxThreads.add(new ResultReaderWorker());
        Iterator<Thread> it = this.auxThreads.iterator();
        while (it.hasNext()) {
            it.next().start();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void detectNewCoworkers() {
        for (String str : this.communicationLayer.detectNewCoworkers()) {
            logger.info("Detected new coworker {}. Now trying to attach it ...", str);
            this.communicationLayer.attachCoworker(str);
            logger.info("Attached new coworker {}", str);
            this.idleCoworkers.add(str);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void detectUnattachedCoworkers() {
        Iterator it = new ArrayList(this.idleCoworkers).iterator();
        while (it.hasNext()) {
            String str = (String) it.next();
            if (!this.communicationLayer.isAttached(str)) {
                logger.info("Coworker {} was detached!", str);
                this.idleCoworkers.remove(str);
            }
        }
        Iterator it2 = new ArrayList(this.coworkerJobs.keySet()).iterator();
        while (it2.hasNext()) {
            String str2 = (String) it2.next();
            if (!this.communicationLayer.isAttached(str2)) {
                Collection<Node<T, V>> collection = this.coworkerJobs.get(str2);
                this.coworkerJobs.remove(str2);
                logger.warn("Busy coworker {} was detached. Resubmitting his job {}.", str2, collection);
            }
        }
    }

    public int getNumberOfHelpers() {
        return this.idleCoworkers.size() + this.pendingCoworkers.size() + this.coworkerJobs.size();
    }

    public int getNumbetOfIdleCoworkers() {
        return this.idleCoworkers.size();
    }

    public int getNumbetOfPendingCoworkers() {
        return this.pendingCoworkers.size();
    }

    public boolean isBusy() {
        return !this.coworkerJobs.isEmpty();
    }

    public void shutdown() {
        for (String str : SetUtil.union(new Collection[]{this.idleCoworkers, this.coworkerJobs.keySet()})) {
            this.coworkerJobs.remove(str);
            this.communicationLayer.detachCoworker(str);
        }
        for (Thread thread : this.auxThreads) {
            logger.info("Shutting down {}.", thread);
            thread.interrupt();
            try {
                thread.join();
                logger.info("Shutdown of {} complete.", thread);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
