/*
 * Decompiled with CFR 0.152.
 */
package discord4j.rest.request;

import discord4j.common.RateLimiter;
import discord4j.common.SimpleBucket;
import discord4j.rest.http.client.DiscordWebClient;
import discord4j.rest.request.BucketKey;
import discord4j.rest.request.DiscordRequest;
import discord4j.rest.request.GlobalRateLimiter;
import discord4j.rest.request.RequestCorrelation;
import discord4j.rest.request.RequestStream;
import discord4j.rest.request.Router;
import discord4j.rest.route.Routes;
import io.netty.handler.codec.http.HttpHeaders;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.netty.http.client.HttpClientResponse;
import reactor.util.Logger;
import reactor.util.Loggers;

public class DefaultRouter
implements Router {
    private static final Logger log = Loggers.getLogger(DefaultRouter.class);
    private static final ResponseHeaderStrategy HEADER_STRATEGY = new ResponseHeaderStrategy();
    private final DiscordWebClient httpClient;
    private final Scheduler responseScheduler;
    private final Scheduler rateLimitScheduler;
    private final GlobalRateLimiter globalRateLimiter = new GlobalRateLimiter();
    private final Map<BucketKey, RequestStream<?>> streamMap = new ConcurrentHashMap();

    public DefaultRouter(DiscordWebClient httpClient) {
        this(httpClient, Schedulers.elastic(), Schedulers.elastic());
    }

    public DefaultRouter(DiscordWebClient httpClient, Scheduler responseScheduler, Scheduler rateLimitScheduler) {
        this.httpClient = httpClient;
        this.responseScheduler = responseScheduler;
        this.rateLimitScheduler = rateLimitScheduler;
    }

    @Override
    public <T> Mono<T> exchange(DiscordRequest<T> request) {
        return Mono.defer(Mono::subscriberContext).flatMap(ctx -> {
            RequestStream stream = this.getStream(request);
            MonoProcessor callback = MonoProcessor.create();
            String shardId = ctx.getOrEmpty((Object)"shard").map(Object::toString).orElse("?");
            stream.push(new RequestCorrelation(request, callback, shardId));
            return callback;
        }).publishOn(this.responseScheduler);
    }

    private <T> RequestStream<T> getStream(DiscordRequest<T> request) {
        return this.streamMap.computeIfAbsent(this.computeBucket(request), k -> {
            if (log.isTraceEnabled()) {
                log.trace("Creating RequestStream with key {} for request: {} -> {}", new Object[]{k, request.getRoute().getUriTemplate(), request.getCompleteUri()});
            }
            RequestStream stream = new RequestStream((BucketKey)k, this.httpClient, this.globalRateLimiter, this.getRateLimitStrategy(request), this.rateLimitScheduler);
            stream.start();
            return stream;
        });
    }

    private <T> BucketKey computeBucket(DiscordRequest<T> request) {
        if (Routes.MESSAGE_DELETE.equals(request.getRoute())) {
            return BucketKey.of("DELETE " + request.getRoute().getUriTemplate(), request.getCompleteUri());
        }
        return BucketKey.of(request.getRoute().getUriTemplate(), request.getCompleteUri());
    }

    private RequestStream.RateLimitStrategy getRateLimitStrategy(DiscordRequest<?> request) {
        if (Routes.REACTION_CREATE.equals(request.getRoute())) {
            return new RateLimiterStrategy((RateLimiter)new SimpleBucket(1L, Duration.ofMillis(250L)));
        }
        return HEADER_STRATEGY;
    }

    static class ResponseHeaderStrategy
    implements RequestStream.RateLimitStrategy {
        ResponseHeaderStrategy() {
        }

        @Override
        public Duration apply(HttpClientResponse response) {
            HttpHeaders headers = response.responseHeaders();
            int remaining = headers.getInt((CharSequence)"X-RateLimit-Remaining", -1);
            if (remaining == 0) {
                long resetAt = Long.parseLong(headers.get("X-RateLimit-Reset"));
                long discordTime = headers.getTimeMillis((CharSequence)"Date") / 1000L;
                return Duration.ofSeconds(resetAt - discordTime);
            }
            return Duration.ZERO;
        }
    }

    static class RateLimiterStrategy
    implements RequestStream.RateLimitStrategy {
        private final RateLimiter rateLimiter;

        RateLimiterStrategy(RateLimiter rateLimiter) {
            this.rateLimiter = rateLimiter;
        }

        @Override
        public Duration apply(HttpClientResponse response) {
            this.rateLimiter.tryConsume(1);
            return Duration.ofMillis(this.rateLimiter.delayMillisToConsume(1L));
        }
    }
}

