/*
 * Decompiled with CFR 0.152.
 */
package discord4j.rest.request;

import discord4j.rest.http.client.ClientException;
import discord4j.rest.http.client.ClientRequest;
import discord4j.rest.http.client.DiscordWebClient;
import discord4j.rest.request.BucketKey;
import discord4j.rest.request.DiscordRequest;
import discord4j.rest.request.GlobalRateLimiter;
import discord4j.rest.request.RequestCorrelation;
import discord4j.rest.util.RouteUtils;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpHeaders;
import java.time.Duration;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.logging.Level;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.SignalType;
import reactor.core.scheduler.Scheduler;
import reactor.netty.http.client.HttpClientResponse;
import reactor.retry.BackoffDelay;
import reactor.retry.IterationContext;
import reactor.retry.Retry;
import reactor.retry.RetryContext;
import reactor.util.Logger;
import reactor.util.Loggers;

class RequestStream<T> {
    private final EmitterProcessor<RequestCorrelation<T>> backing = EmitterProcessor.create((boolean)false);
    private final BucketKey id;
    private final Logger log;
    private final DiscordWebClient httpClient;
    private final GlobalRateLimiter globalRateLimiter;
    private final RateLimitStrategy rateLimitStrategy;
    private final Scheduler rateLimitScheduler;

    RequestStream(BucketKey id, DiscordWebClient httpClient, GlobalRateLimiter globalRateLimiter, RateLimitStrategy rateLimitStrategy, Scheduler rateLimitScheduler) {
        this.id = id;
        this.log = Loggers.getLogger((String)("discord4j.rest.traces." + id));
        this.httpClient = httpClient;
        this.globalRateLimiter = globalRateLimiter;
        this.rateLimitStrategy = rateLimitStrategy;
        this.rateLimitScheduler = rateLimitScheduler;
    }

    private Retry<?> rateLimitRetryFactory() {
        return Retry.onlyIf(this::isRateLimitError).backoff(context -> {
            if (this.isRateLimitError((IterationContext<?>)context)) {
                RetryContext ctx = (RetryContext)context;
                ClientException clientException = (ClientException)ctx.exception();
                boolean global = Boolean.valueOf(clientException.getHeaders().get("X-RateLimit-Global"));
                long retryAfter = Long.valueOf(clientException.getHeaders().get("Retry-After"));
                Duration fixedBackoff = Duration.ofMillis(retryAfter);
                if (global) {
                    this.globalRateLimiter.rateLimitFor(fixedBackoff);
                }
                return new BackoffDelay(fixedBackoff);
            }
            return new BackoffDelay(Duration.ZERO);
        }).doOnRetry(ctx -> {
            if (this.log.isTraceEnabled()) {
                this.log.trace("Retry {} due to {} for {}", new Object[]{ctx.iteration(), ctx.exception().toString(), ctx.backoff()});
            }
        });
    }

    private boolean isRateLimitError(IterationContext<?> context) {
        RetryContext ctx = (RetryContext)context;
        Throwable exception = ctx.exception();
        if (exception instanceof ClientException) {
            ClientException clientException = (ClientException)exception;
            return clientException.getStatus().code() == 429;
        }
        return false;
    }

    private Retry<?> serverErrorRetryFactory() {
        return Retry.onlyIf(this::isServerError).exponentialBackoffWithJitter(Duration.ofSeconds(2L), Duration.ofSeconds(30L)).doOnRetry(ctx -> {
            if (this.log.isTraceEnabled()) {
                this.log.trace("Retry {} due to {} for {}", new Object[]{ctx.iteration(), ctx.exception().toString(), ctx.backoff()});
            }
        });
    }

    private boolean isServerError(IterationContext<?> context) {
        RetryContext ctx = (RetryContext)context;
        Throwable exception = ctx.exception();
        if (exception instanceof ClientException) {
            ClientException clientException = (ClientException)exception;
            int code = clientException.getStatus().code();
            return code == 502 || code == 503 || code == 504;
        }
        return false;
    }

    void push(RequestCorrelation<T> request) {
        this.backing.onNext(request);
    }

    void start() {
        this.read().subscribe((Consumer)new Reader(this.rateLimitStrategy), t -> this.log.error("Error while consuming first request", t));
    }

    private Mono<RequestCorrelation<T>> read() {
        return this.backing.next();
    }

    @FunctionalInterface
    static interface RateLimitStrategy
    extends Function<HttpClientResponse, Duration> {
    }

    private class Reader
    implements Consumer<RequestCorrelation<T>> {
        private volatile Duration sleepTime = Duration.ZERO;
        private final Consumer<HttpClientResponse> rateLimitHandler = response -> {
            Duration nextReset;
            if (RequestStream.this.log.isTraceEnabled()) {
                RequestStream.this.log.trace("Read {} with headers: {}", new Object[]{response.status(), response.responseHeaders()});
            }
            if (!(nextReset = (Duration)strategy.apply(response)).isZero()) {
                if (RequestStream.this.log.isTraceEnabled()) {
                    RequestStream.this.log.trace("Delaying next request by {}", new Object[]{nextReset});
                }
                this.sleepTime = nextReset;
            }
        };

        private Reader(RateLimitStrategy strategy) {
        }

        private Mono<ClientRequest> adaptRequest(DiscordRequest<?> req) {
            return Mono.fromCallable(() -> new ClientRequest(req.getRoute().getMethod(), RouteUtils.expandQuery(req.getCompleteUri(), req.getQueryParams()), Optional.ofNullable(req.getHeaders()).map(map -> map.entrySet().stream().reduce(new DefaultHttpHeaders(), (headers, entry) -> {
                String key = (String)entry.getKey();
                ((Set)entry.getValue()).forEach(value -> headers.add(key, value));
                return headers;
            }, HttpHeaders::add)).orElse((HttpHeaders)new DefaultHttpHeaders())));
        }

        @Override
        public void accept(RequestCorrelation<T> correlation) {
            DiscordRequest req = correlation.getRequest();
            MonoProcessor callback = correlation.getResponse();
            String shard = correlation.getShardId();
            Logger logger = this.getTraceLogger(shard);
            if (logger.isTraceEnabled()) {
                logger.trace("Accepting request: {}", new Object[]{req});
            }
            Mono<ClientRequest> request = this.adaptRequest(req);
            Class responseType = req.getRoute().getResponseType();
            RequestStream.this.globalRateLimiter.onComplete().then(request).log("discord4j.rest.request." + RequestStream.this.id + "." + shard, Level.FINEST, new SignalType[0]).flatMap(r -> RequestStream.this.httpClient.exchange((ClientRequest)r, req.getBody(), responseType, this.rateLimitHandler)).retryWhen((Function)RequestStream.this.rateLimitRetryFactory()).retryWhen((Function)RequestStream.this.serverErrorRetryFactory()).log("discord4j.rest.response." + RequestStream.this.id + "." + shard, Level.FINEST, new SignalType[0]).doFinally(signal -> this.next((SignalType)signal, shard)).materialize().subscribe(signal -> {
                if (signal.isOnSubscribe()) {
                    callback.onSubscribe(signal.getSubscription());
                } else if (signal.isOnNext()) {
                    callback.onNext(signal.get());
                } else if (signal.isOnError()) {
                    callback.onError(signal.getThrowable());
                } else if (signal.isOnComplete()) {
                    callback.onComplete();
                }
            });
        }

        private void next(SignalType signal, String shard) {
            Logger logger = this.getTraceLogger(shard);
            Mono.delay((Duration)this.sleepTime, (Scheduler)RequestStream.this.rateLimitScheduler).subscribe(l -> {
                if (logger.isTraceEnabled()) {
                    logger.trace("Ready to consume next request after {}", new Object[]{signal});
                }
                this.sleepTime = Duration.ZERO;
                RequestStream.this.read().subscribe((Consumer)this, t -> logger.error("Error while consuming request", t));
            }, t -> logger.error("Error while scheduling next request", t));
        }

        private Logger getTraceLogger(String shard) {
            return Loggers.getLogger((String)("discord4j.rest.traces." + RequestStream.this.id + "." + shard));
        }
    }
}

