/*
 * Decompiled with CFR 0.152.
 */
package org.activemq;

import javax.jms.ConnectionConsumer;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.ServerSession;
import javax.jms.ServerSessionPool;
import javax.jms.Session;
import org.activemq.ActiveMQConnection;
import org.activemq.ActiveMQMessageDispatcher;
import org.activemq.ActiveMQQueueSession;
import org.activemq.ActiveMQSession;
import org.activemq.ActiveMQTopicSession;
import org.activemq.io.util.MemoryBoundedQueue;
import org.activemq.message.ActiveMQMessage;
import org.activemq.message.ConsumerInfo;

/**
 * A {@link ConnectionConsumer} that receives messages dispatched by an
 * {@link ActiveMQConnection} and forwards each one either to a session obtained
 * from a {@link ServerSessionPool} or, when no pool was supplied, to an internal
 * {@link MemoryBoundedQueue} that can be drained with {@link #receive(long)}.
 */
public class ActiveMQConnectionConsumer implements ConnectionConsumer, ActiveMQMessageDispatcher {
    private final ActiveMQConnection connection;
    private final ServerSessionPool sessionPool;
    private final ConsumerInfo consumerInfo;
    private boolean closed;
    protected MemoryBoundedQueue messageQueue;

    /**
     * Registers this consumer with the connection, marks the consumer info as
     * started with the given prefetch number, sends it synchronously over the
     * connection, and obtains the memory bounded queue used for pool-less dispatch.
     *
     * @param theConnection      the connection this consumer belongs to
     * @param theSessionPool     the pool used to obtain server sessions; when
     *                           {@code null}, messages are enqueued on the internal queue
     * @param theConsumerInfo    the consumer description sent over the connection
     * @param theMaximumMessages value applied as the consumer's prefetch number
     * @throws JMSException if sending the consumer info fails
     */
    protected ActiveMQConnectionConsumer(ActiveMQConnection theConnection, ServerSessionPool theSessionPool, ConsumerInfo theConsumerInfo, int theMaximumMessages) throws JMSException {
        this.connection = theConnection;
        this.sessionPool = theSessionPool;
        this.consumerInfo = theConsumerInfo;
        this.connection.addConnectionConsumer(this);
        boolean initialised = false;
        try {
            this.consumerInfo.setStarted(true);
            this.consumerInfo.setPrefetchNumber(theMaximumMessages);
            this.connection.syncSendPacket(this.consumerInfo);
            // NOTE(review): the queue is created only after the consumer info has been
            // sent, so a dispatch arriving in between would see a null messageQueue.
            // Creating it earlier looks safe but depends on clientID already being
            // set at this point - confirm against ActiveMQConnection before reordering.
            String queueName = this.connection.clientID + ":" + theConsumerInfo.getConsumerName() + ":" + theConsumerInfo.getConsumerNo();
            this.messageQueue = this.connection.getMemoryBoundedQueue(queueName);
            initialised = true;
        } finally {
            if (!initialised) {
                // Do not leave a half-constructed consumer registered with the
                // connection: the caller never receives a reference to close it.
                this.connection.removeConnectionConsumer(this);
            }
        }
    }

    /**
     * Tells whether the given message is addressed to this consumer.
     *
     * @param message the message to test
     * @return the result of {@code message.isConsumerTarget(consumerNo)} for this
     *         consumer's number
     */
    public boolean isTarget(ActiveMQMessage message) {
        return message.isConsumerTarget(this.consumerInfo.getConsumerNo());
    }

    /**
     * Dispatches a message to this consumer if it is targeted at it. The message
     * is stamped with this consumer's identifier and flagged as transient-consumed
     * when the consumer is neither a durable topic nor consuming from a queue.
     * Any {@link JMSException} raised while forwarding is handed to the
     * connection's asynchronous exception handler rather than propagated.
     *
     * @param message the message to dispatch
     */
    public void dispatch(ActiveMQMessage message) {
        if (!message.isConsumerTarget(this.consumerInfo.getConsumerNo())) {
            return;
        }
        message.setConsumerIdentifer(this.consumerInfo.getConsumerId());
        message.setTransientConsumed(!this.consumerInfo.isDurableTopic() && !this.consumerInfo.getDestination().isQueue());
        try {
            if (this.sessionPool != null) {
                this.dispatchToSession(message);
            } else {
                this.dispatchToQueue(message);
            }
        } catch (JMSException jmsEx) {
            this.connection.handleAsyncException(jmsEx);
        }
    }

    /**
     * Enqueues the message on the internal queue (used when no session pool is set).
     */
    private void dispatchToQueue(ActiveMQMessage message) throws JMSException {
        this.messageQueue.enqueue(message);
    }

    /**
     * Dequeues a message from the internal queue.
     *
     * @param timeout the timeout passed to the queue's {@code dequeue} call
     * @return the dequeued message, or {@code null} if the calling thread was
     *         interrupted while waiting (the interrupt status is preserved)
     * @throws JMSException declared for callers; propagated if raised by the queue
     */
    public ActiveMQMessage receive(long timeout) throws JMSException {
        try {
            return (ActiveMQMessage) this.messageQueue.dequeue(timeout);
        } catch (InterruptedException interrupted) {
            // Restore the interrupt status so callers further up the stack can
            // still observe that the thread was asked to stop.
            Thread.currentThread().interrupt();
            return null;
        }
    }

    /**
     * Obtains a server session from the pool, hands the message to the underlying
     * {@link ActiveMQSession} and starts the server session.
     *
     * @throws JMSException if the session held by the server session is not one of
     *                      the supported ActiveMQ session types (including {@code null})
     */
    private void dispatchToSession(ActiveMQMessage message) throws JMSException {
        ServerSession serverSession = this.sessionPool.getServerSession();
        ActiveMQSession session = this.unwrapSession(serverSession.getSession());
        session.dispatch(message);
        serverSession.start();
    }

    /**
     * Resolves the {@link ActiveMQSession} behind the session exposed by a server session.
     * Topic and queue session wrappers are unwrapped through their {@code getNext()}.
     */
    private ActiveMQSession unwrapSession(Session nestedSession) throws JMSException {
        if (nestedSession instanceof ActiveMQSession) {
            return (ActiveMQSession) nestedSession;
        }
        if (nestedSession instanceof ActiveMQTopicSession) {
            return (ActiveMQSession) ((ActiveMQTopicSession) nestedSession).getNext();
        }
        if (nestedSession instanceof ActiveMQQueueSession) {
            return (ActiveMQSession) ((ActiveMQQueueSession) nestedSession).getNext();
        }
        // A null session also ends up here; describe it instead of failing with a
        // NullPointerException that would bypass the JMSException handling in dispatch().
        String found = nestedSession == null ? "null" : nestedSession.getClass().getName();
        throw new JMSException("Invalid instance of session obtained from server session.The instance should be one of the following: ActiveMQSession, ActiveMQTopicSession, ActiveMQQueueSession. Found instance of " + found);
    }

    /**
     * Returns the server session pool associated with this consumer.
     *
     * @return the pool supplied at construction time (may be {@code null})
     * @throws JMSException an {@link IllegalStateException} if this consumer is closed
     */
    public ServerSessionPool getServerSessionPool() throws JMSException {
        if (this.closed) {
            throw new IllegalStateException("The Connection Consumer is closed");
        }
        return this.sessionPool;
    }

    /**
     * Closes this consumer: marks the consumer info as not started, sends it
     * asynchronously over the connection and removes this consumer from the
     * connection. Calling it again after the first call has no effect.
     *
     * @throws JMSException if sending the consumer info fails; the consumer is
     *                      still removed from the connection in that case
     */
    public void close() throws JMSException {
        if (this.closed) {
            return;
        }
        this.closed = true;
        try {
            this.consumerInfo.setStarted(false);
            this.connection.asyncSendPacket(this.consumerInfo);
        } finally {
            // The closed flag is already set, so a retry would be a no-op; make
            // sure the deregistration happens even when the send fails.
            this.connection.removeConnectionConsumer(this);
        }
    }
}

