/*
 * Decompiled with CFR 0.152.
 */
package discord4j.gateway;

import discord4j.common.close.CloseException;
import discord4j.common.close.CloseHandlerAdapter;
import discord4j.common.close.CloseStatus;
import discord4j.gateway.ZlibDecompressor;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.DefaultByteBufHolder;
import io.netty.channel.ChannelHandler;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.SignalType;
import reactor.netty.NettyPipeline;
import reactor.netty.http.websocket.WebsocketInbound;
import reactor.netty.http.websocket.WebsocketOutbound;
import reactor.util.Logger;
import reactor.util.Loggers;

public class DiscordWebSocketHandler {
    private final FluxSink<ByteBuf> inbound;
    private final Flux<ByteBuf> outbound;
    private final MonoProcessor<CloseStatus> closeTrigger;
    private final MonoProcessor<Void> completionNotifier = MonoProcessor.create();
    private final ZlibDecompressor decompressor = new ZlibDecompressor();
    private final Logger log;
    private final int shardIndex;
    private static final String HANDLER = "client.last.closeHandler";

    public DiscordWebSocketHandler(FluxSink<ByteBuf> inbound, Flux<ByteBuf> outbound, MonoProcessor<CloseStatus> closeTrigger, int shardIndex) {
        this.inbound = inbound;
        this.outbound = outbound;
        this.closeTrigger = closeTrigger;
        this.shardIndex = shardIndex;
        this.log = this.shardLogger("");
    }

    public Mono<Void> handle(WebsocketInbound in, WebsocketOutbound out) {
        AtomicReference reason = new AtomicReference();
        in.withConnection(connection -> connection.addHandlerLast(HANDLER, (ChannelHandler)new CloseHandlerAdapter(reason, this.log)));
        Mono completionFuture = this.completionNotifier.log(this.shardLogger(".zip"), Level.FINEST, false, new SignalType[0]);
        Mono closeFuture = this.closeTrigger.map(status -> new CloseWebSocketFrame(status.getCode(), status.getReason()));
        Mono outboundEvents = out.options(NettyPipeline.SendOptions::flushOnEach).sendObject((Publisher)Flux.merge((Publisher[])new Publisher[]{closeFuture, this.outbound.map(TextWebSocketFrame::new)})).then().log(this.shardLogger(".zip.out"), Level.FINEST, false, new SignalType[0]);
        Mono inboundEvents = in.aggregateFrames().receiveFrames().map(DefaultByteBufHolder::content).compose(this.decompressor::completeMessages).doOnNext(arg_0 -> this.inbound.next(arg_0)).doOnError(this::error).then(Mono.defer(() -> {
            this.shardLogger(".inbound").info("Receiver completed on shard {}", new Object[]{this.shardIndex});
            CloseStatus closeStatus = (CloseStatus)reason.get();
            if (closeStatus != null && !this.closeTrigger.isTerminated()) {
                this.shardLogger(".inbound").info("Forwarding close reason: {}", new Object[]{closeStatus});
                return Mono.error((Throwable)new CloseException(closeStatus));
            }
            return Mono.empty();
        })).log(this.shardLogger(".zip.in"), Level.FINEST, false, new SignalType[0]);
        return Mono.zip((Mono)completionFuture, (Mono)outboundEvents, (Mono)inboundEvents).doOnError(t -> this.log.debug("WebSocket session threw an error: {}", new Object[]{t.toString()})).then();
    }

    public void close() {
        this.log.info("Triggering close sequence");
        this.closeTrigger.onNext((Object)CloseStatus.NORMAL_CLOSE);
        this.completionNotifier.onComplete();
    }

    public void error(Throwable error) {
        this.log.warn("Triggering error sequence ({})", new Object[]{error.toString()});
        if (!this.completionNotifier.isTerminated()) {
            if (error instanceof CloseException) {
                this.completionNotifier.onError(error);
            } else {
                this.completionNotifier.onError((Throwable)new CloseException(new CloseStatus(1006, error.toString()), error));
            }
        }
    }

    private Logger shardLogger(String gateway) {
        return Loggers.getLogger((String)("discord4j.gateway" + gateway + "." + this.shardIndex));
    }
}

