/*
 * Decompiled with CFR 0.152.
 */
package discord4j.rest.request;

import discord4j.common.LogUtil;
import discord4j.rest.http.client.ClientException;
import discord4j.rest.http.client.ClientRequest;
import discord4j.rest.http.client.ClientResponse;
import discord4j.rest.http.client.DiscordWebClient;
import discord4j.rest.request.BucketKey;
import discord4j.rest.request.DiscardedRequestException;
import discord4j.rest.request.DiscordWebRequest;
import discord4j.rest.request.GlobalRateLimiter;
import discord4j.rest.request.RateLimitRetryOperator;
import discord4j.rest.request.RateLimitStrategy;
import discord4j.rest.request.RequestCorrelation;
import discord4j.rest.request.RequestQueue;
import discord4j.rest.request.RouterOptions;
import discord4j.rest.response.ResponseFunction;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.function.Function;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.SignalType;
import reactor.core.scheduler.Scheduler;
import reactor.netty.http.client.HttpClientResponse;
import reactor.retry.Retry;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.context.Context;

class RequestStream {
    private static final Logger log = Loggers.getLogger(RequestStream.class);
    private final BucketKey id;
    private final RequestQueue<RequestCorrelation<ClientResponse>> requestQueue;
    private final GlobalRateLimiter globalRateLimiter;
    private final Scheduler timedTaskScheduler;
    private final List<ResponseFunction> responseFunctions;
    private final DiscordWebClient httpClient;
    private final RateLimitStrategy rateLimitStrategy;
    private final RateLimitRetryOperator rateLimitRetryOperator;

    RequestStream(BucketKey id, RouterOptions routerOptions, DiscordWebClient httpClient, RateLimitStrategy rateLimitStrategy) {
        this.id = id;
        this.requestQueue = routerOptions.getRequestQueueFactory().create();
        this.globalRateLimiter = routerOptions.getGlobalRateLimiter();
        this.timedTaskScheduler = routerOptions.getReactorResources().getTimerTaskScheduler();
        this.responseFunctions = routerOptions.getResponseTransformers();
        this.httpClient = httpClient;
        this.rateLimitStrategy = rateLimitStrategy;
        this.rateLimitRetryOperator = new RateLimitRetryOperator(this.timedTaskScheduler);
    }

    private Retry<?> serverErrorRetryFactory() {
        return Retry.onlyIf(ClientException.isRetryContextStatusCode(500, 502, 503, 504)).withBackoffScheduler(this.timedTaskScheduler).exponentialBackoffWithJitter(Duration.ofSeconds(2L), Duration.ofSeconds(30L)).doOnRetry(ctx -> {
            if (log.isDebugEnabled()) {
                log.debug("Retry {} in bucket {} due to {} for {}", new Object[]{ctx.iteration(), this.id.toString(), ctx.exception().toString(), ctx.backoff()});
            }
        });
    }

    void push(RequestCorrelation<ClientResponse> request) {
        this.requestQueue.push(request);
    }

    void start() {
        this.requestQueue.requests().doOnDiscard(RequestCorrelation.class, this::onDiscard).subscribe((CoreSubscriber)new RequestSubscriber(this.rateLimitStrategy));
    }

    private void onDiscard(RequestCorrelation<?> requestCorrelation) {
        requestCorrelation.getResponse().onError((Throwable)new DiscardedRequestException(requestCorrelation.getRequest()));
    }

    private class RequestSubscriber
    extends BaseSubscriber<RequestCorrelation<ClientResponse>> {
        private volatile Duration sleepTime = Duration.ZERO;
        private final Function<ClientResponse, Mono<ClientResponse>> responseFunction = response -> {
            Duration nextReset;
            HttpClientResponse httpResponse = response.getHttpResponse();
            if (log.isDebugEnabled()) {
                Instant requestTimestamp = Instant.ofEpochMilli((Long)httpResponse.currentContext().get((Object)"discord4j.request.timestamp"));
                Duration responseTime = Duration.between(requestTimestamp, Instant.now());
                LogUtil.traceDebug((Logger)log, trace -> LogUtil.format((Context)httpResponse.currentContext(), (String)("Read " + httpResponse.status() + " in " + responseTime + (trace == false ? "" : " with headers: " + httpResponse.responseHeaders()))));
            }
            if (!(nextReset = strategy.apply(httpResponse)).isZero()) {
                if (log.isDebugEnabled()) {
                    log.debug(LogUtil.format((Context)httpResponse.currentContext(), (String)"Delaying next request by {}"), new Object[]{nextReset});
                }
                this.sleepTime = nextReset;
            }
            boolean global = Boolean.parseBoolean(httpResponse.responseHeaders().get("X-RateLimit-Global"));
            Mono action = Mono.empty();
            if (global) {
                long retryAfter = Long.parseLong(httpResponse.responseHeaders().get("Retry-After"));
                Duration fixedBackoff = Duration.ofMillis(retryAfter);
                action = RequestStream.this.globalRateLimiter.rateLimitFor(fixedBackoff).doOnTerminate(() -> log.debug(LogUtil.format((Context)httpResponse.currentContext(), (String)"Globally rate limited for {}"), new Object[]{fixedBackoff}));
            }
            if (httpResponse.status().code() >= 400) {
                return action.then(response.createException().flatMap(Mono::error));
            }
            return action.thenReturn(response);
        };

        public RequestSubscriber(RateLimitStrategy strategy) {
        }

        protected void hookOnSubscribe(Subscription subscription) {
            this.request(1L);
        }

        protected void hookOnNext(RequestCorrelation<ClientResponse> correlation) {
            DiscordWebRequest request = correlation.getRequest();
            ClientRequest clientRequest = new ClientRequest(request);
            MonoProcessor<ClientResponse> callback = correlation.getResponse();
            if (log.isDebugEnabled()) {
                log.debug("[B:{}, R:{}] {}", new Object[]{RequestStream.this.id.toString(), clientRequest.getId(), clientRequest.getDescription()});
            }
            ((MonoProcessor)Mono.just((Object)clientRequest).doOnEach(s -> log.trace(LogUtil.format((Context)s.getContext(), (String)">> {}"), new Object[]{s})).flatMap(req -> RequestStream.this.globalRateLimiter.withLimiter(RequestStream.this.httpClient.exchange((ClientRequest)req).flatMap(this.responseFunction)).next()).doOnEach(s -> log.trace(LogUtil.format((Context)s.getContext(), (String)"<< {}"), new Object[]{s})).subscriberContext(ctx -> ctx.putAll(correlation.getContext()).put((Object)"discord4j.request", (Object)clientRequest.getId()).put((Object)"discord4j.bucket", (Object)RequestStream.this.id.toString())).retryWhen(RequestStream.this.rateLimitRetryOperator::apply).transform(this.getResponseTransformers(request)).retryWhen((Function)RequestStream.this.serverErrorRetryFactory()).doFinally(this::next).checkpoint("Request to " + clientRequest.getDescription() + " [RequestStream]").subscribeWith(callback)).subscribe(null, t -> log.trace("Error while processing {}: {}", new Object[]{request, t}));
        }

        private Function<Mono<ClientResponse>, Mono<ClientResponse>> getResponseTransformers(DiscordWebRequest discordRequest) {
            return RequestStream.this.responseFunctions.stream().map(rt -> rt.transform(discordRequest).andThen(mono -> mono.checkpoint("Apply " + rt + " to " + discordRequest.getDescription() + " [RequestStream]"))).reduce(Function::andThen).orElse(mono -> mono);
        }

        private void next(SignalType signal) {
            Mono timer = this.sleepTime.isZero() ? Mono.just((Object)0L) : Mono.delay((Duration)this.sleepTime, (Scheduler)RequestStream.this.timedTaskScheduler);
            timer.subscribe(l -> {
                if (log.isDebugEnabled()) {
                    log.debug("[B:{}] Ready to consume next request after {}", new Object[]{RequestStream.this.id.toString(), signal});
                }
                this.sleepTime = Duration.ZERO;
                this.request(1L);
            }, t -> log.error("[B:{}] Error while scheduling next request", new Object[]{RequestStream.this.id.toString(), t}));
        }
    }
}

