/*
 * Decompiled with CFR 0.152.
 */
package discord4j.gateway;

import discord4j.common.LogUtil;
import discord4j.common.close.CloseStatus;
import discord4j.common.close.DisconnectBehavior;
import discord4j.gateway.ZlibDecompressor;
import discord4j.gateway.retry.GatewayException;
import discord4j.gateway.retry.PartialDisconnectException;
import discord4j.gateway.retry.ReconnectException;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.DefaultByteBufHolder;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.IllegalReferenceCountException;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.netty.http.websocket.WebsocketInbound;
import reactor.netty.http.websocket.WebsocketOutbound;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.context.Context;
import reactor.util.function.Tuple2;

public class GatewayWebsocketHandler {
    private static final Logger log = Loggers.getLogger(GatewayWebsocketHandler.class);
    private final FluxSink<ByteBuf> inbound;
    private final Flux<ByteBuf> outbound;
    private final MonoProcessor<DisconnectBehavior> sessionClose;
    private final Context context;
    private final boolean unpooled;

    public GatewayWebsocketHandler(FluxSink<ByteBuf> inbound, Flux<ByteBuf> outbound, Context context) {
        this(inbound, outbound, context, false);
    }

    public GatewayWebsocketHandler(FluxSink<ByteBuf> inbound, Flux<ByteBuf> outbound, Context context, boolean unpooled) {
        this.inbound = inbound;
        this.outbound = outbound;
        this.sessionClose = MonoProcessor.create();
        this.context = context;
        this.unpooled = unpooled;
    }

    public Mono<Tuple2<DisconnectBehavior, CloseStatus>> handle(WebsocketInbound in, WebsocketOutbound out) {
        ZlibDecompressor decompressor = new ZlibDecompressor(out.alloc());
        Mono outboundClose = this.sessionClose.doOnNext(behavior -> log.debug(LogUtil.format((Context)this.context, (String)"Closing session with behavior: {}"), new Object[]{behavior})).flatMap(behavior -> {
            switch (behavior.getAction()) {
                case RETRY_ABRUPTLY: 
                case STOP_ABRUPTLY: {
                    return Mono.error((Throwable)(behavior.getCause() != null ? behavior.getCause() : new PartialDisconnectException(this.context)));
                }
            }
            return Mono.just((Object)CloseStatus.NORMAL_CLOSE);
        }).map(status -> new CloseWebSocketFrame(status.getCode(), (String)status.getReason().orElse(null)));
        Mono inboundClose = in.receiveCloseStatus().map(status -> new CloseStatus(status.code(), status.reasonText())).doOnNext(status -> log.debug(LogUtil.format((Context)this.context, (String)"Received close status: {}"), new Object[]{status})).doOnNext(status -> this.close(DisconnectBehavior.retryAbruptly((Throwable)new GatewayException(this.context, "Inbound close status"))));
        Mono outboundEvents = out.sendObject((Publisher)Flux.merge((Publisher[])new Publisher[]{outboundClose, this.outbound.map(TextWebSocketFrame::new)})).then();
        in.withConnection(c -> c.onDispose(() -> log.debug(LogUtil.format((Context)this.context, (String)"Connection disposed"))));
        Mono inboundEvents = in.aggregateFrames().receiveFrames().map(DefaultByteBufHolder::content).transformDeferred(decompressor::completeMessages).doOnNext(arg_0 -> this.inbound.next(arg_0)).doOnNext(this::safeRelease).then();
        return Mono.zip((Mono)outboundEvents, (Mono)inboundEvents).doOnError(this::error).onErrorResume(t -> t.getCause() instanceof GatewayException, t -> Mono.empty()).then(Mono.zip(this.sessionClose, (Mono)inboundClose.defaultIfEmpty((Object)CloseStatus.ABNORMAL_CLOSE)));
    }

    public void close() {
        this.close(DisconnectBehavior.retry(null));
    }

    public void close(DisconnectBehavior behavior) {
        this.sessionClose.onNext((Object)behavior);
    }

    public void error(Throwable error) {
        if (!(error instanceof ReconnectException)) {
            log.info(LogUtil.format((Context)this.context, (String)"Triggering error sequence: {}"), new Object[]{error.toString()});
        }
        this.close(DisconnectBehavior.retryAbruptly((Throwable)error));
    }

    private void safeRelease(ByteBuf buf) {
        block3: {
            if (!this.unpooled && buf.refCnt() > 0) {
                try {
                    buf.release();
                }
                catch (IllegalReferenceCountException e) {
                    if (!log.isDebugEnabled()) break block3;
                    log.debug("", (Throwable)e);
                }
            }
        }
    }
}

