/*
 * Decompiled with CFR 0.152.
 */
package com.skytix.velocity.scheduler;

import com.skytix.velocity.VelocityTaskException;
import com.skytix.velocity.entities.VelocityTask;
import com.skytix.velocity.repository.TaskRepository;
import com.skytix.velocity.scheduler.SchedulerRemoteProvider;
import com.skytix.velocity.scheduler.TaskEventHandler;
import com.skytix.velocity.scheduler.TaskEventUpdateSubscriber;
import com.skytix.velocity.scheduler.TaskUpdateEvent;
import io.micrometer.core.instrument.MeterRegistry;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import org.apache.mesos.v1.Protos;
import org.apache.mesos.v1.scheduler.Protos;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class UpdateSubscriber
implements Flow.Subscriber<Protos.Event.Update> {
    private static final Logger log = LoggerFactory.getLogger(UpdateSubscriber.class);
    private final TaskRepository<VelocityTask> mTaskRepository;
    private final SubmissionPublisher<TaskUpdateEvent> mEventUpdatePublisher;
    private final SchedulerRemoteProvider mRemote;
    private final TaskEventHandler mDefaultUpdateHandler;
    private final MeterRegistry mMeterRegistry;
    private Flow.Subscription mSubscription;

    public UpdateSubscriber(TaskRepository<VelocityTask> aTaskRepository, SubmissionPublisher<TaskUpdateEvent> aSubmissionPublisher, SchedulerRemoteProvider aRemote, TaskEventHandler aDefaultUpdateHandler, MeterRegistry aMeterRegistry) {
        this.mTaskRepository = aTaskRepository;
        this.mEventUpdatePublisher = aSubmissionPublisher;
        this.mRemote = aRemote;
        this.mDefaultUpdateHandler = aDefaultUpdateHandler;
        this.mMeterRegistry = aMeterRegistry;
        this.mEventUpdatePublisher.subscribe(new TaskEventUpdateSubscriber(aDefaultUpdateHandler));
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.mSubscription = subscription;
        subscription.request(1L);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void onNext(Protos.Event.Update update) {
        try {
            Protos.TaskStatus updateStatus = update.getStatus();
            VelocityTask task = this.mTaskRepository.getTaskByTaskId(updateStatus.getTaskId().getValue());
            if (task != null) {
                this.mTaskRepository.updateTaskState(task, updateStatus.getState());
                this.acknowledge(updateStatus);
                block3 : switch (updateStatus.getState()) {
                    case TASK_RUNNING: {
                        if (task.isRunning()) break;
                        task.setRunning(true);
                        task.setStartTime(LocalDateTime.now());
                        break;
                    }
                    case TASK_FINISHED: {
                        if (task.isComplete()) break;
                        task.setFinishTime(LocalDateTime.now());
                        this.mTaskRepository.completeTask(task);
                        this.mMeterRegistry.counter("velocity.counter.scheduler.completedTasks", new String[0]).increment();
                        this.mMeterRegistry.timer("velocity.timer.scheduler.taskDuration", new String[0]).record(Duration.between(task.getStartTime(), task.getFinishTime()));
                        if (this.mTaskRepository.getNumQueuedTasks() != 0 || this.mTaskRepository.getNumActiveTasks() != 0) break;
                        log.debug("Scheduler is idle. Suppressing offers");
                        this.mRemote.get().suppress(Collections.emptyList());
                        break;
                    }
                    case TASK_DROPPED: 
                    case TASK_FAILED: 
                    case TASK_ERROR: 
                    case TASK_KILLED: 
                    case TASK_GONE: 
                    case TASK_GONE_BY_OPERATOR: 
                    case TASK_LOST: {
                        switch (updateStatus.getReason()) {
                            case REASON_CONTAINER_LAUNCH_FAILED: 
                            case REASON_TASK_KILLED_DURING_LAUNCH: 
                            case REASON_EXECUTOR_TERMINATED: 
                            case REASON_GC_ERROR: 
                            case REASON_INVALID_OFFERS: {
                                try {
                                    log.debug(String.format("Task %s failed for reason: %s. Retrying...", updateStatus.getTaskId(), updateStatus.getReason()));
                                    this.mMeterRegistry.counter("velocity.counter.scheduler.retriedTasks", new String[0]).increment();
                                    this.mTaskRepository.retryTask(task);
                                }
                                catch (VelocityTaskException aE) {
                                    log.error(aE.getMessage(), (Throwable)aE);
                                }
                                break block3;
                            }
                        }
                        this.mMeterRegistry.counter("velocity.counter.scheduler.failedTasks", new String[0]).increment();
                        log.debug(String.format("Task %s failed for reason: (%s) %s.", updateStatus.getTaskId(), updateStatus.getReason(), updateStatus.getMessage()));
                        task.setFinishTime(LocalDateTime.now());
                        this.mTaskRepository.completeTask(task);
                    }
                }
                TaskEventHandler taskEventHandler = task.getTaskDefinition().getTaskEventHandler();
                this.mEventUpdatePublisher.submit(TaskUpdateEvent.builder().event(update).task(task).build());
            } else {
                this.acknowledge(updateStatus);
                if (this.mDefaultUpdateHandler != null) {
                    this.mDefaultUpdateHandler.onEvent(update);
                }
            }
        }
        catch (Exception aE) {
            log.error(aE.getMessage(), (Throwable)aE);
        }
        finally {
            this.mSubscription.request(1L);
        }
    }

    private void acknowledge(Protos.TaskStatus aStatus) {
        if (aStatus.hasUuid()) {
            this.mRemote.get().acknowledge((Protos.TaskStatusOrBuilder)aStatus);
        }
    }

    @Override
    public void onError(Throwable throwable) {
        log.error(throwable.getMessage(), throwable);
    }

    @Override
    public void onComplete() {
    }
}

