package ai.grakn.engine.loader.client;

import ai.grakn.engine.backgroundtasks.TaskStatus;
import ai.grakn.engine.loader.Loader;
import ai.grakn.engine.loader.LoaderTask;
import ai.grakn.engine.util.ConfigProperties;
import ai.grakn.engine.util.SystemOntologyElements;
import ai.grakn.graql.InsertQuery;
import ai.grakn.util.ErrorMessage;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.stream.Collectors;
import javax.xml.ws.http.HTTPException;
import mjson.Json;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import spark.utils.IOUtils;

/* loaded from: input_file:ai/grakn/engine/loader/client/LoaderClient.class */
public class LoaderClient implements Loader {
    private static final Logger LOG = LoggerFactory.getLogger(Loader.class);
    private static final ConfigProperties properties = ConfigProperties.getInstance();
    private static ExecutorService executor = Executors.newSingleThreadExecutor();
    private Future future;
    private long pollingFrequency;
    private int batchSize;
    private Collection<InsertQuery> queries;
    private String keyspace;
    private Set<String> hosts;
    private Map<String, Semaphore> availability;
    private Set<String> submitted;
    private final String POST = post();
    private final String GET = get();
    private int currentHost = 0;

    public LoaderClient(String str, Collection<String> collection) {
        this.hosts = Sets.newHashSet(collection);
        setBatchSize(properties.getPropertyAsInt(ConfigProperties.BATCH_SIZE_PROPERTY));
        setPollingFrequency(properties.getPropertyAsLong(ConfigProperties.POLLING_FREQUENCY_PROPERTY));
        setQueueSize(25);
        resetJobsTerminated();
        this.keyspace = str;
        this.queries = new HashSet();
    }

    @Override // ai.grakn.engine.loader.Loader
    public int getBatchSize() {
        return this.batchSize;
    }

    @Override // ai.grakn.engine.loader.Loader
    public Loader setBatchSize(int i) {
        this.batchSize = i;
        return this;
    }

    @Override // ai.grakn.engine.loader.Loader
    public Loader setQueueSize(int i) {
        this.availability = new HashMap();
        this.hosts.forEach(str -> {
            this.availability.put(str, new Semaphore(i));
        });
        return this;
    }

    @Override // ai.grakn.engine.loader.Loader
    public void add(InsertQuery insertQuery) {
        this.queries.add(insertQuery);
        if (this.queries.size() >= this.batchSize) {
            sendQueriesToLoader(new HashSet(this.queries));
            this.queries.clear();
        }
    }

    public void setPollingFrequency(long j) {
        this.pollingFrequency = j;
    }

    @Override // ai.grakn.engine.loader.Loader
    public void flush() {
        if (this.queries.size() > 0) {
            sendQueriesToLoader(this.queries);
            this.queries.clear();
        }
    }

    @Override // ai.grakn.engine.loader.Loader
    public void waitToFinish() {
        waitToFinish(100000);
    }

    @Override // ai.grakn.engine.loader.Loader
    public void waitToFinish(int i) {
        flush();
        if (this.future != null) {
            try {
                this.future.get();
            } catch (InterruptedException | ExecutionException e) {
                LOG.error(ExceptionUtils.getFullStackTrace(e));
            }
        }
    }

    public void sendQueriesToLoader(Collection<InsertQuery> collection) {
        HttpURLConnection acquireNextHost = acquireNextHost(getPostParams());
        String executePost = executePost(acquireNextHost, getConfiguration(collection));
        int responseCode = getResponseCode(acquireNextHost);
        if (responseCode != 200) {
            throw new HTTPException(responseCode);
        }
        String asString = Json.read(executePost).at("id").asString();
        this.submitted.add(asString);
        LOG.info("Job " + asString + " sent to host: " + this.hosts.toArray()[this.currentHost]);
        if (this.future == null) {
            startCheckingStatus();
        }
    }

    private String getHostState(String str) {
        HttpURLConnection host = getHost(str, this.GET, "");
        String responseBody = getResponseBody(host);
        host.disconnect();
        return responseBody;
    }

    private void startCheckingStatus() {
        this.future = executor.submit(this::checkForStatusLoop);
    }

    private void stopCheckingStatus() {
        executor.shutdownNow();
        executor = Executors.newSingleThreadExecutor();
        this.future = null;
    }

    public void checkForStatusLoop() {
        while (!this.submitted.isEmpty()) {
            int i = 0;
            int i2 = 0;
            int i3 = 0;
            int i4 = 0;
            int i5 = 0;
            int i6 = 0;
            for (String str : this.availability.keySet()) {
                Map<String, Integer> releaseJobsCompletedIn = releaseJobsCompletedIn(Json.read(getHostState(str)), str);
                int intValue = releaseJobsCompletedIn.get(TaskStatus.CREATED.name()).intValue();
                int intValue2 = releaseJobsCompletedIn.get(TaskStatus.SCHEDULED.name()).intValue();
                int intValue3 = releaseJobsCompletedIn.get(TaskStatus.COMPLETED.name()).intValue();
                int intValue4 = releaseJobsCompletedIn.get(TaskStatus.RUNNING.name()).intValue();
                int intValue5 = releaseJobsCompletedIn.get(TaskStatus.FAILED.name()).intValue();
                int intValue6 = releaseJobsCompletedIn.get(TaskStatus.STOPPED.name()).intValue();
                LOG.info(getLoaderState(intValue, intValue2, intValue3, intValue4, intValue5, intValue6));
                i += intValue;
                i2 += intValue2;
                i3 += intValue3;
                i4 += intValue4;
                i5 += intValue5;
                i6 += intValue6;
            }
            LOG.info(getLoaderState(i, i2, i3, i4, i5, i6));
            try {
                Thread.sleep(this.pollingFrequency);
            } catch (InterruptedException e) {
                LOG.error("Exception", e);
            }
        }
        stopCheckingStatus();
    }

    private HttpURLConnection acquireNextHost(String str) {
        String nextHost = nextHost();
        while (true) {
            String str2 = nextHost;
            if (this.availability.get(str2).tryAcquire()) {
                return getHost(str2, this.POST, str);
            }
            nextHost = nextHost();
        }
    }

    private String nextHost() {
        this.currentHost++;
        if (this.currentHost == this.hosts.size()) {
            this.currentHost = 0;
        }
        return this.hosts.toArray()[this.currentHost].toString();
    }

    private HttpURLConnection getHost(String str, String str2, String str3) {
        HttpURLConnection httpURLConnection = null;
        try {
            httpURLConnection = (HttpURLConnection) new URL(String.format(str2, str) + "?" + str3).openConnection();
            httpURLConnection.setDoOutput(true);
        } catch (IOException e) {
            LOG.error("IOException", e);
        }
        return httpURLConnection;
    }

    private String executePost(HttpURLConnection httpURLConnection, String str) {
        try {
            try {
                httpURLConnection.setRequestMethod("POST");
                httpURLConnection.addRequestProperty("Content-Type", "application/POST");
                httpURLConnection.setRequestProperty("Content-Length", Integer.toString(str.length()));
                httpURLConnection.getOutputStream().write(str.getBytes("UTF8"));
                httpURLConnection.getOutputStream().flush();
                String iOUtils = IOUtils.toString(httpURLConnection.getInputStream());
                httpURLConnection.disconnect();
                return iOUtils;
            } catch (IOException e) {
                LOG.error(ErrorMessage.ERROR_COMMUNICATING_TO_HOST.getMessage(new Object[]{httpURLConnection.getURL().toString()}));
                httpURLConnection.disconnect();
                return null;
            } catch (HTTPException e2) {
                LOG.error(ErrorMessage.ERROR_IN_DISTRIBUTED_TRANSACTION.getMessage(new Object[]{httpURLConnection.getURL().toString(), Integer.valueOf(e2.getStatusCode()), getResponseMessage(httpURLConnection)}));
                httpURLConnection.disconnect();
                return null;
            }
        } catch (Throwable th) {
            httpURLConnection.disconnect();
            throw th;
        }
    }

    private String getResponseMessage(HttpURLConnection httpURLConnection) {
        try {
            return httpURLConnection.getResponseMessage();
        } catch (IOException e) {
            LOG.error(ExceptionUtils.getFullStackTrace(e));
            return null;
        }
    }

    private String getResponseBody(HttpURLConnection httpURLConnection) {
        try {
            return IOUtils.toString(httpURLConnection.getInputStream());
        } catch (IOException e) {
            LOG.error(ExceptionUtils.getFullStackTrace(e));
            return null;
        }
    }

    private int getResponseCode(HttpURLConnection httpURLConnection) {
        try {
            return httpURLConnection.getResponseCode();
        } catch (IOException e) {
            LOG.error(ExceptionUtils.getFullStackTrace(e));
            return 0;
        }
    }

    @Override // ai.grakn.engine.loader.Loader
    public String getLoaderState() {
        int i = 0;
        int i2 = 0;
        int i3 = 0;
        int i4 = 0;
        int i5 = 0;
        int i6 = 0;
        for (String str : this.availability.keySet()) {
            Map<String, Integer> releaseJobsCompletedIn = releaseJobsCompletedIn(Json.read(getHostState(str)), str);
            i += releaseJobsCompletedIn.get(TaskStatus.CREATED.name()).intValue();
            i2 += releaseJobsCompletedIn.get(TaskStatus.SCHEDULED.name()).intValue();
            i3 += releaseJobsCompletedIn.get(TaskStatus.COMPLETED.name()).intValue();
            i4 += releaseJobsCompletedIn.get(TaskStatus.RUNNING.name()).intValue();
            i5 += releaseJobsCompletedIn.get(TaskStatus.FAILED.name()).intValue();
            i6 += releaseJobsCompletedIn.get(TaskStatus.STOPPED.name()).intValue();
        }
        return getLoaderState(i, i2, i3, i4, i5, i6);
    }

    public String getLoaderState(int i, int i2, int i3, int i4, int i5, int i6) {
        return Json.object().set(TaskStatus.CREATED.name(), Integer.valueOf(i)).set(TaskStatus.SCHEDULED.name(), Integer.valueOf(i2)).set(TaskStatus.COMPLETED.name(), Integer.valueOf(i3)).set(TaskStatus.RUNNING.name(), Integer.valueOf(i4)).set(TaskStatus.FAILED.name(), Integer.valueOf(i5)).set(TaskStatus.STOPPED.name(), Integer.valueOf(i6)).toString();
    }

    private void resetJobsTerminated() {
        this.submitted = new HashSet();
    }

    private String post() {
        return "http://%s/tasks/schedule";
    }

    private String get() {
        return "http://%s/tasks/all";
    }

    private String getPostParams() {
        return "className=" + LoaderTask.class.getName() + "&runAt=" + new Date().getTime() + "&creator=" + LoaderClient.class.getName();
    }

    private String getConfiguration(Collection<InsertQuery> collection) {
        return Json.object().set("keyspace", this.keyspace).set("inserts", collection.stream().map((v0) -> {
            return v0.toString();
        }).collect(Collectors.toList())).toString();
    }

    private Map<String, Integer> releaseJobsCompletedIn(Json json, String str) {
        HashMap hashMap = new HashMap();
        hashMap.put(TaskStatus.CREATED.name(), 0);
        hashMap.put(TaskStatus.RUNNING.name(), 0);
        hashMap.put(TaskStatus.SCHEDULED.name(), 0);
        hashMap.put(TaskStatus.FAILED.name(), 0);
        hashMap.put(TaskStatus.COMPLETED.name(), 0);
        hashMap.put(TaskStatus.STOPPED.name(), 0);
        if (json == null) {
            return hashMap;
        }
        for (Object obj : json.asList()) {
            String obj2 = ((HashMap) obj).get(SystemOntologyElements.STATUS).toString();
            hashMap.put(obj2, Integer.valueOf(((Integer) hashMap.get(obj2)).intValue() + 1));
            String obj3 = ((HashMap) obj).get("id").toString();
            if ((this.submitted.contains(obj3) && obj2.equals(TaskStatus.COMPLETED.name())) || obj2.equals(TaskStatus.FAILED.name())) {
                this.availability.get(str).release();
                this.submitted.remove(obj3);
            }
        }
        return hashMap;
    }
}
