/*
 * Decompiled with CFR 0.152.
 */
package discord4j.common.operator;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.ReplayProcessor;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.util.Logger;
import reactor.util.Loggers;

/**
 * A {@link Function} over {@link Publisher} that gates each element of the source behind a token
 * taken from a shared counter.
 *
 * <p>The counter starts at {@code capacity}. For every source element, {@link #apply(Publisher)}
 * waits for a signal observed while the counter is positive, decrements the counter, schedules an
 * increment after {@code refillPeriod}, and only then emits the element.
 *
 * <p>NOTE(review): the positive-counter check and the decrement are separate steps; they appear to
 * rely on the publish scheduler running them one at a time (the default scheduler is created with
 * {@code Schedulers.newSingle}). Confirm before supplying a multi-threaded publish scheduler.
 *
 * @param <T> the type of the elements passing through the limiter
 */
public class RateLimitOperator<T> implements Function<Publisher<T>, Publisher<T>> {

    private static final Logger log = Loggers.getLogger("discord4j.limiter");

    /** Source of the numeric suffix used in the names of default publish schedulers. */
    private static final AtomicInteger ID_GENERATOR = new AtomicInteger();

    /** Creates a new single scheduler named {@code d4j-limiter-N} each time it is invoked. */
    private static final Supplier<Scheduler> DEFAULT_PUBLISH_SCHEDULER =
            () -> Schedulers.newSingle("d4j-limiter-" + ID_GENERATOR.incrementAndGet(), true);

    /** Current number of available tokens. */
    private final AtomicInteger tokens;

    /** Delay between taking a token and giving it back. */
    private final Duration refillPeriod;

    /** Scheduler on which the give-back delay is timed. */
    private final Scheduler delayScheduler;

    /** Stream of token counts, created with {@code cacheLastOrDefault(capacity)}. */
    private final ReplayProcessor<Integer> tokenChanged;

    /** Sink used to push a new token count into {@link #tokenChanged}. */
    private final FluxSink<Integer> tokenChangedSink;

    /** Scheduler on which token-count signals are delivered to waiting elements. */
    private final Scheduler tokenPublishScheduler;

    /**
     * Creates a limiter that times refills on {@link Schedulers#parallel()} and publishes token
     * changes on a newly created default scheduler.
     *
     * @param capacity the initial number of tokens
     * @param refillPeriod the delay after which a taken token is given back
     * @deprecated use {@link #RateLimitOperator(int, Duration, Scheduler)} and pass the delay
     * scheduler explicitly
     */
    @Deprecated
    public RateLimitOperator(int capacity, Duration refillPeriod) {
        this(capacity, refillPeriod, Schedulers.parallel());
    }

    /**
     * Creates a limiter that publishes token changes on a newly created default scheduler.
     *
     * @param capacity the initial number of tokens
     * @param refillPeriod the delay after which a taken token is given back
     * @param delayScheduler the scheduler on which the refill delay is timed
     */
    public RateLimitOperator(int capacity, Duration refillPeriod, Scheduler delayScheduler) {
        this(capacity, refillPeriod, delayScheduler, DEFAULT_PUBLISH_SCHEDULER.get());
    }

    /**
     * Creates a limiter with explicit schedulers.
     *
     * @param capacity the initial number of tokens
     * @param refillPeriod the delay after which a taken token is given back
     * @param delayScheduler the scheduler on which the refill delay is timed
     * @param publishScheduler the scheduler on which token-count signals are delivered
     */
    public RateLimitOperator(int capacity, Duration refillPeriod, Scheduler delayScheduler,
                             Scheduler publishScheduler) {
        this.tokens = new AtomicInteger(capacity);
        this.refillPeriod = refillPeriod;
        this.delayScheduler = delayScheduler;
        // No (Object) cast here: the argument must stay an Integer so that the inferred type is
        // ReplayProcessor<Integer>, matching the field declaration.
        this.tokenChanged = ReplayProcessor.cacheLastOrDefault(capacity);
        this.tokenChangedSink = this.tokenChanged.sink(FluxSink.OverflowStrategy.LATEST);
        this.tokenPublishScheduler = publishScheduler;
    }

    /** Returns this instance's identity hash code in hexadecimal, used to tag log lines. */
    private String id() {
        return Integer.toHexString(this.hashCode());
    }

    /**
     * Wraps {@code source} so that each element is emitted only after a token has been taken.
     *
     * @param source the publisher whose elements are to be rate limited
     * @return a publisher emitting the elements of {@code source}, each one after a token was
     * acquired for it
     */
    @Override
    public Publisher<T> apply(Publisher<T> source) {
        return Flux.from(source)
                .flatMap(value -> this.availableTokens()
                        // Only the first "tokens available" signal is needed per element.
                        .take(1L)
                        .doOnSubscribe(s -> {
                            if (log.isTraceEnabled()) {
                                log.trace("[{}] Subscribed to limiter", this.id());
                            }
                        })
                        .map(token -> {
                            this.acquire();
                            // Give the token back once the refill period has elapsed.
                            Mono.delay(this.refillPeriod, this.delayScheduler)
                                    .subscribe(ignored -> this.release());
                            return value;
                        }));
    }

    /** Decrements the token counter and publishes the new count. */
    private void acquire() {
        int token = this.tokens.decrementAndGet();
        if (log.isTraceEnabled()) {
            log.trace("[{}] Acquired a token, {} tokens remaining", this.id(), token);
        }
        this.tokenChangedSink.next(token);
    }

    /** Increments the token counter and publishes the new count. */
    private void release() {
        int token = this.tokens.incrementAndGet();
        if (log.isTraceEnabled()) {
            log.trace("[{}] Released a token, {} tokens remaining", this.id(), token);
        }
        this.tokenChangedSink.next(token);
    }

    /**
     * Returns the token-count stream, delivered on the publish scheduler and restricted to signals
     * that arrive while the counter is positive.
     */
    private Flux<Integer> availableTokens() {
        // The filter reads the live counter rather than the emitted value, so a stale count that
        // is delivered after the tokens have already been taken does not let an element through.
        return this.tokenChanged
                .publishOn(this.tokenPublishScheduler)
                .filter(ignored -> this.tokens.get() > 0);
    }
}

