package com.github.twitch4j.pubsub;

import com.github.philippheuer.events4j.core.EventManager;
import com.github.twitch4j.client.websocket.WebsocketConnection;
import com.github.twitch4j.client.websocket.domain.WebsocketConnectionState;
import com.github.twitch4j.common.config.ProxyConfig;
import com.github.twitch4j.common.events.TwitchEvent;
import com.github.twitch4j.common.util.CryptoUtils;
import com.github.twitch4j.common.util.TimeUtils;
import com.github.twitch4j.common.util.TypeConvert;
import com.github.twitch4j.pubsub.domain.PubSubRequest;
import com.github.twitch4j.pubsub.domain.PubSubResponse;
import com.github.twitch4j.pubsub.domain.PubSubResponsePayload;
import com.github.twitch4j.pubsub.enums.PubSubType;
import com.github.twitch4j.pubsub.events.PubSubAuthRevokeEvent;
import com.github.twitch4j.pubsub.events.PubSubConnectionStateEvent;
import com.github.twitch4j.pubsub.events.PubSubListenResponseEvent;
import com.github.twitch4j.pubsub.handlers.HandlerRegistry;
import com.github.twitch4j.pubsub.handlers.TopicHandler;
import com.github.twitch4j.util.IBackoffStrategy;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/github/twitch4j/pubsub/TwitchPubSub.class */
public class TwitchPubSub implements ITwitchPubSub {
    public static final int REQUIRED_THREAD_COUNT = 1;
    private final EventManager eventManager;
    private final WebsocketConnection connection;
    private static final String WEB_SOCKET_SERVER = "wss://pubsub-edge.twitch.tv:443";
    private final Runnable flushCommand;
    protected final Future<?> queueTask;
    protected final Future<?> heartbeatTask;
    protected final ScheduledExecutorService taskExecutor;
    private final Collection<String> botOwnerIds;
    private final Consumer<PubSubResponsePayload> fallbackTopicHandler;
    private static final Logger log = LoggerFactory.getLogger(TwitchPubSub.class);
    private static final Pattern LISTEN_AUTH_TOKEN = Pattern.compile("(\\{.*\"type\"\\s*?:\\s*?\"LISTEN\".*\"data\"\\s*?:\\s*?\\{.*\"auth_token\"\\s*?:\\s*?\").+(\".*}\\s*?})");
    private final Object $lock = new Object[0];
    private final AtomicBoolean flushing = new AtomicBoolean();
    private final AtomicBoolean flushRequested = new AtomicBoolean();
    protected volatile boolean isClosed = false;
    protected final BlockingQueue<String> commandQueue = new ArrayBlockingQueue(128);
    protected final Set<PubSubRequest> subscribedTopics = ConcurrentHashMap.newKeySet();
    protected volatile long lastPing = TimeUtils.getCurrentTimeInMillis() - 240000;
    protected volatile long lastPong = TimeUtils.getCurrentTimeInMillis();

    public TwitchPubSub(WebsocketConnection websocketConnection, EventManager eventManager, ScheduledThreadPoolExecutor scheduledThreadPoolExecutor, ProxyConfig proxyConfig, Collection<String> collection, int i, IBackoffStrategy iBackoffStrategy, int i2, Consumer<PubSubResponsePayload> consumer) {
        this.eventManager = eventManager;
        this.taskExecutor = scheduledThreadPoolExecutor;
        this.botOwnerIds = collection;
        if (consumer != null) {
            this.fallbackTopicHandler = consumer;
        } else {
            this.fallbackTopicHandler = pubSubResponsePayload -> {
                log.warn("Unparsable Message: " + pubSubResponsePayload.getTopic() + "|" + pubSubResponsePayload.getMessage());
            };
        }
        if (websocketConnection == null) {
            this.connection = new WebsocketConnection(websocketConnectionConfig -> {
                websocketConnectionConfig.baseUrl(WEB_SOCKET_SERVER);
                websocketConnectionConfig.closeDelay(i2);
                websocketConnectionConfig.wsPingPeriod(i);
                websocketConnectionConfig.onStateChanged((websocketConnectionState, websocketConnectionState2) -> {
                    eventManager.publish(new PubSubConnectionStateEvent(websocketConnectionState, websocketConnectionState2, this));
                });
                websocketConnectionConfig.onPreConnect(this::onPreConnect);
                websocketConnectionConfig.onConnected(this::onConnected);
                websocketConnectionConfig.onTextMessage(this::onTextMessage);
                BlockingQueue<String> blockingQueue = this.commandQueue;
                Objects.requireNonNull(blockingQueue);
                websocketConnectionConfig.onPostDisconnect(blockingQueue::clear);
                websocketConnectionConfig.taskExecutor(scheduledThreadPoolExecutor);
                websocketConnectionConfig.proxyConfig(proxyConfig);
                if (iBackoffStrategy != null) {
                    websocketConnectionConfig.backoffStrategy(iBackoffStrategy);
                }
            });
        } else {
            this.connection = websocketConnection;
        }
        this.eventManager.getServiceMediator().addService("twitch4j-pubsub", this);
        connect();
        this.heartbeatTask = scheduledThreadPoolExecutor.scheduleAtFixedRate(() -> {
            if (this.isClosed || this.connection.getConnectionState() != WebsocketConnectionState.CONNECTED) {
                return;
            }
            PubSubRequest pubSubRequest = new PubSubRequest();
            pubSubRequest.setType(PubSubType.PING);
            sendCommand(TypeConvert.objectToJson(pubSubRequest));
            log.debug("PubSub: Sending PING!");
            this.lastPing = TimeUtils.getCurrentTimeInMillis();
        }, 0L, 4L, TimeUnit.MINUTES);
        this.flushCommand = () -> {
            if (this.flushing.getAndSet(true)) {
                return;
            }
            while (true) {
                if (this.isClosed) {
                    break;
                }
                try {
                    if (this.lastPong < this.lastPing && TimeUtils.getCurrentTimeInMillis() >= this.lastPing + 10000) {
                        break;
                    }
                    if (!WebsocketConnectionState.CONNECTED.equals(this.connection.getConnectionState())) {
                        break;
                    }
                    String poll = this.commandQueue.poll();
                    if (poll == null) {
                        break;
                    }
                    sendCommand(poll);
                    if (log.isDebugEnabled()) {
                        Matcher matcher = LISTEN_AUTH_TOKEN.matcher(poll);
                        log.debug("Processed command from queue: [{}].", matcher.find() ? matcher.group(1) + "•••" + matcher.group(2) : poll);
                    }
                } catch (Exception e) {
                    log.error("PubSub: Unexpected error in worker thread", e);
                }
            }
            this.flushRequested.set(false);
            this.flushing.set(false);
        };
        this.queueTask = scheduledThreadPoolExecutor.scheduleWithFixedDelay(this.flushCommand, 0L, 2500L, TimeUnit.MILLISECONDS);
        log.debug("PubSub: Started Queue Worker Thread");
    }

    public void connect() {
        this.connection.connect();
    }

    public void disconnect() {
        this.connection.disconnect();
    }

    public void reconnect() {
        synchronized (this.$lock) {
            this.connection.reconnect();
        }
    }

    protected void onPreConnect() {
        this.lastPong = TimeUtils.getCurrentTimeInMillis();
        this.lastPing = this.lastPong - 240000;
    }

    protected void onConnected() {
        log.info("Connected to Twitch PubSub {}", WEB_SOCKET_SERVER);
        this.subscribedTopics.forEach(this::queueRequest);
    }

    protected void onTextMessage(String str) {
        try {
            log.trace("Received WebSocketMessage: " + str);
            PubSubResponse pubSubResponse = (PubSubResponse) TypeConvert.jsonToObject(str, PubSubResponse.class);
            if (pubSubResponse.getType().equals(PubSubType.MESSAGE)) {
                String[] split = StringUtils.split(pubSubResponse.getData().getTopic(), '.');
                TopicHandler topicHandler = HandlerRegistry.INSTANCE.getHandlers().get(split[0]);
                boolean z = true;
                if (topicHandler != null) {
                    TwitchEvent twitchEvent = null;
                    try {
                        twitchEvent = topicHandler.apply(new TopicHandler.Args(split, pubSubResponse.getData().getMessage(), this.botOwnerIds));
                    } catch (Exception e) {
                        log.warn("PubSub: Encountered exception when parsing message", e);
                    }
                    if (twitchEvent != null) {
                        z = false;
                        try {
                            this.eventManager.publish(twitchEvent);
                        } catch (Exception e2) {
                            log.warn("An event consumer threw an exception while processing a PubSub event", e2);
                        }
                    }
                }
                if (z) {
                    this.fallbackTopicHandler.accept(pubSubResponse.getData());
                }
            } else if (pubSubResponse.getType().equals(PubSubType.RESPONSE)) {
                this.eventManager.publish(new PubSubListenResponseEvent(pubSubResponse.getNonce(), pubSubResponse.getError(), () -> {
                    for (PubSubRequest pubSubRequest : this.subscribedTopics) {
                        if (pubSubRequest != null && StringUtils.equals(pubSubResponse.getNonce(), pubSubRequest.getNonce())) {
                            return pubSubRequest;
                        }
                    }
                    return null;
                }));
                if (pubSubResponse.getError().length() > 0) {
                    if (pubSubResponse.getError().equalsIgnoreCase("ERR_BADAUTH")) {
                        log.error("PubSub: You used a invalid oauth token to subscribe to the topic. Please use a token that is authorized for the specified channel.");
                    } else {
                        log.error("PubSub: Failed to subscribe to topic - [" + pubSubResponse.getError() + "]");
                    }
                }
            } else if (pubSubResponse.getType().equals(PubSubType.PONG)) {
                log.debug("PubSub: Received PONG response!");
                this.lastPong = TimeUtils.getCurrentTimeInMillis();
            } else if (pubSubResponse.getType().equals(PubSubType.RECONNECT)) {
                log.warn("PubSub: Server instance we're connected to will go down for maintenance soon, reconnecting to obtain a new connection!");
                reconnect();
            } else if (pubSubResponse.getType() == PubSubType.AUTH_REVOKED) {
                Object obj = ((PubSubRequest) TypeConvert.jsonToObject(str, PubSubRequest.class)).getData().get("topics");
                if (obj instanceof Collection) {
                    HashMap hashMap = new HashMap();
                    for (Object obj2 : (Collection) obj) {
                        if (obj2 instanceof String) {
                            hashMap.put((String) obj2, null);
                        } else {
                            log.warn("Unparsable Revocation Topic: {}", obj2);
                        }
                    }
                    if (hashMap.isEmpty()) {
                        return;
                    }
                    this.subscribedTopics.removeIf(pubSubRequest -> {
                        Object obj3 = pubSubRequest.getData().get("topics");
                        if (!(obj3 instanceof Collection) || ((Collection) obj3).size() != 1) {
                            return false;
                        }
                        Object next = ((Collection) obj3).iterator().next();
                        return (next instanceof String) && hashMap.replace((String) next, null, pubSubRequest);
                    });
                    this.eventManager.publish(new PubSubAuthRevokeEvent(this, Collections.unmodifiableMap(hashMap)));
                } else {
                    log.warn("Unparsable Revocation: {}", str);
                }
            } else {
                log.debug("PubSub: Unknown Message Type: " + pubSubResponse);
            }
        } catch (Exception e3) {
            log.warn("PubSub: Unparsable Message: " + str + " - [" + e3.getMessage() + "]", e3);
        }
    }

    private void sendCommand(String str) {
        if (WebsocketConnectionState.CONNECTED.equals(this.connection.getConnectionState()) || WebsocketConnectionState.CONNECTING.equals(this.connection.getConnectionState())) {
            this.connection.sendText(str);
        } else {
            log.warn("Can't send IRC-WS Command [{}]", str);
        }
    }

    private void queueRequest(PubSubRequest pubSubRequest) {
        this.commandQueue.add(TypeConvert.objectToJson(pubSubRequest));
        if (this.flushing.get() || this.flushRequested.getAndSet(true)) {
            return;
        }
        this.taskExecutor.schedule(this.flushCommand, 50L, TimeUnit.MILLISECONDS);
    }

    @Override // com.github.twitch4j.pubsub.ITwitchPubSub
    public PubSubSubscription listenOnTopic(PubSubRequest pubSubRequest) {
        if (this.subscribedTopics.add(pubSubRequest)) {
            checkListenCount(pubSubRequest);
            queueRequest(pubSubRequest);
        }
        return new PubSubSubscription(pubSubRequest);
    }

    @Override // com.github.twitch4j.pubsub.ITwitchPubSub
    public boolean unsubscribeFromTopic(PubSubSubscription pubSubSubscription) {
        PubSubRequest request = pubSubSubscription.getRequest();
        if (request.getType() != PubSubType.LISTEN) {
            log.warn("Cannot unsubscribe using request with unexpected type: {}", request.getType());
            return false;
        }
        if (!this.subscribedTopics.remove(request)) {
            log.warn("Not subscribed to topic: {}", request);
            return false;
        }
        PubSubRequest pubSubRequest = new PubSubRequest();
        pubSubRequest.setType(PubSubType.UNLISTEN);
        pubSubRequest.setNonce(CryptoUtils.generateNonce(30));
        pubSubRequest.setData(request.getData());
        queueRequest(pubSubRequest);
        return true;
    }

    @Override // com.github.twitch4j.pubsub.ITwitchPubSub
    public long getLatency() {
        return this.connection.getLatency();
    }

    @Override // com.github.twitch4j.pubsub.ITwitchPubSub, java.lang.AutoCloseable
    public void close() {
        if (!this.isClosed) {
            this.isClosed = true;
            this.heartbeatTask.cancel(false);
            this.queueTask.cancel(false);
            this.connection.close();
        }
    }

    private void checkListenCount(PubSubRequest pubSubRequest) {
        Object obj = pubSubRequest.getData().get("topics");
        if (!(obj instanceof Collection) || ((Collection) obj).size() <= 1) {
            return;
        }
        log.warn("Listening to multiple PubSub topics in a single request is not recommended; automatic topic management can degrade upon PubSubAuthRevokeEvent");
    }

    @Override // com.github.twitch4j.pubsub.ITwitchPubSub
    public EventManager getEventManager() {
        return this.eventManager;
    }
}
