/*
 * Decompiled with CFR 0.152.
 */
package discord4j.rest.request;

import discord4j.rest.http.client.ClientException;
import discord4j.rest.http.client.ClientRequest;
import discord4j.rest.http.client.DiscordWebClient;
import discord4j.rest.request.BucketKey;
import discord4j.rest.request.DiscordRequest;
import discord4j.rest.request.GlobalRateLimiter;
import discord4j.rest.request.RequestCorrelation;
import discord4j.rest.request.RouterOptions;
import java.time.Duration;
import java.time.Instant;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.logging.Level;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.SignalType;
import reactor.core.scheduler.Scheduler;
import reactor.netty.http.client.HttpClientResponse;
import reactor.retry.BackoffDelay;
import reactor.retry.IterationContext;
import reactor.retry.Retry;
import reactor.retry.RetryContext;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.annotation.Nullable;

class RequestStream<T> {
    private final EmitterProcessor<RequestCorrelation<T>> backing = EmitterProcessor.create((boolean)false);
    private final BucketKey id;
    private final Logger log;
    private final DiscordWebClient httpClient;
    private final GlobalRateLimiter globalRateLimiter;
    private final RateLimitStrategy rateLimitStrategy;
    private final Scheduler rateLimitScheduler;
    private final RouterOptions routerOptions;

    RequestStream(BucketKey id, DiscordWebClient httpClient, GlobalRateLimiter globalRateLimiter, RateLimitStrategy rateLimitStrategy, Scheduler rateLimitScheduler, RouterOptions routerOptions) {
        this.id = id;
        this.log = Loggers.getLogger((String)("discord4j.rest.traces." + id));
        this.httpClient = httpClient;
        this.globalRateLimiter = globalRateLimiter;
        this.rateLimitStrategy = rateLimitStrategy;
        this.rateLimitScheduler = rateLimitScheduler;
        this.routerOptions = routerOptions;
    }

    private Retry<?> rateLimitRetryFactory() {
        return Retry.onlyIf(this::isRateLimitError).backoff(context -> {
            if (this.isRateLimitError((IterationContext<?>)context)) {
                RetryContext ctx = (RetryContext)context;
                ClientException clientException = (ClientException)ctx.exception();
                boolean global = Boolean.parseBoolean(clientException.getHeaders().get("X-RateLimit-Global"));
                long retryAfter = Long.parseLong(clientException.getHeaders().get("Retry-After"));
                Duration fixedBackoff = Duration.ofMillis(retryAfter);
                if (global) {
                    Duration remaining = this.globalRateLimiter.getRemaining();
                    if (!remaining.isNegative() && !remaining.isZero()) {
                        return new BackoffDelay(remaining);
                    }
                    this.log.debug("Globally rate limited for {}", new Object[]{fixedBackoff});
                    this.globalRateLimiter.rateLimitFor(fixedBackoff);
                }
                return new BackoffDelay(fixedBackoff);
            }
            return new BackoffDelay(Duration.ZERO);
        }).doOnRetry(ctx -> {
            if (this.log.isTraceEnabled()) {
                this.log.trace("Retry {} due to {} for {}", new Object[]{ctx.iteration(), ctx.exception().toString(), ctx.backoff()});
            }
        });
    }

    private boolean isRateLimitError(IterationContext<?> context) {
        RetryContext ctx = (RetryContext)context;
        Throwable exception = ctx.exception();
        if (exception instanceof ClientException) {
            ClientException clientException = (ClientException)exception;
            return clientException.getStatus().code() == 429;
        }
        return false;
    }

    private Retry<?> serverErrorRetryFactory() {
        return Retry.onlyIf(ClientException.isRetryContextStatusCode(500, 502, 503, 504)).exponentialBackoffWithJitter(Duration.ofSeconds(2L), Duration.ofSeconds(30L)).doOnRetry(ctx -> {
            if (this.log.isTraceEnabled()) {
                this.log.trace("Retry {} due to {} for {}", new Object[]{ctx.iteration(), ctx.exception().toString(), ctx.backoff()});
            }
        });
    }

    void push(RequestCorrelation<T> request) {
        this.backing.onNext(request);
    }

    void start() {
        this.backing.subscribe((CoreSubscriber)new RequestSubscriber(this.rateLimitStrategy));
    }

    @FunctionalInterface
    static interface RateLimitStrategy
    extends Function<HttpClientResponse, Duration> {
    }

    private class RequestSubscriber
    extends BaseSubscriber<RequestCorrelation<T>> {
        private volatile Duration sleepTime = Duration.ZERO;
        private final Consumer<HttpClientResponse> rateLimitHandler = response -> {
            Duration nextReset;
            if (RequestStream.this.log.isTraceEnabled()) {
                Instant requestTimestamp = Instant.ofEpochMilli((Long)response.currentContext().get((Object)"requestTimestamp"));
                Duration responseTime = Duration.between(requestTimestamp, Instant.now());
                RequestStream.this.log.trace("Read {} in {} with headers: {}", new Object[]{response.status(), responseTime, response.responseHeaders()});
            }
            if (!(nextReset = (Duration)strategy.apply(response)).isZero()) {
                if (RequestStream.this.log.isTraceEnabled()) {
                    RequestStream.this.log.trace("Delaying next request by {}", new Object[]{nextReset});
                }
                this.sleepTime = nextReset;
            }
        };

        public RequestSubscriber(RateLimitStrategy strategy) {
        }

        protected void hookOnSubscribe(Subscription subscription) {
            this.request(1L);
        }

        protected void hookOnNext(RequestCorrelation<T> correlation) {
            DiscordRequest request = correlation.getRequest();
            MonoProcessor callback = correlation.getResponse();
            String shard = correlation.getShardId();
            Logger traceLog = this.getLogger("traces", shard);
            if (traceLog.isTraceEnabled()) {
                traceLog.trace("Accepting request: {}", new Object[]{request});
            }
            Logger requestLog = this.getLogger("request", shard);
            Logger responseLog = this.getLogger("response", shard);
            Class responseType = request.getRoute().getResponseType();
            RequestStream.this.globalRateLimiter.withLimiter(Mono.fromCallable(() -> new ClientRequest(request)).log(requestLog, Level.FINEST, false, new SignalType[0]).flatMap(r -> RequestStream.this.httpClient.exchange((ClientRequest)r, request.getBody(), responseType, this.rateLimitHandler)).retryWhen((Function)RequestStream.this.rateLimitRetryFactory()).transform(this.getResponseTransformers(request)).retryWhen((Function)RequestStream.this.serverErrorRetryFactory()).log(responseLog, Level.FINEST, false, new SignalType[0]).doFinally(signal -> this.next((SignalType)signal, traceLog))).materialize().subscribe(signal -> {
                if (signal.isOnSubscribe()) {
                    callback.onSubscribe(signal.getSubscription());
                } else if (signal.isOnNext()) {
                    callback.onNext(signal.get());
                } else if (signal.isOnError()) {
                    callback.onError(signal.getThrowable());
                } else if (signal.isOnComplete()) {
                    callback.onComplete();
                }
            });
        }

        private Function<Mono<T>, Mono<T>> getResponseTransformers(DiscordRequest<T> discordRequest) {
            return RequestStream.this.routerOptions.getResponseTransformers().stream().map(rt -> rt.transform(discordRequest)).reduce(Function::andThen).orElse(mono -> mono);
        }

        private void next(SignalType signal, Logger logger) {
            Mono.delay((Duration)this.sleepTime, (Scheduler)RequestStream.this.rateLimitScheduler).subscribe(l -> {
                if (logger.isTraceEnabled()) {
                    logger.trace("Ready to consume next request after {}", new Object[]{signal});
                }
                this.sleepTime = Duration.ZERO;
                this.request(1L);
            }, t -> logger.error("Error while scheduling next request", t));
        }

        private Logger getLogger(String path, @Nullable String shard) {
            String shardPath = shard == null ? "" : "." + shard;
            return Loggers.getLogger((String)("discord4j.rest." + path + "." + RequestStream.this.id + shardPath));
        }
    }
}

