/*
 * Decompiled with CFR 0.152.
 */
package discord4j.gateway;

import discord4j.common.RateLimiter;
import discord4j.common.ResettableInterval;
import discord4j.common.SimpleBucket;
import discord4j.common.close.CloseException;
import discord4j.common.close.CloseStatus;
import discord4j.gateway.DiscordWebSocketHandler;
import discord4j.gateway.GatewayClient;
import discord4j.gateway.GatewayObserver;
import discord4j.gateway.IdentifyOptions;
import discord4j.gateway.PayloadContext;
import discord4j.gateway.PayloadHandlers;
import discord4j.gateway.json.GatewayPayload;
import discord4j.gateway.json.Heartbeat;
import discord4j.gateway.json.Opcode;
import discord4j.gateway.json.dispatch.Dispatch;
import discord4j.gateway.json.dispatch.Ready;
import discord4j.gateway.json.dispatch.Resumed;
import discord4j.gateway.payload.PayloadReader;
import discord4j.gateway.payload.PayloadWriter;
import discord4j.gateway.retry.GatewayStateChange;
import discord4j.gateway.retry.RetryContext;
import discord4j.gateway.retry.RetryOptions;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.HttpHeaderNames;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.logging.Level;
import org.reactivestreams.Publisher;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.SignalType;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.netty.ConnectionObserver;
import reactor.netty.http.client.HttpClient;
import reactor.retry.Retry;
import reactor.util.Logger;
import reactor.util.Loggers;

/**
 * {@link GatewayClient} implementation that talks to the gateway over a websocket opened through a
 * reactor-netty {@link HttpClient}.
 *
 * <p>The client keeps track of the connection state (sequence number, session id, heartbeat timing),
 * throttles outbound payloads, and re-establishes the connection according to the supplied
 * {@link RetryOptions}.
 */
public class DefaultGatewayClient
implements GatewayClient {
    // Logger named after the shard index; assigned at the end of the constructor.
    private final Logger log;
    private final HttpClient httpClient;
    // Decodes inbound buffers into payloads / encodes outbound payloads into buffers.
    private final PayloadReader payloadReader;
    private final PayloadWriter payloadWriter;
    private final RetryOptions retryOptions;
    private final IdentifyOptions identifyOptions;
    private final String token;
    // Observer given at construction; combined with the per-execute observer on each subscription.
    private final GatewayObserver initialObserver;
    // Limiter used by throttle(...) for IDENTIFY payloads instead of the general outbound limiter.
    private final RateLimiter identifyLimiter;
    // Extra delay (ms) added to every throttled outbound payload; computed once in the constructor.
    private final long outboundDelayMillis;
    // Processors backing the four internal streams; their sinks are created in the constructor.
    private final EmitterProcessor<Dispatch> dispatch = EmitterProcessor.create((boolean)false);
    private final EmitterProcessor<ByteBuf> receiver = EmitterProcessor.create((boolean)false);
    private final EmitterProcessor<GatewayPayload<?>> sender = EmitterProcessor.create((boolean)false);
    private final EmitterProcessor<GatewayPayload<Heartbeat>> heartbeats = EmitterProcessor.create((boolean)false);
    private final FluxSink<Dispatch> dispatchSink;
    private final FluxSink<ByteBuf> receiverSink;
    private final FluxSink<GatewayPayload<?>> senderSink;
    private final FluxSink<GatewayPayload<Heartbeat>> heartbeatSink;
    // Set to true once a Ready/Resumed dispatch arrives; cleared on retry and on disconnect.
    private final AtomicBoolean connected = new AtomicBoolean(false);
    // Whether the next connection attempt may resume the previous session.
    private final AtomicBoolean resumable = new AtomicBoolean(true);
    // Last sequence number seen on an inbound payload (see updateSequence).
    private final AtomicInteger sequence = new AtomicInteger(0);
    // Wall-clock millis of the last heartbeat produced and of the last heartbeat ACK.
    private final AtomicLong lastSent = new AtomicLong(0L);
    private final AtomicLong lastAck = new AtomicLong(0L);
    // lastAck - lastSent, refreshed by ackHeartbeat().
    private final AtomicLong responseTime = new AtomicLong(0L);
    private final ResettableInterval heartbeat = new ResettableInterval();
    private final AtomicReference<String> sessionId = new AtomicReference<String>("");
    // The three fields below are (re)assigned every time the execute(...) pipeline is subscribed,
    // so they are null until the first subscription.
    private volatile GatewayObserver observer;
    private volatile MonoProcessor<Void> disconnectNotifier;
    private volatile MonoProcessor<CloseStatus> closeTrigger;
    // System property names read by outboundLimiterCapacity() and outboundDelayMillis().
    private static final String OUTBOUND_CAPACITY_PROPERTY = "discord4j.gateway.outbound.capacity";
    private static final String OUTBOUND_DELAY_PROPERTY = "discord4j.gateway.outbound.delay.ms";

    public DefaultGatewayClient(HttpClient httpClient, PayloadReader payloadReader, PayloadWriter payloadWriter, RetryOptions retryOptions, String token, IdentifyOptions identifyOptions, GatewayObserver observer, RateLimiter identifyLimiter) {
        this.httpClient = Objects.requireNonNull(httpClient);
        this.payloadReader = Objects.requireNonNull(payloadReader);
        this.payloadWriter = Objects.requireNonNull(payloadWriter);
        this.retryOptions = Objects.requireNonNull(retryOptions);
        this.token = Objects.requireNonNull(token);
        this.identifyOptions = Objects.requireNonNull(identifyOptions);
        this.initialObserver = Objects.requireNonNull(observer);
        this.identifyLimiter = Objects.requireNonNull(identifyLimiter);
        this.outboundDelayMillis = this.outboundDelayMillis();
        this.dispatchSink = this.dispatch.sink(FluxSink.OverflowStrategy.LATEST);
        this.receiverSink = this.receiver.sink(FluxSink.OverflowStrategy.LATEST);
        this.senderSink = this.sender.sink(FluxSink.OverflowStrategy.LATEST);
        this.heartbeatSink = this.heartbeats.sink(FluxSink.OverflowStrategy.LATEST);
        this.log = this.shardLogger(".client");
    }

    /**
     * Connects to the given gateway URL without any additional observer.
     *
     * @param gatewayUrl the websocket URL to connect to
     * @return the result of {@link #execute(String, GatewayObserver)} with
     * {@link GatewayObserver#NOOP_LISTENER}
     */
    @Override
    public Mono<Void> execute(String gatewayUrl) {
        GatewayObserver noAdditionalObserver = GatewayObserver.NOOP_LISTENER;
        return execute(gatewayUrl, noAdditionalObserver);
    }

    /**
     * Builds the full connection pipeline for the given gateway URL.
     *
     * <p>Nothing happens until the returned {@code Mono} is subscribed. On subscription (and again on
     * every retry, because the deferred supplier sits upstream of {@code retryWhen}) the per-connection
     * state is re-created and five cooperating pipelines are zipped together: the websocket itself,
     * the ready/resumed handler, the inbound receiver, the outbound sender watcher and the heartbeat
     * emitter. An error in any of them is logged and handed to the retry policy.
     *
     * @param gatewayUrl the websocket URL to connect to
     * @param additionalObserver observer chained after the one supplied at construction
     * @return a {@code Mono} that completes once {@code disconnectNotifier} completes, or fails when
     * the retry policy gives up
     */
    @Override
    public Mono<Void> execute(String gatewayUrl, GatewayObserver additionalObserver) {
        return Mono.defer(() -> {
            // Fresh per-subscription signalling state.
            this.disconnectNotifier = MonoProcessor.create();
            this.closeTrigger = MonoProcessor.create();
            this.observer = this.initialObserver.then(additionalObserver);
            // General outbound limiter: constructed with the configured capacity and a 60 second duration.
            SimpleBucket limiter = new SimpleBucket(this.outboundLimiterCapacity(), Duration.ofSeconds(60L));
            // Outbound stream: heartbeats are merged in as-is, user payloads go through throttle(...)
            // one at a time (concatMap with prefetch 1), then everything is encoded by the writer.
            Flux outbound = Flux.merge((Publisher[])new Publisher[]{this.heartbeats, this.sender.concatMap(arg_0 -> this.lambda$null$0((RateLimiter)limiter, arg_0), 1)}).log(this.shardLogger(".outbound"), Level.FINE, false, new SignalType[0]).flatMap(payload -> Flux.from(this.payloadWriter.write((GatewayPayload<?>)payload)));
            int shard = this.identifyOptions.getShardIndex();
            DiscordWebSocketHandler handler = new DiscordWebSocketHandler(this.receiverSink, (Flux<ByteBuf>)outbound, this.closeTrigger, shard);
            // Seed sequence/session from the identify options when resume data is present;
            // otherwise this connection cannot be resumed.
            if (this.identifyOptions.getResumeSequence() != null) {
                this.sequence.set(this.identifyOptions.getResumeSequence());
                this.sessionId.set(this.identifyOptions.getResumeSessionId());
            } else {
                this.resumable.set(false);
            }
            // Start the ACK clock now so the first heartbeat tick is not flagged as a missing ACK.
            this.lastAck.set(System.currentTimeMillis());
            // Reacts to Ready/Resumed dispatches: marks the client connected, publishes the matching
            // state change, resets the retry context and notifies the observer.
            Mono readyHandler = this.dispatch.filter(DefaultGatewayClient::isReadyOrResume).flatMap(event -> {
                ConnectionObserver.State state;
                this.connected.compareAndSet(false, true);
                RetryContext retryContext = this.retryOptions.getRetryContext();
                // A reset count of zero is reported as a first connection, anything else as a reconnection.
                if (retryContext.getResetCount() == 0) {
                    this.log.info("Connected to Gateway");
                    this.dispatchSink.next((Object)GatewayStateChange.connected());
                    state = GatewayObserver.CONNECTED;
                } else {
                    this.log.info("Reconnected to Gateway");
                    this.dispatchSink.next((Object)GatewayStateChange.retrySucceeded(retryContext.getAttempts()));
                    state = GatewayObserver.RETRY_SUCCEEDED;
                }
                retryContext.reset();
                this.identifyOptions.setResumeSessionId(this.sessionId.get());
                this.resumable.set(true);
                this.notifyObserver(state, this.identifyOptions);
                return Mono.just((Object)event);
            }).then().log(this.shardLogger(".zip.ready"), Level.FINEST, false, new SignalType[0]);
            // Inbound: decode buffers, record the sequence number, then dispatch to PayloadHandlers.
            Mono receiverFuture = this.receiver.flatMap(this.payloadReader::read).log(this.shardLogger(".inbound"), Level.FINE, false, new SignalType[0]).map(this::updateSequence).map(payload -> this.payloadContext((GatewayPayload<?>)payload, handler, this)).doOnNext(PayloadHandlers::handle).then().log(this.shardLogger(".zip.receiver"), Level.FINEST, false, new SignalType[0]);
            // Watches the sender stream: completion closes the handler, and a RECONNECT payload
            // (see close(true)) is turned into a handler error.
            Mono senderFuture = this.sender.doOnComplete(handler::close).doOnNext(payload -> {
                if (Opcode.RECONNECT.equals(payload.getOp())) {
                    handler.error(new RuntimeException("Reconnecting due to user action"));
                }
            }).then().log(this.shardLogger(".zip.sender"), Level.FINEST, false, new SignalType[0]);
            // On every heartbeat tick: if the last ACK is older than one heartbeat period plus the
            // last measured response time, signal an error on the handler; otherwise emit a heartbeat
            // carrying the current sequence.
            Mono heartbeatHandler = this.heartbeat.ticks().flatMap(t -> {
                long delay = System.currentTimeMillis() - this.lastAck.get();
                if (delay > this.heartbeat.getPeriod().toMillis() + this.getResponseTime()) {
                    this.log.warn("Missing heartbeat ACK for {} ms", new Object[]{delay});
                    handler.error(new RuntimeException("Reconnecting due to zombie or failed connection"));
                    return Mono.empty();
                }
                this.log.debug("Sending heartbeat {} ms after last ACK", new Object[]{delay});
                this.lastSent.set(System.currentTimeMillis());
                return Mono.just(GatewayPayload.heartbeat(new Heartbeat(this.sequence.get())));
            }).doOnNext(arg_0 -> this.heartbeatSink.next(arg_0)).then().log(this.shardLogger(".zip.heartbeat"), Level.FINEST, false, new SignalType[0]);
            // The websocket connection itself; the heartbeat interval is stopped whenever it terminates.
            Mono httpFuture = ((HttpClient.WebsocketSender)this.httpClient.headers(headers -> headers.add((CharSequence)HttpHeaderNames.USER_AGENT, (Object)"DiscordBot(https://discord4j.com, 3)")).observe(this.getObserver()).websocket(Integer.MAX_VALUE).uri(gatewayUrl)).handle(handler::handle).doOnTerminate(() -> ((ResettableInterval)this.heartbeat).stop()).then().log(this.shardLogger(".zip.http"), Level.FINEST, false, new SignalType[0]);
            return Mono.zip((Mono)httpFuture, (Mono)readyHandler, (Mono)receiverFuture, (Mono)senderFuture, (Mono)heartbeatHandler).doOnError(this.logReconnectReason()).then();
            // Errors are retried per retryFactory(); cancelling the subscription pushes a normal-close
            // status into closeTrigger; the result finally waits on disconnectNotifier.
        }).retryWhen(this.retryFactory()).doOnCancel(() -> this.closeTrigger.onNext((Object)CloseStatus.NORMAL_CLOSE)).then(Mono.defer(() -> this.disconnectNotifier));
    }

    /**
     * Delays a single outbound payload until the applicable rate limiter lets it through.
     *
     * <p>IDENTIFY payloads are checked against {@code identifyLimiter}; everything else against the
     * supplied limiter. After waiting the computed delay one permit is requested; if that fails a
     * warning is logged and the whole sequence (delay included, since it is deferred) is retried
     * without limit.
     *
     * @param payload the payload to send
     * @param outboundLimiter limiter used for non-IDENTIFY payloads
     * @return a publisher emitting {@code payload} once a permit was consumed
     */
    private Publisher<? extends GatewayPayload<?>> throttle(GatewayPayload<?> payload, RateLimiter outboundLimiter) {
        final RateLimiter limiter;
        if (Opcode.IDENTIFY.equals(payload.getOp())) {
            limiter = this.identifyLimiter;
        } else {
            limiter = outboundLimiter;
        }
        return Mono.defer(() -> {
                    // Recomputed on every (re)subscription so retries wait for the current delay.
                    Duration wait = Duration.ofMillis(calculateDelayMillis(limiter));
                    return Mono.delay(wait, Schedulers.single());
                })
                .map(tick -> limiter.tryConsume(1))
                .flatMap(consumed -> {
                    if (consumed.booleanValue()) {
                        return Mono.just((Object)payload);
                    }
                    // No permit available: fail so that retry() below re-runs the delay.
                    return Mono.error((Throwable)new RuntimeException());
                })
                .doOnError(t -> shardLogger("").warn("Could not send OP {} payload, retrying", new Object[]{payload.getOp()}))
                .retry();
    }

    /**
     * Computes how long to wait before trying to consume one permit from the given limiter.
     *
     * @param limiter the limiter that will be consulted
     * @return the limiter's own delay for one permit plus the fixed outbound delay, in milliseconds
     */
    private long calculateDelayMillis(RateLimiter limiter) {
        long limiterDelay = limiter.delayMillisToConsume(1L);
        return limiterDelay + outboundDelayMillis;
    }

    /**
     * Looks up a logger whose name is {@code discord4j.gateway} + the given suffix + {@code .} +
     * the shard index from the identify options.
     *
     * @param gateway suffix inserted after {@code discord4j.gateway} (may be empty)
     * @return the logger for that name
     */
    private Logger shardLogger(String gateway) {
        StringBuilder name = new StringBuilder("discord4j.gateway");
        name.append(gateway).append(".").append(identifyOptions.getShardIndex());
        return Loggers.getLogger(name.toString());
    }

    /**
     * Tells whether a dispatch is a {@link Ready} or a {@link Resumed} event (or a subtype of either).
     *
     * @param d the dispatch to inspect; must not be {@code null}
     * @return {@code true} for ready/resumed dispatches
     */
    private static boolean isReadyOrResume(Dispatch d) {
        Class<?> type = d.getClass();
        if (Ready.class.isAssignableFrom(type)) {
            return true;
        }
        return Resumed.class.isAssignableFrom(type);
    }

    /**
     * Records the sequence number of an inbound payload, if it carries one.
     *
     * <p>When present, the number is stored in {@code sequence}, copied into the identify options as
     * the resume sequence, and the observer is notified with {@link GatewayObserver#SEQUENCE}.
     *
     * @param payload the inbound payload
     * @return the same payload, unchanged
     */
    private GatewayPayload<?> updateSequence(GatewayPayload<?> payload) {
        Integer incoming = payload.getSequence();
        if (incoming == null) {
            // Payloads without a sequence number leave the tracked state untouched.
            return payload;
        }
        sequence.set(incoming);
        identifyOptions.setResumeSequence(sequence.get());
        notifyObserver(GatewayObserver.SEQUENCE, identifyOptions);
        return payload;
    }

    /**
     * Bundles an inbound payload with the handler and client it should be processed against.
     *
     * @param payload the decoded payload
     * @param handler the websocket handler of the current connection
     * @param client the client owning the connection
     * @return a new context wrapping the three arguments
     */
    private PayloadContext<?> payloadContext(GatewayPayload<?> payload, DiscordWebSocketHandler handler, DefaultGatewayClient client) {
        PayloadContext<?> context = new PayloadContext<>(payload, handler, client);
        return context;
    }

    /**
     * Builds the retry policy applied to the connection pipeline.
     *
     * <p>Any error is retried, using the backoff, jitter, scheduler and maximum attempt count from
     * {@code retryOptions}. Before each retry the client is marked disconnected and a state change is
     * published: the first attempt announces the start of a retry (resuming if the session is still
     * resumable and the error allows it), later attempts announce the failure of the previous one
     * and disable resuming.
     *
     * @return the configured retry policy
     */
    private Retry<RetryContext> retryFactory() {
        return Retry.any()
                .withApplicationContext((Object)this.retryOptions.getRetryContext())
                .withBackoffScheduler(this.retryOptions.getBackoffScheduler())
                .backoff(this.retryOptions.getBackoff())
                .jitter(this.retryOptions.getJitter())
                .retryMax((long)this.retryOptions.getMaxRetries())
                .doOnRetry(context -> {
                    connected.compareAndSet(true, false);
                    RetryContext retryContext = (RetryContext)context.applicationContext();
                    int attempt = retryContext.getAttempts();
                    long backoff = context.backoff().toMillis();
                    Duration backoffDuration = Duration.ofMillis(backoff);
                    log.info("Retry attempt {} in {} ms", new Object[]{attempt, backoff});
                    if (attempt != 1) {
                        // A previous attempt already failed: report it and give up on resuming.
                        dispatchSink.next((Object)GatewayStateChange.retryFailed(attempt - 1, backoffDuration));
                        notifyObserver(GatewayObserver.RETRY_FAILED, identifyOptions);
                        resumable.set(false);
                    } else {
                        dispatchSink.next((Object)GatewayStateChange.retryStarted(backoffDuration));
                        if (resumable.get() && isResumableError(context.exception())) {
                            notifyObserver(GatewayObserver.RETRY_RESUME_STARTED, identifyOptions);
                        } else {
                            resumable.compareAndSet(true, false);
                            notifyObserver(GatewayObserver.RETRY_STARTED, identifyOptions);
                        }
                    }
                    retryContext.next();
                });
    }

    /**
     * Decides whether the session may be resumed after the given error.
     *
     * @param t the error that ended the connection
     * @return {@code false} only for a {@link CloseException} whose code is 4000 or above;
     * {@code true} for every other error
     */
    private boolean isResumableError(Throwable t) {
        if (!(t instanceof CloseException)) {
            return true;
        }
        int closeCode = ((CloseException)t).getCode();
        return closeCode < 4000;
    }

    /**
     * Creates the error logger attached to the connection pipeline.
     *
     * <p>Resumable {@link CloseException}s are logged on one line using their {@code toString()};
     * every other error is logged together with the throwable itself.
     *
     * @return a consumer that logs the error at error level
     */
    private Consumer<Throwable> logReconnectReason() {
        return t -> {
            boolean resumableClose = t instanceof CloseException && isResumableError((Throwable)t);
            if (!resumableClose) {
                log.error("Gateway client error", t);
                return;
            }
            log.error("Gateway client error: {}", new Object[]{t.toString()});
        };
    }

    /**
     * Creates the connection observer registered on the HTTP client.
     *
     * <p>Every state change is logged at debug level and forwarded to the gateway observer. In
     * addition, when the connection starts disconnecting after {@code closeTrigger} has terminated,
     * all session state is reset, a disconnected state change is published and
     * {@code disconnectNotifier} is completed.
     *
     * @return the observer for the underlying connection
     */
    private ConnectionObserver getObserver() {
        return (connection, newState) -> {
            log.debug("{} {}", new Object[]{newState, connection});
            boolean closedOnRequest = closeTrigger.isTerminated() && newState == ConnectionObserver.State.DISCONNECTING;
            if (closedOnRequest) {
                log.info("Disconnected from Gateway");
                retryOptions.getRetryContext().clear();
                // Wipe everything tied to the finished session.
                connected.compareAndSet(true, false);
                resumable.set(false);
                sequence.set(0);
                lastSent.set(0L);
                lastAck.set(0L);
                responseTime.set(0L);
                sessionId.set("");
                dispatchSink.next((Object)GatewayStateChange.disconnected());
                notifyObserver(GatewayObserver.DISCONNECTED, identifyOptions);
                disconnectNotifier.onComplete();
            }
            notifyObserver(newState, identifyOptions);
        };
    }

    /**
     * Forwards a state change to the observer currently installed by {@code execute}.
     *
     * @param state the new state
     * @param options the identify options passed along with the notification
     */
    private void notifyObserver(ConnectionObserver.State state, IdentifyOptions options) {
        GatewayObserver current = observer;
        current.onStateChange(state, options);
    }

    /**
     * Closes the current gateway connection.
     *
     * <p>With {@code reconnect == true} the session is marked non-resumable and a RECONNECT payload
     * is pushed to the sender, which makes the connection pipeline fail and retry; the returned
     * {@code Mono} is empty. Otherwise a normal-close status is pushed into the close trigger and the
     * returned {@code Mono} completes once the client reports it has disconnected.
     *
     * @param reconnect whether the client should reconnect after closing
     * @return an empty {@code Mono} when reconnecting, a {@code Mono} completing on disconnection
     * otherwise, or a {@code Mono} failing with {@link IllegalStateException} if the client has never
     * been started
     */
    @Override
    public Mono<Void> close(boolean reconnect) {
        if (reconnect) {
            this.resumable.set(false);
            this.senderSink.next(new GatewayPayload<Object>(Opcode.RECONNECT, null, null, null));
            return Mono.empty();
        }
        // Both processors are only assigned once execute(...) has been subscribed. Check them
        // BEFORE signalling: previously closeTrigger was dereferenced first, so closing an inactive
        // client threw a NullPointerException instead of producing the intended error below.
        MonoProcessor<CloseStatus> trigger = this.closeTrigger;
        MonoProcessor<Void> notifier = this.disconnectNotifier;
        if (trigger == null || notifier == null) {
            return Mono.error((Throwable)new IllegalStateException("Gateway client is not active!"));
        }
        trigger.onNext(CloseStatus.NORMAL_CLOSE);
        return notifier.log(this.shardLogger(".disconnect"), Level.FINE, false, new SignalType[0]);
    }

    /**
     * Exposes the stream of dispatch events, including the state changes published by this client.
     *
     * @return the dispatch processor as a {@code Flux}
     */
    @Override
    public Flux<Dispatch> dispatch() {
        return dispatch;
    }

    /**
     * Exposes the raw inbound stream, decoded by the payload reader.
     *
     * @return a {@code Flux} of every payload read from the inbound buffers
     */
    @Override
    public Flux<GatewayPayload<?>> receiver() {
        return receiver.flatMap(payloadReader::read);
    }

    /**
     * Exposes the sink used to queue outbound payloads.
     *
     * @return the sink feeding the sender processor
     */
    @Override
    public FluxSink<GatewayPayload<?>> sender() {
        return senderSink;
    }

    /**
     * Returns the current session id.
     *
     * @return the stored session id; the empty string initially and after a disconnect
     */
    @Override
    public String getSessionId() {
        return sessionId.get();
    }

    /**
     * Returns the last sequence number recorded by this client.
     *
     * @return the current sequence value
     */
    @Override
    public int getSequence() {
        return sequence.get();
    }

    /**
     * Tells whether the client currently considers itself connected.
     *
     * @return the value of the {@code connected} flag
     */
    @Override
    public boolean isConnected() {
        return connected.get();
    }

    /**
     * Returns the time between the last heartbeat sent and the last heartbeat ACK, as computed by
     * {@code ackHeartbeat()}.
     *
     * @return the last measured response time in milliseconds
     */
    @Override
    public long getResponseTime() {
        return responseTime.get();
    }

    /**
     * Records that a heartbeat ACK was just received and refreshes the measured response time
     * (ACK time minus the time the last heartbeat was produced).
     */
    void ackHeartbeat() {
        lastAck.set(System.currentTimeMillis());
        long ackedAt = lastAck.get();
        long sentAt = lastSent.get();
        responseTime.set(ackedAt - sentAt);
    }

    /**
     * Package-private access to the sink used to publish dispatch events.
     *
     * @return the dispatch sink
     */
    FluxSink<Dispatch> dispatchSink() {
        return dispatchSink;
    }

    /**
     * Package-private access to the mutable sequence counter.
     *
     * @return the live {@code AtomicInteger}, not a copy
     */
    AtomicInteger sequence() {
        return sequence;
    }

    /**
     * Package-private access to the mutable session id holder.
     *
     * @return the live {@code AtomicReference}, not a copy
     */
    AtomicReference<String> sessionId() {
        return sessionId;
    }

    /**
     * Package-private access to the heartbeat interval driving the heartbeat pipeline.
     *
     * @return the heartbeat interval
     */
    ResettableInterval heartbeat() {
        return heartbeat;
    }

    /**
     * Package-private access to the token supplied at construction.
     *
     * @return the token
     */
    String token() {
        return token;
    }

    /**
     * Package-private access to the mutable "session can be resumed" flag.
     *
     * @return the live {@code AtomicBoolean}, not a copy
     */
    AtomicBoolean resumable() {
        return resumable;
    }

    /**
     * Package-private access to the identify options supplied at construction.
     *
     * @return the identify options
     */
    IdentifyOptions identifyOptions() {
        return identifyOptions;
    }

    /**
     * Package-private access to the retry options supplied at construction.
     *
     * @return the retry options
     */
    RetryOptions retryOptions() {
        return retryOptions;
    }

    /**
     * Resolves the capacity of the general outbound rate limiter.
     *
     * <p>The default is 115. It can be overridden with the
     * {@code discord4j.gateway.outbound.capacity} system property; a value that is not a positive
     * integer is reported with a warning and ignored.
     *
     * @return the capacity to use for the outbound limiter
     */
    private long outboundLimiterCapacity() {
        String capacityValue = System.getProperty(OUTBOUND_CAPACITY_PROPERTY);
        if (capacityValue != null) {
            try {
                long capacity = Long.parseLong(capacityValue);
                if (capacity > 0L) {
                    this.shardLogger("").info("Overriding default outbound limiter capacity: {}", new Object[]{capacity});
                    // Previously the parsed value was logged but then discarded, so the
                    // override never took effect.
                    return capacity;
                }
                // A limiter without any capacity could never release a payload.
                this.shardLogger("").warn("Invalid custom outbound limiter capacity: {}", new Object[]{capacityValue});
            }
            catch (NumberFormatException e) {
                this.shardLogger("").warn("Invalid custom outbound limiter capacity: {}", new Object[]{capacityValue});
            }
        }
        return 115L;
    }

    /**
     * Resolves the fixed delay added in front of every throttled outbound payload.
     *
     * <p>The default is 10 ms. It can be overridden with the
     * {@code discord4j.gateway.outbound.delay.ms} system property; a value that is not a
     * non-negative integer is reported with a warning and ignored.
     *
     * @return the outbound delay in milliseconds
     */
    private long outboundDelayMillis() {
        String delayValue = System.getProperty(OUTBOUND_DELAY_PROPERTY);
        if (delayValue != null) {
            try {
                long value = Long.parseLong(delayValue);
                if (value >= 0L) {
                    this.shardLogger("").info("Overriding default outbound delay: {}", new Object[]{value});
                    // Previously the parsed value was logged but then discarded, so the
                    // override never took effect.
                    return value;
                }
                // A negative delay makes no sense as a waiting time.
                this.shardLogger("").warn("Invalid custom outbound delay: {}", new Object[]{delayValue});
            }
            catch (NumberFormatException e) {
                this.shardLogger("").warn("Invalid custom outbound delay: {}", new Object[]{delayValue});
            }
        }
        return 10L;
    }

    /**
     * Decompiler-emitted bridge for the lambda used in {@code execute}: applies
     * {@link #throttle(GatewayPayload, RateLimiter)} with the arguments swapped into its order.
     *
     * @param limiter the general outbound limiter
     * @param payload the payload to throttle
     * @return the publisher produced by {@code throttle}
     */
    private /* synthetic */ Publisher lambda$null$0(RateLimiter limiter, GatewayPayload payload) {
        Publisher throttled = throttle(payload, limiter);
        return throttled;
    }
}

