/*
 * Decompiled with CFR 0.152.
 */
package discord4j.gateway;

import discord4j.common.GitProperties;
import discord4j.common.LogUtil;
import discord4j.common.ResettableInterval;
import discord4j.common.close.CloseException;
import discord4j.common.close.CloseStatus;
import discord4j.common.close.DisconnectBehavior;
import discord4j.common.operator.RateLimitOperator;
import discord4j.common.retry.ReconnectContext;
import discord4j.common.retry.ReconnectOptions;
import discord4j.discordjson.json.gateway.Dispatch;
import discord4j.discordjson.json.gateway.Heartbeat;
import discord4j.discordjson.json.gateway.ImmutableHeartbeat;
import discord4j.discordjson.json.gateway.Opcode;
import discord4j.discordjson.json.gateway.Ready;
import discord4j.discordjson.json.gateway.Resumed;
import discord4j.gateway.GatewayClient;
import discord4j.gateway.GatewayConnection;
import discord4j.gateway.GatewayObserver;
import discord4j.gateway.GatewayOptions;
import discord4j.gateway.GatewayReactorResources;
import discord4j.gateway.GatewayWebsocketHandler;
import discord4j.gateway.IdentifyOptions;
import discord4j.gateway.PayloadContext;
import discord4j.gateway.PayloadHandlers;
import discord4j.gateway.SessionInfo;
import discord4j.gateway.json.GatewayPayload;
import discord4j.gateway.limiter.PayloadTransformer;
import discord4j.gateway.payload.PayloadReader;
import discord4j.gateway.payload.PayloadWriter;
import discord4j.gateway.retry.GatewayException;
import discord4j.gateway.retry.GatewayRetrySpec;
import discord4j.gateway.retry.GatewayStateChange;
import discord4j.gateway.retry.ReconnectException;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.util.IllegalReferenceCountException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.ReplayProcessor;
import reactor.function.TupleUtils;
import reactor.netty.ConnectionObserver;
import reactor.netty.http.client.HttpClient;
import reactor.netty.http.client.WebsocketClientSpec;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.context.Context;
import reactor.util.retry.Retry;

public class DefaultGatewayClient
implements GatewayClient {
    private static final Logger log = Loggers.getLogger(DefaultGatewayClient.class);
    private static final Logger senderLog = Loggers.getLogger((String)"discord4j.gateway.protocol.sender");
    private static final Logger receiverLog = Loggers.getLogger((String)"discord4j.gateway.protocol.receiver");
    private final GatewayReactorResources reactorResources;
    private final PayloadReader payloadReader;
    private final PayloadWriter payloadWriter;
    private final ReconnectOptions reconnectOptions;
    private final ReconnectContext reconnectContext;
    private final IdentifyOptions identifyOptions;
    private final String token;
    private final GatewayObserver observer;
    private final PayloadTransformer identifyLimiter;
    private final ResettableInterval heartbeat;
    private final int maxMissedHeartbeatAck;
    private final boolean unpooled;
    private final EmitterProcessor<ByteBuf> receiver = EmitterProcessor.create((boolean)false);
    private final EmitterProcessor<ByteBuf> sender = EmitterProcessor.create((boolean)false);
    private final EmitterProcessor<Dispatch> dispatch = EmitterProcessor.create((boolean)false);
    private final EmitterProcessor<GatewayPayload<?>> outbound = EmitterProcessor.create((boolean)false);
    private final EmitterProcessor<GatewayPayload<Heartbeat>> heartbeats = EmitterProcessor.create((boolean)false);
    private final FluxSink<ByteBuf> receiverSink;
    private final FluxSink<ByteBuf> senderSink;
    private final FluxSink<Dispatch> dispatchSink;
    private final FluxSink<GatewayPayload<?>> outboundSink;
    private final FluxSink<GatewayPayload<Heartbeat>> heartbeatSink;
    private final ReplayProcessor<GatewayConnection.State> state;
    private final FluxSink<GatewayConnection.State> stateChanges;
    private final AtomicInteger sequence = new AtomicInteger(0);
    private final AtomicReference<String> sessionId = new AtomicReference<String>("");
    private final AtomicLong lastSent = new AtomicLong(0L);
    private final AtomicLong lastAck = new AtomicLong(0L);
    private final AtomicInteger missedAck = new AtomicInteger(0);
    private volatile long responseTime = 0L;
    private volatile MonoProcessor<CloseStatus> disconnectNotifier;
    private volatile GatewayWebsocketHandler sessionHandler;
    private static final String OUTBOUND_CAPACITY_PROPERTY = "discord4j.gateway.outbound.capacity";

    public DefaultGatewayClient(GatewayOptions options) {
        this.token = Objects.requireNonNull(options.getToken());
        this.reactorResources = Objects.requireNonNull(options.getReactorResources());
        this.payloadReader = Objects.requireNonNull(options.getPayloadReader());
        this.payloadWriter = Objects.requireNonNull(options.getPayloadWriter());
        this.reconnectOptions = options.getReconnectOptions();
        this.reconnectContext = new ReconnectContext(this.reconnectOptions.getFirstBackoff(), this.reconnectOptions.getMaxBackoffInterval());
        this.identifyOptions = Objects.requireNonNull(options.getIdentifyOptions());
        this.observer = options.getInitialObserver();
        this.identifyLimiter = Objects.requireNonNull(options.getIdentifyLimiter());
        this.maxMissedHeartbeatAck = Math.max(0, options.getMaxMissedHeartbeatAck());
        this.unpooled = options.isUnpooled();
        this.receiverSink = this.receiver.sink(FluxSink.OverflowStrategy.BUFFER);
        this.senderSink = this.sender.sink(FluxSink.OverflowStrategy.ERROR);
        this.dispatchSink = this.dispatch.sink(FluxSink.OverflowStrategy.BUFFER);
        this.outboundSink = this.outbound.sink(FluxSink.OverflowStrategy.ERROR);
        this.heartbeatSink = this.heartbeats.sink(FluxSink.OverflowStrategy.ERROR);
        this.heartbeat = new ResettableInterval(this.reactorResources.getTimerTaskScheduler());
        SessionInfo resumeSession = this.identifyOptions.getResumeSession().orElse(null);
        if (resumeSession != null) {
            this.sequence.set(resumeSession.getSequence());
            this.sessionId.set(resumeSession.getId());
            this.state = ReplayProcessor.cacheLastOrDefault((Object)((Object)GatewayConnection.State.START_RESUMING));
        } else {
            this.state = ReplayProcessor.cacheLastOrDefault((Object)((Object)GatewayConnection.State.START_IDENTIFYING));
        }
        this.stateChanges = this.state.sink(FluxSink.OverflowStrategy.LATEST);
    }

    @Override
    public Mono<Void> execute(String gatewayUrl) {
        return Mono.deferWithContext(context -> {
            this.disconnectNotifier = MonoProcessor.create();
            this.lastAck.set(0L);
            this.lastSent.set(0L);
            this.missedAck.set(0);
            MonoProcessor ping = MonoProcessor.create();
            Mono onConnected = this.state.filter(s -> s == GatewayConnection.State.CONNECTED).next().then();
            Flux heartbeatFlux = this.heartbeats.flatMap(payload -> Flux.from(this.payloadWriter.write((GatewayPayload<?>)payload)));
            Flux identifyFlux = this.outbound.filter(payload -> Opcode.IDENTIFY.equals(payload.getOp())).delayUntil(__ -> ping).flatMap(payload -> Flux.from(this.payloadWriter.write((GatewayPayload<?>)payload))).transform((Function)this.identifyLimiter);
            Flux resumeFlux = this.outbound.filter(payload -> Opcode.RESUME.equals(payload.getOp())).flatMap(payload -> Flux.from(this.payloadWriter.write((GatewayPayload<?>)payload)));
            Flux payloadFlux = this.outbound.filter(DefaultGatewayClient::isNotStartupPayload).delayUntil(__ -> onConnected).flatMap(payload -> Flux.from(this.payloadWriter.write((GatewayPayload<?>)payload))).transform(buf -> Flux.merge((Publisher[])new Publisher[]{buf, this.sender})).transform((Function)new RateLimitOperator(this.outboundLimiterCapacity(), Duration.ofSeconds(60L), this.reactorResources.getTimerTaskScheduler(), this.reactorResources.getPayloadSenderScheduler()));
            Flux outFlux = Flux.merge((Publisher[])new Publisher[]{heartbeatFlux, identifyFlux, resumeFlux, payloadFlux}).doOnNext(buf -> this.logPayload(senderLog, (Context)context, (ByteBuf)buf)).doOnDiscard(ByteBuf.class, DefaultGatewayClient::safeRelease);
            this.sessionHandler = new GatewayWebsocketHandler(this.receiverSink, (Flux<ByteBuf>)outFlux, (Context)context);
            Mono readyHandler = this.dispatch.filter(DefaultGatewayClient::isReadyOrResumed).zipWith((Publisher)this.state.next().repeat()).doOnNext(TupleUtils.consumer((event, currentState) -> {
                ConnectionObserver.State observerState;
                if (currentState == GatewayConnection.State.START_IDENTIFYING || currentState == GatewayConnection.State.START_RESUMING) {
                    log.info(LogUtil.format((Context)context, (String)"Connected to Gateway"));
                    this.dispatchSink.next((Object)GatewayStateChange.connected());
                    observerState = GatewayObserver.CONNECTED;
                } else {
                    log.info(LogUtil.format((Context)context, (String)"Reconnected to Gateway"));
                    this.dispatchSink.next((Object)GatewayStateChange.retrySucceeded(this.reconnectContext.getAttempts()));
                    observerState = GatewayObserver.RETRY_SUCCEEDED;
                }
                this.reconnectContext.reset();
                this.stateChanges.next((Object)GatewayConnection.State.CONNECTED);
                this.notifyObserver(observerState);
            })).then();
            Mono receiverFuture = this.receiver.map(buf -> this.unpooled ? buf : buf.retain()).doOnNext(buf -> this.logPayload(receiverLog, (Context)context, (ByteBuf)buf)).flatMap(this.payloadReader::read).doOnDiscard(ByteBuf.class, DefaultGatewayClient::safeRelease).doOnNext(payload -> {
                if (Opcode.HEARTBEAT_ACK.equals(payload.getOp())) {
                    ping.onComplete();
                }
            }).map(this::updateSequence).map(payload -> new PayloadContext(payload, this.sessionHandler, this, (Context)context)).flatMap(PayloadHandlers::handle).then();
            Mono senderFuture = this.outbound.doOnComplete(this.sessionHandler::close).doOnNext(payload -> {
                if (Opcode.RECONNECT.equals(payload.getOp())) {
                    this.sessionHandler.error(new GatewayException((Context)context, "Reconnecting due to user action"));
                }
            }).then();
            Mono heartbeatHandler = this.heartbeat.ticks().flatMap(t -> {
                long now = System.nanoTime();
                this.lastAck.compareAndSet(0L, now);
                long delay = now - this.lastAck.get();
                if (this.lastSent.get() - this.lastAck.get() > 0L && this.missedAck.incrementAndGet() > this.maxMissedHeartbeatAck) {
                    log.warn(LogUtil.format((Context)context, (String)"Missing heartbeat ACK for {} (tick: {}, seq: {})"), new Object[]{Duration.ofNanos(delay), t, this.sequence.get()});
                    this.sessionHandler.error(new GatewayException((Context)context, "Reconnecting due to zombie or failed connection"));
                    return Mono.empty();
                }
                log.debug(LogUtil.format((Context)context, (String)"Sending heartbeat {} after last ACK"), new Object[]{Duration.ofNanos(delay)});
                this.lastSent.set(now);
                return Mono.just(GatewayPayload.heartbeat((Heartbeat)ImmutableHeartbeat.of((int)this.sequence.get())));
            }).doOnNext(arg_0 -> this.heartbeatSink.next(arg_0)).then();
            Mono httpFuture = ((HttpClient.WebsocketSender)this.reactorResources.getHttpClient().headers(headers -> headers.add((CharSequence)HttpHeaderNames.USER_AGENT, (Object)this.initUserAgent())).observe(this.getObserver((Context)context)).websocket(((WebsocketClientSpec.Builder)WebsocketClientSpec.builder().maxFramePayloadLength(Integer.MAX_VALUE)).build()).uri(gatewayUrl)).handle(this.sessionHandler::handle).subscriberContext(LogUtil.clearContext()).flatMap(t2 -> this.handleClose((DisconnectBehavior)t2.getT1(), (CloseStatus)t2.getT2())).then();
            return Mono.zip((Mono)httpFuture, (Mono)readyHandler, (Mono)receiverFuture, (Mono)senderFuture, (Mono)heartbeatHandler).doOnError(t -> {
                if (t instanceof ReconnectException) {
                    log.info(LogUtil.format((Context)context, (String)"{}"), new Object[]{t.getMessage()});
                } else if (log.isTraceEnabled()) {
                    log.error(LogUtil.format((Context)context, (String)"Gateway client error"), t);
                } else {
                    log.error(LogUtil.format((Context)context, (String)"{}"), new Object[]{t.toString()});
                }
            }).doOnTerminate(() -> ((ResettableInterval)this.heartbeat).stop()).doOnCancel(() -> this.sessionHandler.close()).then();
        }).subscriberContext(ctx -> ctx.put((Object)"discord4j.shard", (Object)this.identifyOptions.getShardInfo().getIndex())).retryWhen(this.retryFactory()).then(Mono.defer(() -> this.disconnectNotifier.then())).doOnSubscribe(s -> {
            if (this.disconnectNotifier != null) {
                throw new IllegalStateException("execute can only be subscribed once");
            }
        });
    }

    private String initUserAgent() {
        Properties properties = GitProperties.getProperties();
        String version = properties.getProperty("git.build.version", "3");
        String url = properties.getProperty("application.url", "https://discord4j.com");
        return "DiscordBot(" + url + ", " + version + ")";
    }

    private void logPayload(Logger logger, Context context, ByteBuf buf) {
        logger.trace(LogUtil.format((Context)context, (String)buf.toString(StandardCharsets.UTF_8).replaceAll("(\"token\": ?\")([A-Za-z0-9._-]*)(\")", "$1hunter2$3")));
    }

    private static boolean isNotStartupPayload(GatewayPayload<?> payload) {
        return !Opcode.IDENTIFY.equals(payload.getOp()) && !Opcode.RESUME.equals(payload.getOp());
    }

    private static boolean isReadyOrResumed(Dispatch d) {
        return Ready.class.isAssignableFrom(d.getClass()) || Resumed.class.isAssignableFrom(d.getClass());
    }

    private GatewayPayload<?> updateSequence(GatewayPayload<?> payload) {
        if (payload.getSequence() != null) {
            this.sequence.set(payload.getSequence());
            this.notifyObserver(GatewayObserver.SEQUENCE);
        }
        return payload;
    }

    private Retry retryFactory() {
        return GatewayRetrySpec.create(this.reconnectOptions, this.reconnectContext).doBeforeRetry(retry -> {
            this.stateChanges.next((Object)retry.nextState());
            long attempt = retry.iteration();
            Duration backoff = retry.nextBackoff();
            log.debug(LogUtil.format((Context)this.getContextFromException(retry.failure()), (String)"{} in {} (attempts: {})"), new Object[]{retry.nextState(), backoff, attempt});
            if (retry.iteration() == 1L) {
                if (retry.nextState() == GatewayConnection.State.RESUMING) {
                    this.dispatchSink.next((Object)GatewayStateChange.retryStarted(backoff));
                    this.notifyObserver(GatewayObserver.RETRY_STARTED);
                } else {
                    this.dispatchSink.next((Object)GatewayStateChange.retryStartedResume(backoff));
                    this.notifyObserver(GatewayObserver.RETRY_RESUME_STARTED);
                }
            } else {
                this.dispatchSink.next((Object)GatewayStateChange.retryFailed(attempt - 1L, backoff));
                this.notifyObserver(GatewayObserver.RETRY_FAILED);
            }
        });
    }

    private Context getContextFromException(Throwable t) {
        if (t instanceof CloseException) {
            return ((CloseException)t).getContext();
        }
        if (t instanceof GatewayException) {
            return ((GatewayException)t).getContext();
        }
        return Context.empty();
    }

    private Mono<CloseStatus> handleClose(DisconnectBehavior sourceBehavior, CloseStatus closeStatus) {
        return Mono.deferWithContext(ctx -> {
            DisconnectBehavior behavior = GatewayRetrySpec.NON_RETRYABLE_STATUS_CODES.contains(closeStatus.getCode()) ? DisconnectBehavior.stop((Throwable)sourceBehavior.getCause()) : sourceBehavior;
            log.debug(LogUtil.format((Context)ctx, (String)"Closing and {} with status {}"), new Object[]{behavior, closeStatus});
            this.stateChanges.next((Object)GatewayConnection.State.DISCONNECTING);
            this.heartbeat.stop();
            if (behavior.getAction() == DisconnectBehavior.Action.STOP_ABRUPTLY) {
                this.dispatchSink.next((Object)GatewayStateChange.disconnectedResume());
                this.notifyObserver(GatewayObserver.DISCONNECTED_RESUME);
            } else if (behavior.getAction() == DisconnectBehavior.Action.STOP) {
                this.dispatchSink.next((Object)GatewayStateChange.disconnected(sourceBehavior, closeStatus));
                this.sequence.set(0);
                this.sessionId.set("");
                this.notifyObserver(GatewayObserver.DISCONNECTED);
            }
            switch (behavior.getAction()) {
                case STOP_ABRUPTLY: 
                case STOP: {
                    this.reconnectContext.clear();
                    this.responseTime = 0L;
                    this.lastSent.set(0L);
                    this.lastAck.set(0L);
                    this.stateChanges.next((Object)GatewayConnection.State.DISCONNECTED);
                    if (behavior.getCause() != null) {
                        return Mono.just((Object)new CloseException(closeStatus, ctx, behavior.getCause())).flatMap(ex -> {
                            this.disconnectNotifier.onError((Throwable)ex);
                            return Mono.error((Throwable)ex);
                        });
                    }
                    return Mono.just((Object)closeStatus).doOnNext(status -> this.disconnectNotifier.onNext((Object)closeStatus));
                }
            }
            return Mono.error((Throwable)new CloseException(closeStatus, ctx, behavior.getCause()));
        });
    }

    private ConnectionObserver getObserver(Context context) {
        return (connection, newState) -> {
            log.debug(LogUtil.format((Context)context, (String)"{} {}"), new Object[]{newState, connection});
            this.notifyObserver(newState);
        };
    }

    private void notifyObserver(ConnectionObserver.State state) {
        this.observer.onStateChange(state, this);
    }

    @Override
    public Mono<Void> close(boolean allowResume) {
        return Mono.defer(() -> {
            if (this.sessionHandler == null || this.disconnectNotifier == null) {
                return Mono.error((Throwable)new IllegalStateException("Gateway client is not active!"));
            }
            if (!this.disconnectNotifier.isTerminated()) {
                if (allowResume) {
                    this.sessionHandler.close(DisconnectBehavior.stopAbruptly(null));
                } else {
                    this.sessionHandler.close(DisconnectBehavior.stop(null));
                }
            }
            return this.disconnectNotifier.then();
        });
    }

    @Override
    public Flux<Dispatch> dispatch() {
        return this.dispatch;
    }

    @Override
    public Flux<GatewayPayload<?>> receiver() {
        return this.receiver(this.payloadReader::read);
    }

    @Override
    public <T> Flux<T> receiver(Function<ByteBuf, Publisher<? extends T>> mapper) {
        return this.receiver.map(ByteBuf::retainedDuplicate).doOnDiscard(ByteBuf.class, DefaultGatewayClient::safeRelease).flatMap(mapper);
    }

    private static void safeRelease(ByteBuf buf) {
        block3: {
            if (buf.refCnt() > 0) {
                try {
                    buf.release();
                }
                catch (IllegalReferenceCountException e) {
                    if (!log.isDebugEnabled()) break block3;
                    log.debug("", (Throwable)e);
                }
            }
        }
    }

    @Override
    public FluxSink<GatewayPayload<?>> sender() {
        return this.outboundSink;
    }

    @Override
    public Mono<Void> sendBuffer(Publisher<ByteBuf> publisher) {
        return Flux.from(publisher).doOnNext(arg_0 -> this.senderSink.next(arg_0)).then();
    }

    @Override
    public int getShardCount() {
        return this.identifyOptions.getShardInfo().getCount();
    }

    @Override
    public String getSessionId() {
        return this.sessionId.get();
    }

    @Override
    public int getSequence() {
        return this.sequence.get();
    }

    @Override
    public Flux<GatewayConnection.State> stateEvents() {
        return this.state;
    }

    @Override
    public Mono<Boolean> isConnected() {
        return this.state.next().filter(s -> s == GatewayConnection.State.CONNECTED).hasElement().defaultIfEmpty((Object)false);
    }

    @Override
    public Duration getResponseTime() {
        return Duration.ofNanos(this.responseTime);
    }

    void ackHeartbeat() {
        this.responseTime = this.lastAck.updateAndGet(x -> System.nanoTime()) - this.lastSent.get();
        this.missedAck.set(0);
    }

    FluxSink<Dispatch> dispatchSink() {
        return this.dispatchSink;
    }

    AtomicInteger sequence() {
        return this.sequence;
    }

    AtomicReference<String> sessionId() {
        return this.sessionId;
    }

    ResettableInterval heartbeat() {
        return this.heartbeat;
    }

    String token() {
        return this.token;
    }

    IdentifyOptions identifyOptions() {
        return this.identifyOptions;
    }

    private int outboundLimiterCapacity() {
        String capacityValue = System.getProperty(OUTBOUND_CAPACITY_PROPERTY);
        if (capacityValue != null) {
            try {
                int capacity = Integer.parseInt(capacityValue);
                log.info("Overriding default outbound limiter capacity: {}", new Object[]{capacity});
                return capacity;
            }
            catch (NumberFormatException e) {
                log.warn("Invalid custom outbound limiter capacity: {}", new Object[]{capacityValue});
            }
        }
        return 115;
    }
}

