/*
 * Decompiled with CFR 0.152.
 */
package com.skytix.schedulerclient;

import com.skytix.schedulerclient.HttpLeaderResolver;
import com.skytix.schedulerclient.LeaderResolver;
import com.skytix.schedulerclient.NoLeaderException;
import com.skytix.schedulerclient.SchedulerConfig;
import com.skytix.schedulerclient.SchedulerEventHandler;
import com.skytix.schedulerclient.SchedulerRemote;
import com.skytix.schedulerclient.ZooKeeperLeaderResolver;
import java.io.BufferedInputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.mesos.v1.Protos;
import org.apache.mesos.v1.scheduler.Protos;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Client for the Mesos v1 scheduler HTTP API.
 *
 * <p>An instance subscribes to the resolved leading master on a background task, reads the
 * length-prefixed event stream returned by the subscribe call, and forwards every event to the
 * supplied {@link SchedulerEventHandler}. Calls back to Mesos are issued through
 * {@link #sendCall(Protos.Call)} using the stream ID obtained during subscription.
 *
 * <p>Instances are created through the {@code newScheduler} factories, which start the
 * subscription immediately. Use {@link #join()} to wait for the subscription task to finish and
 * {@link #close()} to ask it to stop.
 */
public final class Scheduler implements Closeable {
    private static final Logger log = LoggerFactory.getLogger(Scheduler.class);
    private static final String SCHEDULER_API_PATH = "/api/v1/scheduler";
    private static final String PROTOBUF_MEDIA_TYPE = "application/x-protobuf";
    private static final String STREAM_ID_HEADER = "Mesos-Stream-Id";

    // Per-instance so that join() only observes this scheduler's subscription task.
    private final Semaphore mSemaphore = new Semaphore(0);
    private final Protos.FrameworkID mFrameworkId;
    private final SchedulerConfig mConfig;
    private final HttpClient mHttpClient = HttpClient.newHttpClient();
    private final SchedulerEventHandler mSchedulerEventHandler;
    private final LeaderResolver mLeaderResolver;
    private ScheduledExecutorService mExecutorService = null;
    // True only when the executor was created by this class and must therefore be shut down by it.
    private boolean mOwnsExecutor = false;
    // Written by the subscription task, read by sendCall() callers and close().
    private volatile String mMesosStreamID = null;
    private volatile String mMasterURL = null;
    private ScheduledFuture<?> mClientThread;
    private volatile boolean mRunning = true;

    /**
     * Creates and starts a scheduler for the given framework ID and master URL.
     *
     * @param aFrameworkId framework ID to register with; a random UUID is used when empty
     * @param aMesosMasterURI master location; values starting with {@code "zk"} are resolved
     *     through ZooKeeper, anything else is treated as an HTTP master URL
     * @param aEventHandler receiver of subscription lifecycle callbacks and Mesos events
     * @return the started scheduler
     * @throws IllegalArgumentException if the master URL is empty
     */
    public static Scheduler newScheduler(String aFrameworkId, String aMesosMasterURI, SchedulerEventHandler aEventHandler) {
        return Scheduler.newScheduler(new SchedulerConfig.SchedulerConfigBuilder().frameworkID(aFrameworkId).mesosMasterURL(aMesosMasterURI), aEventHandler);
    }

    /**
     * Creates and starts a scheduler that runs on its own single-threaded executor.
     * That executor is shut down by {@link #close()}.
     *
     * @param aConfig builder holding the scheduler configuration
     * @param aEventHandler receiver of subscription lifecycle callbacks and Mesos events
     * @return the started scheduler
     * @throws IllegalArgumentException if the configured master URL is empty
     */
    public static Scheduler newScheduler(SchedulerConfig.SchedulerConfigBuilder aConfig, SchedulerEventHandler aEventHandler) {
        // Validate the configuration first so a rejected config does not leave an executor behind.
        Scheduler scheduler = new Scheduler(aConfig.build(), aEventHandler);
        scheduler.init(Executors.newScheduledThreadPool(1), true);
        return scheduler;
    }

    /**
     * Creates and starts a scheduler on a caller-supplied executor.
     * The executor remains owned by the caller and is not shut down by {@link #close()}.
     *
     * @param aConfig builder holding the scheduler configuration
     * @param aEventHandler receiver of subscription lifecycle callbacks and Mesos events
     * @param aExecutorService executor on which the subscription task is scheduled
     * @return the started scheduler
     * @throws IllegalArgumentException if the configured master URL is empty
     */
    public static Scheduler newScheduler(SchedulerConfig.SchedulerConfigBuilder aConfig, SchedulerEventHandler aEventHandler, ScheduledExecutorService aExecutorService) {
        Scheduler scheduler = new Scheduler(aConfig.build(), aEventHandler);
        scheduler.init(aExecutorService, false);
        return scheduler;
    }

    private Scheduler(SchedulerConfig aConfig, SchedulerEventHandler aEventHandler) {
        String configuredId = aConfig.getFrameworkID();
        String frameworkIdValue = StringUtils.isEmpty(configuredId) ? UUID.randomUUID().toString() : configuredId;
        this.mConfig = aConfig;
        this.mFrameworkId = Protos.FrameworkID.newBuilder().setValue(frameworkIdValue).build();
        this.mSchedulerEventHandler = aEventHandler;
        String masterURL = aConfig.getMesosMasterURL();
        if (StringUtils.isEmpty(masterURL)) {
            throw new IllegalArgumentException("mesosMasterURL configuration is required");
        }
        this.mLeaderResolver = masterURL.startsWith("zk") ? new ZooKeeperLeaderResolver() : new HttpLeaderResolver(masterURL);
    }

    /** Stores the executor and schedules the subscription task to run immediately. */
    private void init(ScheduledExecutorService aThreadExecutorService, boolean aOwnsExecutor) {
        this.mExecutorService = aThreadExecutorService;
        this.mOwnsExecutor = aOwnsExecutor;
        Runnable subscription = this::runSubscription;
        this.mClientThread = this.mExecutorService.schedule(subscription, 0L, TimeUnit.SECONDS);
    }

    /**
     * Subscribes to the leading master and processes its event stream until the stream ends,
     * an ERROR event arrives, or {@link #close()} is called. Always releases the join permit
     * on exit.
     */
    private void runSubscription() {
        try {
            Protos.FrameworkInfo.Builder frameworkInfo = this.createFrameworkInfo();
            Protos.Call subscribeCall = Protos.Call.newBuilder()
                    .setFrameworkId(this.mFrameworkId)
                    .setType(Protos.Call.Type.SUBSCRIBE)
                    .setSubscribe(Protos.Call.Subscribe.newBuilder().setFrameworkInfo(frameworkInfo))
                    .build();
            String leader = this.mLeaderResolver.resolveLeader();
            URI leaderUri = new URI(leader + SCHEDULER_API_PATH);
            HttpRequest request = HttpRequest.newBuilder()
                    .uri(leaderUri)
                    .header("Content-Type", PROTOBUF_MEDIA_TYPE)
                    .header("Accept", PROTOBUF_MEDIA_TYPE)
                    .POST(HttpRequest.BodyPublishers.ofByteArray(subscribeCall.toByteArray()))
                    .build();
            log.info("Connecting to Mesos at: {}", leaderUri);
            HttpResponse<InputStream> response = this.mHttpClient.send(request, HttpResponse.BodyHandlers.ofInputStream());
            InputStream body = response.body();
            try {
                this.handleSubscribeResponse(response, leader, body);
            }
            finally {
                // An ofInputStream body must be closed to release the underlying exchange.
                Scheduler.closeQuietly(body);
            }
        }
        catch (InterruptedException aE) {
            // Restore the flag so the executor thread can observe the interruption.
            Thread.currentThread().interrupt();
            this.mSchedulerEventHandler.onTerminate(aE);
            log.error(aE.getMessage(), aE);
        }
        catch (NoLeaderException | IOException | URISyntaxException aE) {
            this.mSchedulerEventHandler.onTerminate(aE);
            log.error(aE.getMessage(), aE);
        }
        finally {
            this.mSemaphore.release();
        }
    }

    /** Validates the subscribe response, then pumps events and reports how the stream ended. */
    private void handleSubscribeResponse(HttpResponse<InputStream> aResponse, String aLeader, InputStream aBody) throws IOException {
        if (aResponse.statusCode() != 200) {
            this.mSchedulerEventHandler.onTerminate(new Exception(String.format("Scheduler was unable to connect to mesos with exit code %d", aResponse.statusCode())));
            log.error("Error subscribing to Mesos");
            return;
        }
        String streamId = aResponse.headers().firstValue(STREAM_ID_HEADER).orElse(null);
        if (streamId == null) {
            // Without the stream ID no follow-up call can be sent, so treat it as a failed subscribe.
            String error = "Mesos subscribe response did not contain a " + STREAM_ID_HEADER + " header";
            log.error(error);
            this.mSchedulerEventHandler.onTerminate(new IllegalStateException(error));
            return;
        }
        this.mMesosStreamID = streamId;
        this.mMasterURL = aLeader;
        if (!this.pumpEvents(new BufferedInputStream(aBody))) {
            // An ERROR event already notified the handler through onTerminate.
            return;
        }
        if (this.mRunning) {
            log.info("Scheduler '{}' {} has lost it's connection to Mesos", this.mConfig.getName(), this.mFrameworkId.getValue());
            this.mSchedulerEventHandler.onDisconnect();
        } else {
            this.mSchedulerEventHandler.onExit();
        }
    }

    /**
     * Reads records of the form {@code <decimal length>\n<length bytes>} from the stream and
     * dispatches each decoded event.
     *
     * @return {@code false} if an ERROR event ended processing, {@code true} if the loop ended
     *     because the stream reached EOF or the scheduler was closed
     */
    private boolean pumpEvents(InputStream aStream) throws IOException {
        StringBuilder lengthDigits = new StringBuilder();
        int data = aStream.read();
        while (data != -1 && this.mRunning) {
            if (data == '\n') {
                byte[] record = Scheduler.readRecord(aStream, lengthDigits.toString());
                lengthDigits.setLength(0);
                if (!this.dispatch(Protos.Event.parseFrom(record))) {
                    return false;
                }
            } else {
                lengthDigits.append((char)data);
            }
            data = aStream.read();
        }
        return true;
    }

    /** Reads one record body whose size is given by the decimal length prefix. */
    private static byte[] readRecord(InputStream aStream, String aLengthPrefix) throws IOException {
        long recordLength;
        try {
            recordLength = Long.parseLong(aLengthPrefix);
        }
        catch (NumberFormatException aE) {
            // Surface framing errors as IOException so the subscription task reports them to the handler.
            throw new IOException("Malformed record length in Mesos event stream: '" + aLengthPrefix + "'", aE);
        }
        if (recordLength < 0L || recordLength > Integer.MAX_VALUE) {
            throw new IOException("Unsupported record length in Mesos event stream: " + recordLength);
        }
        byte[] record = aStream.readNBytes((int)recordLength);
        if (record.length < recordLength) {
            throw new IOException("Mesos event stream ended in the middle of a record (expected " + recordLength + " bytes, got " + record.length + ")");
        }
        return record;
    }

    /**
     * Routes a single event to the handler.
     *
     * @return {@code false} when the event was an ERROR and processing must stop
     */
    private boolean dispatch(Protos.Event aEvent) {
        switch (aEvent.getType()) {
            case SUBSCRIBED: {
                this.mSchedulerEventHandler.onSubscribe(new SchedulerRemote(this));
                log.info("Connected to Master as FrameworkID: {}", this.mFrameworkId.getValue());
                return true;
            }
            case ERROR: {
                // The error text is carried by the event's Error sub-message, not by its Message field.
                String error = String.format("Error subscribing to Mesos: %s", aEvent.getError().getMessage());
                log.error(error);
                this.mSchedulerEventHandler.onTerminate(new IllegalStateException(error));
                return false;
            }
            default: {
                try {
                    this.mSchedulerEventHandler.handleEvent(aEvent);
                }
                catch (Exception aE) {
                    // A failing handler must not tear down the event stream.
                    log.error(aE.getMessage(), aE);
                }
                return true;
            }
        }
    }

    /** Closes the stream, logging instead of propagating any failure. */
    private static void closeQuietly(InputStream aStream) {
        if (aStream == null) {
            return;
        }
        try {
            aStream.close();
        }
        catch (IOException aE) {
            log.debug("Error closing Mesos event stream", aE);
        }
    }

    /**
     * Blocks until this scheduler's subscription task has finished. May be called repeatedly
     * and from several threads.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void join() throws InterruptedException {
        this.mSemaphore.acquire();
        // Hand the permit back so later or concurrent join() calls also return.
        this.mSemaphore.release();
    }

    /** Builds the framework description sent with the SUBSCRIBE call, applying defaults for blank user and name. */
    private Protos.FrameworkInfo.Builder createFrameworkInfo() {
        Protos.FrameworkInfo.Builder frameworkInfo = Protos.FrameworkInfo.newBuilder().setId(this.mFrameworkId);
        String user = this.mConfig.getUser();
        frameworkInfo.setUser(StringUtils.isBlank(user) ? "root" : user);
        String name = this.mConfig.getName();
        frameworkInfo.setName(StringUtils.isBlank(name) ? "mesos-scheduler-client" : name);
        if (this.mConfig.getFailoverTimeout() > 0.0) {
            frameworkInfo.setFailoverTimeout(this.mConfig.getFailoverTimeout());
        }
        return frameworkInfo;
    }

    /**
     * Requests the subscription task to stop. The task is cancelled without interruption, so a
     * task already reading the stream notices the request the next time a read returns. An
     * executor created by this class is shut down; a caller-supplied executor is left untouched.
     */
    @Override
    public void close() throws IOException {
        this.mRunning = false;
        if (this.mClientThread != null) {
            this.mClientThread.cancel(false);
        }
        if (this.mOwnsExecutor && this.mExecutorService != null) {
            this.mExecutorService.shutdown();
        }
    }

    /** Returns the framework ID this scheduler subscribes with. */
    protected Protos.FrameworkID getFrameworkID() {
        return this.mFrameworkId;
    }

    /**
     * Posts a call to the master the scheduler subscribed to. A response status other than 202
     * and I/O failures are logged rather than thrown.
     *
     * @param aCall the call to send
     * @throws IllegalStateException if the scheduler has not completed subscription yet
     */
    protected void sendCall(Protos.Call aCall) {
        String masterUrl = this.mMasterURL;
        String streamId = this.mMesosStreamID;
        if (masterUrl == null || streamId == null) {
            throw new IllegalStateException("Cannot send a call before the scheduler has subscribed to Mesos");
        }
        try {
            HttpRequest request = HttpRequest.newBuilder()
                    .uri(new URI(masterUrl + SCHEDULER_API_PATH))
                    .header("Content-Type", PROTOBUF_MEDIA_TYPE)
                    .header(STREAM_ID_HEADER, streamId)
                    .POST(HttpRequest.BodyPublishers.ofByteArray(aCall.toByteArray()))
                    .build();
            HttpResponse<String> response = this.mHttpClient.send(request, HttpResponse.BodyHandlers.ofString());
            if (response.statusCode() != 202) {
                log.error("Error sending call to Mesos: " + response.body());
            }
        }
        catch (InterruptedException aE) {
            // Keep the interruption visible to the caller's thread.
            Thread.currentThread().interrupt();
            log.error("Interrupted while sending call to Mesos", aE);
        }
        catch (IOException | URISyntaxException aE) {
            log.error("Error sending call to Mesos", aE);
        }
    }

    /**
     * Starts a call builder pre-populated with this scheduler's framework ID.
     *
     * @param aType the type of call to build
     * @return a builder with framework ID and type set
     */
    protected Protos.Call.Builder createCall(Protos.Call.Type aType) {
        return Protos.Call.newBuilder().setFrameworkId(this.mFrameworkId).setType(aType);
    }
}

