/*
 * Decompiled with CFR 0.152.
 */
package discord4j.common.operator;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class RateLimitOperator<T>
implements Function<Publisher<T>, Publisher<T>> {
    private final AtomicInteger tokens;
    private final Duration refillPeriod;
    private final Scheduler delayScheduler;
    private final EmitterProcessor<Integer> tokenChanged;

    public RateLimitOperator(int capacity, Duration refillPeriod) {
        this(capacity, refillPeriod, Schedulers.parallel());
    }

    public RateLimitOperator(int capacity, Duration refillPeriod, Scheduler delayScheduler) {
        this.tokens = new AtomicInteger(capacity);
        this.refillPeriod = refillPeriod;
        this.delayScheduler = delayScheduler;
        this.tokenChanged = EmitterProcessor.create((boolean)false);
        this.tokenChanged.onNext((Object)this.tokens.get());
    }

    @Override
    public Publisher<T> apply(Publisher<T> source) {
        if (source instanceof Mono) {
            return Mono.from(source).flatMapMany(value -> this.availableTokens().take(1L).map(token -> {
                this.acquire();
                Mono.delay((Duration)this.refillPeriod, (Scheduler)this.delayScheduler).subscribe(__ -> this.release());
                return value;
            }));
        }
        if (source instanceof Flux) {
            return Flux.from(source).flatMap(value -> this.availableTokens().take(1L).map(token -> {
                this.acquire();
                Mono.delay((Duration)this.refillPeriod, (Scheduler)this.delayScheduler).subscribe(__ -> this.release());
                return value;
            }));
        }
        throw new IllegalArgumentException("Unsupported publisher: " + source.getClass());
    }

    private void acquire() {
        this.tokenChanged.onNext((Object)this.tokens.decrementAndGet());
    }

    private void release() {
        this.tokenChanged.onNext((Object)this.tokens.incrementAndGet());
    }

    private Flux<Integer> availableTokens() {
        return this.tokenChanged.filter(__ -> this.tokens.get() > 0);
    }
}

