/*
 * Decompiled with CFR 0.152.
 */
package com.skytix.velocity;

import com.skytix.schedulerclient.Scheduler;
import com.skytix.schedulerclient.SchedulerConfig;
import com.skytix.schedulerclient.SchedulerEventHandler;
import com.skytix.velocity.VelocityTaskException;
import com.skytix.velocity.entities.TaskDefinition;
import com.skytix.velocity.entities.VelocityTask;
import com.skytix.velocity.repository.InMemoryTaskRepository;
import com.skytix.velocity.repository.TaskRepository;
import com.skytix.velocity.scheduler.MesosScheduler;
import com.skytix.velocity.scheduler.RunningState;
import com.skytix.velocity.scheduler.TaskSubscriber;
import com.skytix.velocity.scheduler.VelocitySchedulerConfig;
import com.skytix.velocity.scheduler.VelocitySchedulerHandler;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import java.io.IOException;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.mesos.v1.scheduler.Protos;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link MesosScheduler} implementation that publishes launched {@link VelocityTask}s to a
 * {@link TaskSubscriber} backed by the task repository and keeps a connection to the Mesos
 * master alive.
 *
 * <p>The constructor connects to the master, retrying every two seconds on {@link IOException},
 * and then starts a background watcher. The watcher reconnects whenever the scheduler event
 * handler reports a termination, a disconnect or two consecutive missed heartbeats.
 *
 * <p>Connection state is tracked in {@code mSchedulerRunningState}; transitions to
 * {@link RunningState#STOPPED} caused by connection errors are made while holding
 * {@code mErrorMonitor}, which is also the monitor the watcher waits on.
 */
public class VelocityMesosScheduler implements MesosScheduler {
    private static final Logger log = LoggerFactory.getLogger(VelocityMesosScheduler.class);

    /** Buffer capacity handed to the {@link SubmissionPublisher} that carries newly launched tasks. */
    private static final int NEW_TASK_BUFFER_CAPACITY = 1000;
    /** Pause between two attempts to connect to the master. */
    private static final long RECONNECT_DELAY_MILLIS = 2000L;
    /** Pause between two polls of the task counts while draining. */
    private static final long DRAIN_POLL_MILLIS = 2000L;

    // Written by whichever thread (re)connects and read by API callers and handler callbacks on
    // other threads, hence volatile.
    private volatile Scheduler mMesosScheduler;
    private volatile VelocitySchedulerHandler mSchedulerHandler;
    private final SubmissionPublisher<VelocityTask> mNewTaskPublisher;
    // Cleared by stop() and polled by the reconnect watcher thread, hence volatile.
    private volatile boolean mRunning = true;
    private final TaskRepository<VelocityTask> mTaskRepository;
    private final MeterRegistry mMeterRegistry;
    private final VelocitySchedulerConfig mSchedulerConfig;
    private final Future<?> mReconnectTask;
    private final Object mErrorMonitor = new Object();
    private final AtomicReference<RunningState> mSchedulerRunningState = new AtomicReference<>(RunningState.STOPPED);
    private final ExecutorService mMainThreadPool = Executors.newFixedThreadPool(6);
    private final ExecutorService mTaskGeneralThreadPool = Executors.newFixedThreadPool(5);

    /**
     * Creates a scheduler that records its metrics in a fresh {@link SimpleMeterRegistry}.
     *
     * @param aSchedulerConfig configuration used for the master connection and the task repository
     */
    public VelocityMesosScheduler(VelocitySchedulerConfig aSchedulerConfig) {
        this(aSchedulerConfig, new SimpleMeterRegistry());
    }

    /**
     * Creates a scheduler backed by an {@link InMemoryTaskRepository}.
     *
     * @param aSchedulerConfig configuration used for the master connection and the task repository
     * @param aMeterRegistry   registry that receives the scheduler's metrics
     */
    public VelocityMesosScheduler(VelocitySchedulerConfig aSchedulerConfig, MeterRegistry aMeterRegistry) {
        this(aSchedulerConfig, aMeterRegistry, new InMemoryTaskRepository(aMeterRegistry, aSchedulerConfig));
    }

    /**
     * Wires the task publisher to the repository, connects to the master and starts the
     * reconnect watcher. Blocks until the first connection succeeds or the calling thread is
     * interrupted.
     */
    private VelocityMesosScheduler(VelocitySchedulerConfig aSchedulerConfig, MeterRegistry aMeterRegistry, TaskRepository<VelocityTask> aTaskRepository) {
        mSchedulerConfig = aSchedulerConfig;
        mMeterRegistry = aMeterRegistry;
        mTaskRepository = aTaskRepository;
        mNewTaskPublisher = new SubmissionPublisher<>(mMainThreadPool, NEW_TASK_BUFFER_CAPACITY);
        mNewTaskPublisher.subscribe(new TaskSubscriber(mTaskRepository));
        // Connect before the watcher starts: the watcher reconnects whenever it observes
        // STOPPED, which is also the initial state, so starting it first would make it race
        // this thread for the very first connection.
        handleReconnect();
        mReconnectTask = mTaskGeneralThreadPool.submit(this::awaitErrorsAndReconnect);
    }

    /** Tells whether the connection state currently equals {@code aState}. */
    private boolean isInState(RunningState aState) {
        return mSchedulerRunningState.get().equals(aState);
    }

    /**
     * Body of the reconnect watcher: sleeps on {@code mErrorMonitor} until the state is
     * {@link RunningState#STOPPED} and then reconnects, until {@link #stop()} is called or the
     * thread is interrupted.
     */
    private void awaitErrorsAndReconnect() {
        try {
            while (mRunning) {
                synchronized (mErrorMonitor) {
                    // Guarded wait: the state is re-checked before every wait so that neither a
                    // spurious wake-up nor a notification sent while this thread was not yet
                    // waiting (e.g. right after a reconnect) is lost.
                    while (mRunning && !isInState(RunningState.STOPPED)) {
                        mErrorMonitor.wait();
                    }
                    if (mRunning) {
                        handleReconnect();
                    }
                }
            }
        }
        catch (InterruptedException aE) {
            // stop() cancels this task with interruption; keep the flag set for the executor.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Connects to the master if the scheduler is running and the state is
     * {@link RunningState#STOPPED}; otherwise does nothing.
     *
     * <p>On {@link IOException} the attempt is repeated every {@value #RECONNECT_DELAY_MILLIS} ms
     * for as long as the state stays {@link RunningState#STARTING}. If the thread is interrupted
     * while waiting, the method returns with the interrupt flag set and the state unchanged.
     */
    private synchronized void handleReconnect() {
        if (!mRunning || !isInState(RunningState.STOPPED)) {
            return;
        }
        VelocitySchedulerHandler schedulerHandler = createSchedulerHandler();
        mSchedulerRunningState.set(RunningState.STARTING);
        try {
            do {
                try {
                    // Casts kept from the original: the declared parameter types of
                    // newScheduler are not visible from this file.
                    mMesosScheduler = Scheduler.newScheduler((SchedulerConfig) mSchedulerConfig, (SchedulerEventHandler) schedulerHandler);
                    // The handler is published before RUNNING so that anyone who observes
                    // RUNNING for this connection also observes this handler.
                    mSchedulerHandler = schedulerHandler;
                    mSchedulerRunningState.set(RunningState.RUNNING);
                    return;
                }
                catch (IOException aE) {
                    log.error(aE.getMessage(), aE);
                    log.error("Unable to connect to master.  Sleeping for 2 seconds before retrying...");
                    Thread.sleep(RECONNECT_DELAY_MILLIS);
                }
                // Anything other than STARTING means stop() ran in the meantime: give up.
            } while (isInState(RunningState.STARTING));
        }
        catch (InterruptedException aE) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Builds the event handler for one connection. On top of the base handler it starts a
     * heartbeat monitor on subscription and wakes the reconnect watcher when the connection is
     * terminated, disconnected or misses heartbeats.
     */
    private VelocitySchedulerHandler createSchedulerHandler() {
        return new VelocitySchedulerHandler(mTaskRepository, mMeterRegistry, mSchedulerConfig, mMainThreadPool, mTaskGeneralThreadPool) {

            /**
             * Starts a background loop that checks once per heartbeat interval whether the last
             * heartbeat is overdue and calls {@link #onHeartbeatFail()} after two overdue checks
             * in a row.
             */
            @Override
            public void onSubscribe(Protos.Event.Subscribed aSubscribeEvent) {
                super.onSubscribe(aSubscribeEvent);
                VelocityMesosScheduler.this.mTaskGeneralThreadPool.submit(() -> {
                    try {
                        int failures = 0;
                        // Besides RUNNING, require that this handler is still the installed one.
                        // Otherwise the loop would survive a reconnect, keep judging the frozen
                        // heartbeat of its dead connection and tear down the new connection.
                        // NOTE(review): if this callback can fire before handleReconnect() has
                        // set RUNNING, the loop ends immediately (as in the original) — confirm
                        // the callback ordering of Scheduler.newScheduler.
                        while (VelocityMesosScheduler.this.isInState(RunningState.RUNNING)
                                && VelocityMesosScheduler.this.mSchedulerHandler == this) {
                            // "this" is the handler, not the enclosing scheduler, which has a
                            // getLastHeartbeat() of its own.
                            LocalDateTime lastHeartbeat = this.getLastHeartbeat();
                            LocalDateTime now = LocalDateTime.now();
                            // No heartbeat seen yet counts as "not overdue".
                            Duration silence = Duration.between(lastHeartbeat != null ? lastHeartbeat : now, now);
                            int heartbeatInterval = this.getHeartbeatInterval();
                            boolean overdue = silence.getSeconds() > (long)(heartbeatInterval + VelocityMesosScheduler.this.mSchedulerConfig.getHeartbeatDelaySeconds());
                            failures = overdue ? failures + 1 : 0;
                            if (failures > 1) {
                                log.error("Missed 2 heartbeat intervals.  Triggering reconnection to masters");
                                this.onHeartbeatFail();
                                return;
                            }
                            // Multiply as long so a large interval cannot overflow int.
                            Thread.sleep(heartbeatInterval * 1000L);
                        }
                    }
                    catch (InterruptedException aE) {
                        Thread.currentThread().interrupt();
                    }
                });
            }

            /** Logs the termination and, if the scheduler was running, requests a reconnect. */
            @Override
            public void onTerminate(Exception aException) {
                super.onTerminate(aException);
                String reason = aException != null ? aException.getMessage() : null;
                log.error("Scheduler terminated: {}. Reconnecting", reason, aException);
                if (VelocityMesosScheduler.this.isInState(RunningState.RUNNING)) {
                    this.notifyErrorMonitor();
                }
            }

            /** Logs the disconnect and, if the scheduler was running, requests a reconnect. */
            @Override
            public void onDisconnect() {
                super.onDisconnect();
                log.error("Scheduler disconnected from the master. Reconnecting");
                if (VelocityMesosScheduler.this.isInState(RunningState.RUNNING)) {
                    this.notifyErrorMonitor();
                }
            }

            /**
             * Closes the current master connection and then treats the situation as a
             * disconnect.
             */
            @Override
            public void onHeartbeatFail() {
                try {
                    VelocityMesosScheduler.this.mMesosScheduler.close();
                }
                catch (IOException aE) {
                    log.error(aE.getMessage(), aE);
                }
                // Request the reconnect even when closing the dead connection failed;
                // otherwise nothing would ever bring the scheduler back.
                this.onDisconnect();
            }

            /**
             * Marks the connection as {@link RunningState#STOPPED} and wakes the reconnect
             * watcher; both happen under {@code mErrorMonitor} so the watcher sees them together.
             */
            private void notifyErrorMonitor() {
                synchronized (VelocityMesosScheduler.this.mErrorMonitor) {
                    VelocityMesosScheduler.this.mSchedulerRunningState.set(RunningState.STOPPED);
                    VelocityMesosScheduler.this.mErrorMonitor.notify();
                }
            }
        };
    }

    /**
     * Wraps the definition in a new {@link VelocityTask} stamped with the current time and
     * submits it to the task publisher. When the repository reports no queued tasks, a revive
     * call is sent to the master first.
     *
     * @param aTaskDefinition definition of the task to run
     * @return the task that was submitted
     * @throws VelocityTaskException declared by the {@link MesosScheduler} contract
     */
    @Override
    public VelocityTask launch(TaskDefinition aTaskDefinition) throws VelocityTaskException {
        mMeterRegistry.counter("velocity.counter.scheduler.taskLaunch").increment();
        VelocityTask task = VelocityTask.builder()
                .taskDefinition(aTaskDefinition)
                .created(LocalDateTime.now())
                .build();
        if (mTaskRepository.getNumQueuedTasks() == 0) {
            mMesosScheduler.getRemote().revive(Collections.emptyList());
        }
        mNewTaskPublisher.submit(task);
        return task;
    }

    /** Returns the number of queued tasks reported by the task repository. */
    public int getNumQueuedTasks() {
        return mTaskRepository.getNumQueuedTasks();
    }

    /** Returns the number of active tasks reported by the task repository. */
    public int getNumActiveTasks() {
        return mTaskRepository.getNumActiveTasks();
    }

    /**
     * Returns the repository's active tasks keyed by task id.
     *
     * @return a map from task id to task; an immutable empty map when there are no active tasks
     */
    public Map<String, VelocityTask> getActiveTasksById() {
        return indexByTaskId(mTaskRepository.getActiveTasks());
    }

    /**
     * Returns the repository's queued tasks keyed by task id.
     *
     * @return a map from task id to task; an immutable empty map when there are no queued tasks
     */
    public Map<String, VelocityTask> getQueuedTasksById() {
        return indexByTaskId(mTaskRepository.getQueuedTasks());
    }

    /**
     * Keys the given tasks by the task id found in their task info. Duplicate ids make
     * {@link Collectors#toMap} throw {@link IllegalStateException}.
     * NOTE(review): assumes every task handed in has its task info populated — confirm for
     * queued tasks.
     */
    private static Map<String, VelocityTask> indexByTaskId(List<VelocityTask> aTasks) {
        if (aTasks.isEmpty()) {
            return Collections.emptyMap();
        }
        return aTasks.stream()
                .collect(Collectors.toMap(task -> task.getTaskInfo().getTaskId().getValue(), task -> task));
    }

    /** Looks the task up in the task repository by its task id. */
    public VelocityTask getTaskById(String aTaskId) {
        return mTaskRepository.getTaskByTaskId(aTaskId);
    }

    /**
     * Returns the last heartbeat recorded by the handler of the current connection.
     *
     * @return the heartbeat time reported by the handler, or {@code null} when no handler has
     *         been installed yet
     */
    public LocalDateTime getLastHeartbeat() {
        VelocitySchedulerHandler handler = mSchedulerHandler;
        return handler != null ? handler.getLastHeartbeat() : null;
    }

    /**
     * Waits until no task is queued or active, stops this scheduler and sends a teardown call
     * for the framework to the master.
     *
     * @throws Exception if the wait is interrupted or the teardown call fails
     */
    public void drainAndTeardown() throws Exception {
        waitTillEmpty();
        log.info("Scheduler is empty.  Tearing down framework: {}", mSchedulerConfig.getFrameworkID());
        stop();
        Scheduler scheduler = mMesosScheduler;
        // Null only when no connection was ever established; nothing to tear down then.
        if (scheduler != null) {
            scheduler.getRemote().teardown();
        }
    }

    /**
     * Stops reconnecting, cancels the reconnect watcher, closes the task publisher and shuts
     * both thread pools down.
     */
    private void stop() {
        // Clear the flag before publishing STOPPED: a watcher that sees STOPPED is then
        // guaranteed to also see mRunning == false and will not reconnect.
        mRunning = false;
        mSchedulerRunningState.set(RunningState.STOPPED);
        mReconnectTask.cancel(true);
        mNewTaskPublisher.close();
        mTaskGeneralThreadPool.shutdown();
        mMainThreadPool.shutdown();
    }

    /**
     * Waits until no task is queued or active and then closes this scheduler.
     *
     * @throws Exception if the wait is interrupted or closing fails
     */
    public void drainAndClose() throws Exception {
        waitTillEmpty();
        log.info("Scheduler is empty.  Now closing.");
        close();
    }

    /**
     * Polls the task counts every {@value #DRAIN_POLL_MILLIS} ms until both the queued and the
     * active count are zero.
     *
     * @throws InterruptedException if the thread is interrupted between polls
     */
    private void waitTillEmpty() throws InterruptedException {
        int numActiveTasks = getNumActiveTasks();
        int numQueuedTasks = getNumQueuedTasks();
        while (numActiveTasks > 0 || numQueuedTasks > 0) {
            log.info("Waiting on task completion.  #Queued: {}, #Active: {}.", numQueuedTasks, numActiveTasks);
            Thread.sleep(DRAIN_POLL_MILLIS);
            numActiveTasks = getNumActiveTasks();
            numQueuedTasks = getNumQueuedTasks();
        }
    }

    /**
     * Stops this scheduler and sends an exit call to the master over the current connection.
     *
     * @throws IOException declared by the {@link MesosScheduler} contract
     */
    @Override
    public void close() throws IOException {
        stop();
        Scheduler scheduler = mMesosScheduler;
        // Null only when no connection was ever established; closing is then a no-op.
        if (scheduler != null) {
            scheduler.getRemote().exit();
        }
    }
}

